杭州网站怎么制作wordpress运行加速
2026/4/6 7:45:48 网站建设 项目流程
杭州网站怎么制作,wordpress运行加速,数据型网站,昆山建设局网站表格下编写了一个完整的Python脚本#xff0c;功能呢主要用于监控Windows远程登录失败事件#xff0c;并在1小时内同一IP密码错误5次时#xff0c;通过Windows防火墙封禁该IP24小时#xff1a; 完整实现脚本Windows远程登录失败监控与自动封禁脚本 功能…编写了一个完整的Python脚本功能呢主要用于监控Windows远程登录失败事件并在1小时内同一IP密码错误5次时通过Windows防火墙封禁该IP24小时完整实现脚本 Windows远程登录失败监控与自动封禁脚本 功能监控RDP登录失败1小时内同一IP失败5次则自动封禁24小时 importosimportsysimporttimeimportsqlite3importdatetimeimportsubprocessimportwin32evtlogimportwin32evtlogutilfromcollectionsimportdefaultdictfromthreadingimportThread,Lockimportloggingimportschedule# 配置日志logging.basicConfig(levellogging.INFO,format%(asctime)s - %(levelname)s - %(message)s,handlers[logging.FileHandler(rdp_monitor.log),logging.StreamHandler()])loggerlogging.getLogger(__name__)classRDPFailureMonitor:def__init__(self,db_pathrdp_monitor.db): 初始化监控器 Args: db_path: SQLite数据库路径 self.db_pathdb_path self.lockLock()self.setup_database()self.setup_firewall_rules()# 配置参数self.max_attempts5# 最大尝试次数self.time_window3600# 时间窗口秒- 1小时self.ban_duration86400# 封禁持续时间秒- 24小时defsetup_database(self):初始化数据库withsqlite3.connect(self.db_path)asconn:cursorconn.cursor()# 创建失败尝试记录表cursor.execute( CREATE TABLE IF NOT EXISTS failed_attempts ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip_address TEXT NOT NULL, username TEXT, attempt_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, event_id INTEGER ) )# 创建封禁记录表cursor.execute( CREATE TABLE IF NOT EXISTS banned_ips ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip_address TEXT NOT NULL UNIQUE, ban_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ban_duration INTEGER, unban_time TIMESTAMP, reason TEXT ) )# 创建索引以提高查询性能cursor.execute(CREATE INDEX IF NOT EXISTS idx_ip_time ON failed_attempts(ip_address, attempt_time))cursor.execute(CREATE INDEX IF NOT EXISTS idx_unban_time ON banned_ips(unban_time))conn.commit()defsetup_firewall_rules(self):设置防火墙规则组try:# 创建自定义防火墙规则组group_nameRDP_BLOCKED_IPSgroup_descAutomatically blocked IPs due to RDP brute force attacks# 检查规则组是否存在check_cmdfnetsh advfirewall firewall show rule nameRDP Monitor Groupresultsubprocess.run(check_cmd,shellTrue,capture_outputTrue,textTrue)if确定。notinresult.stdout:# 创建规则组create_groupf netsh advfirewall firewall add rule nameRDP Monitor Group dirin actionblock programany description{group_desc} group{group_name} subprocess.run(create_group,shellTrue,capture_outputTrue)logger.info(防火墙规则组创建成功)exceptExceptionase:logger.error(f设置防火墙规则组失败:{e})defget_event_logs(self,log_typeSecurity,event_id4625): 获取Windows事件日志 Args: log_type: 日志类型默认为Security event_id: 事件ID4625表示登录失败 Returns: 事件记录列表 events[]try:handwin32evtlog.OpenEventLog(None,log_type)flagswin32evtlog.EVENTLOG_BACKWARDS_READ|win32evtlog.EVENTLOG_SEQUENTIAL_READ totalwin32evtlog.GetNumberOfEventLogRecords(hand)logger.info(f正在读取{total}条安全事件日志...)events_listwin32evtlog.ReadEventLog(hand,flags,0)foreventinevents_list:ifevent.EventIDevent_id:event_data{}# 解析事件数据foriteminevent.StringInserts:ifitemand:initem:key,valueitem.split(:,1)event_data[key.strip()]value.strip()# 提取IP地址和用户名ip_addressevent_data.get(Source Network Address,)usernameevent_data.get(Target User Name,)ifip_addressandip_address!-:# 过滤掉本地登录events.append({time:event.TimeGenerated.Format(),ip:ip_address,username:username,event_id:event.EventID,raw_data:str(event.StringInserts)})win32evtlog.CloseEventLog(hand)exceptExceptionase:logger.error(f读取事件日志失败:{e})returneventsdefsave_failed_attempt(self,ip_address,username,event_id):保存失败尝试记录到数据库try:withsqlite3.connect(self.db_path)asconn:cursorconn.cursor()cursor.execute( INSERT INTO failed_attempts (ip_address, username, attempt_time, event_id) VALUES (?, ?, datetime(now), ?) ,(ip_address,username,event_id))conn.commit()logger.info(f记录失败尝试: IP{ip_address}, 用户{username})# 检查是否需要封禁self.check_and_block_ip(ip_address)exceptExceptionase:logger.error(f保存失败尝试记录失败:{e})defcheck_and_block_ip(self,ip_address): 检查IP是否需要封禁 Args: ip_address: 要检查的IP地址 try:withsqlite3.connect(self.db_path)asconn:cursorconn.cursor()# 查询1小时内该IP的失败次数query SELECT COUNT(*) as attempts FROM failed_attempts WHERE ip_address ? AND datetime(attempt_time) datetime(now, -1 hour) cursor.execute(query,(ip_address,))resultcursor.fetchone()attemptsresult[0]ifresultelse0logger.info(fIP{ip_address}在1小时内失败次数:{attempts})# 检查是否已经封禁cursor.execute( SELECT COUNT(*) FROM banned_ips WHERE ip_address ? AND unban_time datetime(now) ,(ip_address,))already_bannedcursor.fetchone()[0]0ifattemptsself.max_attemptsandnotalready_banned:# 需要封禁self.block_ip_with_firewall(ip_address)exceptExceptionase:logger.error(f检查IP封禁状态失败:{e})defblock_ip_with_firewall(self,ip_address): 使用Windows防火墙封禁IP Args: ip_address: 要封禁的IP地址 try:rule_namefBlock_RDP_Attacker_{ip_address.replace(.,_)}# 创建防火墙规则封禁该IPblock_cmdf netsh advfirewall firewall add rule name{rule_name} dirin actionblock protocolany remoteip{ip_address}descriptionBlocked due to RDP brute force attack enableyes profileany resultsubprocess.run(block_cmd,shellTrue,capture_outputTrue,textTrue)ifresult.returncode0:# 保存封禁记录到数据库self.save_ban_record(ip_address)logger.warning(f成功封禁IP:{ip_address})# 发送通知可选self.send_notification(ip_address)else:logger.error(f封禁IP失败:{result.stderr})exceptExceptionase:logger.error(f执行防火墙命令失败:{e})defsave_ban_record(self,ip_address):保存封禁记录到数据库try:withsqlite3.connect(self.db_path)asconn:cursorconn.cursor()# 计算解封时间ban_timedatetime.datetime.now()unban_timeban_timedatetime.timedelta(secondsself.ban_duration)cursor.execute( INSERT OR REPLACE INTO banned_ips (ip_address, ban_time, ban_duration, unban_time, reason) VALUES (?, ?, ?, ?, ?) ,(ip_address,ban_time,self.ban_duration,unban_time,RDP brute force attack detected))conn.commit()logger.info(f保存封禁记录: IP{ip_address})exceptExceptionase:logger.error(f保存封禁记录失败:{e})defunban_expired_ips(self):解除过期的封禁try:withsqlite3.connect(self.db_path)asconn:cursorconn.cursor()# 查询已过期的封禁cursor.execute( SELECT ip_address FROM banned_ips WHERE unban_time datetime(now) )expired_ipscursor.fetchall()for(ip_address,)inexpired_ips:# 删除防火墙规则rule_namefBlock_RDP_Attacker_{ip_address.replace(.,_)}unban_cmdfnetsh advfirewall firewall delete rule name{rule_name}resultsubprocess.run(unban_cmd,shellTrue,capture_outputTrue,textTrue)ifresult.returncode0:# 从数据库中删除记录cursor.execute(DELETE FROM banned_ips WHERE ip_address ?,(ip_address,))logger.info(f已解封IP:{ip_address})else:logger.error(f解封IP失败:{result.stderr})conn.commit()exceptExceptionase:logger.error(f解封过期IP失败:{e})defsend_notification(self,ip_address):发送通知可选功能try:# 可以发送邮件、写入系统日志、调用Webhook等messagef检测到RDP暴力破解攻击已封禁IP:{ip_address}# 写入Windows应用程序事件日志win32evtlogutil.ReportEvent(appNameRDP Monitor,eventID1001,eventTypewin32evtlog.EVENTLOG_WARNING_TYPE,strings[message],dataNone)logger.info(f已发送通知:{message})exceptExceptionase:logger.error(f发送通知失败:{e})defcleanup_old_records(self):清理旧的数据库记录try:withsqlite3.connect(self.db_path)asconn:cursorconn.cursor()# 删除48小时前的失败记录cursor.execute( DELETE FROM failed_attempts WHERE datetime(attempt_time) datetime(now, -48 hours) )deletedcursor.rowcount conn.commit()ifdeleted0:logger.info(f清理了{deleted}条旧的失败记录)exceptExceptionase:logger.error(f清理旧记录失败:{e})defmonitor_loop(self):监控主循环logger.info(开始监控RDP登录失败事件...)whileTrue:try:# 获取新的失败登录事件eventsself.get_event_logs()foreventinevents:self.save_failed_attempt(event[ip],event[username],event[event_id])# 每小时清理一次过期封禁self.unban_expired_ips()# 每6小时清理一次旧记录self.cleanup_old_records()# 休眠10秒后继续检查time.sleep(10)exceptKeyboardInterrupt:logger.info(监控已停止)breakexceptExceptionase:logger.error(f监控循环出错:{e})time.sleep(30)definstall_as_service():安装为Windows服务importwin32serviceutil script_pathos.path.abspath(__file__)python_pathsys.executable# 创建服务安装脚本service_scriptf import sys import os sys.path.insert(0, os.path.dirname(__file__)) from rdp_monitor import RDPFailureMonitor if __name__ __main__: monitor RDPFailureMonitor() monitor.monitor_loop() # 保存服务脚本service_fileos.path.join(os.path.dirname(__file__),rdp_service.py)withopen(service_file,w,encodingutf-8)asf:f.write(service_script)# 安装服务service_nameRDPFailureMonitorservice_display_nameRDP Login Failure Monitorinstall_cmdf{python_path} {service_file} install --startup autosubprocess.run(install_cmd,shellTrue)print(f服务 {service_display_name} 已安装)defmain():主函数importargparse parserargparse.ArgumentParser(descriptionWindows RDP登录失败监控器)parser.add_argument(--install,actionstore_true,help安装为Windows服务)parser.add_argument(--start,actionstore_true,help启动监控)parser.add_argument(--config,help配置文件路径)argsparser.parse_args()ifargs.install:install_as_service()else:# 直接运行监控monitorRDPFailureMonitor()# 立即执行一次检查和清理monitor.unban_expired_ips()monitor.cleanup_old_records()# 启动监控循环monitor.monitor_loop()if__name____main__:# 检查管理员权限try:importctypes is_adminctypes.windll.shell32.IsUserAnAdmin()except:is_adminFalseifnotis_admin:print(请以管理员身份运行此脚本)# 请求管理员权限ctypes.windll.shell32.ShellExecuteW(None,runas,sys.executable, .join(sys.argv),None,1)sys.exit()main()使用说明1. 安装依赖pipinstallpywin32 pipinstallschedule2. 运行监控# 以管理员身份运行PowerShellpython rdp_monitor.py--start3. 安装为Windows服务推荐# 安装服务python rdp_monitor.py--install# 启动服务scstartRDPFailureMonitor# 停止服务scstop RDPFailureMonitor# 查看服务状态scquery RDPFailureMonitor配置文件可选创建config.json{max_attempts:5,time_window:3600,ban_duration:86400,db_path:C:\\ProgramData\\RDPMonitor\\rdp_monitor.db,log_level:INFO,notification_email:adminexample.com,cleanup_interval:21600}关键功能说明事件监控监控Windows安全日志中的4625事件登录失败智能分析统计1小时内同一IP的失败次数自动封禁使用Windows防火墙自动封禁恶意IP自动解封24小时后自动解除封禁持久化存储使用SQLite数据库存储记录日志记录详细的日志记录便于排查注意事项必须以管理员身份运行因为需要读写事件日志和操作防火墙Windows防火墙必须启用事件日志服务必须运行建议在测试环境中先验证功能可以调整阈值参数以适应不同环境这个脚本是一个完整的解决方案可以有效地保护Windows服务器免受RDP暴力破解攻击。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询