邢台学校网站建设费用wordpress必备插件 代码
2026/4/6 9:32:14 网站建设 项目流程
邢台学校网站建设费用,wordpress必备插件 代码,如何卸载win上的wordpress,徐州html5响应式网站建设从零开始学 Python#xff1a;自动化 / 运维开发实战#xff08;核心库 3 大实战场景#xff09; 在运维工作中#xff0c;重复的服务器巡检、批量部署、日志分析等任务不仅耗时耗力#xff0c;还容易出现人为失误。而 Python 凭借丰富的运维核心库#xff0c;能轻松实…从零开始学 Python自动化 / 运维开发实战核心库 3 大实战场景在运维工作中重复的服务器巡检、批量部署、日志分析等任务不仅耗时耗力还容易出现人为失误。而 Python 凭借丰富的运维核心库能轻松实现这些任务的自动化大幅提升运维效率、降低操作风险。本文将从运维开发的核心库入手详细讲解paramikoSSH 远程操作、fabric批量运维、ansible自动化配置、psutil系统监控的使用再通过 3 个落地实战场景服务器批量巡检、自动化部署、日志分析工具帮你从零基础掌握 Python 自动化 / 运维开发实现运维工作的 “提质增效”。一、运维开发核心库开箱即用的运维工具Python 运维生态成熟以下核心库覆盖了远程操作、批量管理、系统监控等核心场景是运维自动化的基石。1.1 paramikoSSH 远程操作核心库paramiko是 Python 实现的 SSH2 协议库支持 SSH 远程登录、执行命令、上传下载文件无需依赖系统 SSH 客户端是实现单台 / 多台服务器远程操作的基础。前置条件安装 paramikopip install paramiko核心用法场景 1SSH 远程执行命令import paramiko def ssh_execute_command(host, port, username, password, command): SSH远程登录并执行命令 :param host: 服务器IP :param port: SSH端口 :param username: 登录用户名 :param password: 登录密码 :param command: 要执行的系统命令 :return: 命令执行结果 # 1. 创建SSH客户端对象 ssh_client paramiko.SSHClient() # 2. 自动添加未知主机密钥生产环境建议手动配置 ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # 3. 连接服务器 ssh_client.connect( hostnamehost, portport, usernameusername, passwordpassword, timeout10 ) # 4. 执行命令stdin: 输入, stdout: 输出, stderr: 错误 stdin, stdout, stderr ssh_client.exec_command(command) # 5. 获取执行结果 stdout_result stdout.read().decode(utf-8, errorsignore) stderr_result stderr.read().decode(utf-8, errorsignore) return { status: success, stdout: stdout_result, stderr: stderr_result } except Exception as e: return { status: failed, error: str(e) } finally: # 6. 关闭连接 ssh_client.close() # 调用示例 result ssh_execute_command( host192.168.1.100, port22, usernameroot, passwordyour_server_password, commanddf -h # 查看磁盘使用情况 ) # 打印结果 if result[status] success: print(命令执行成功输出结果) print(result[stdout]) if result[stderr]: print(命令执行警告/错误) print(result[stderr]) else: print(f命令执行失败{result[error]})场景 2SFTP 上传 / 下载文件import paramiko def sftp_upload_file(host, port, username, password, local_file, remote_file): SFTP上传本地文件到远程服务器 transport paramiko.Transport((host, port)) try: # 连接SFTP transport.connect(usernameusername, passwordpassword) sftp paramiko.SFTPClient.from_transport(transport) # 上传文件 sftp.put(local_file, remote_file) print(f文件 {local_file} 已成功上传到 {remote_file}) return True except Exception as e: print(f文件上传失败{str(e)}) return False finally: transport.close() def sftp_download_file(host, port, username, password, remote_file, local_file): SFTP从远程服务器下载文件到本地 transport paramiko.Transport((host, port)) try: transport.connect(usernameusername, passwordpassword) sftp paramiko.SFTPClient.from_transport(transport) # 下载文件 sftp.get(remote_file, local_file) print(f文件 {remote_file} 已成功下载到 {local_file}) return True except Exception as e: print(f文件下载失败{str(e)}) return False finally: transport.close() # 调用示例 sftp_upload_file( host192.168.1.100, port22, usernameroot, passwordyour_server_password, local_filelocal_test.txt, remote_file/root/remote_test.txt ) sftp_download_file( host192.168.1.100, port22, usernameroot, passwordyour_server_password, remote_file/root/remote_test.txt, local_filedownloaded_test.txt )1.2 fabric批量运维自动化工具fabric是基于paramiko封装的高级批量运维库支持批量执行命令、批量上传下载文件、任务编排语法更简洁是批量管理服务器的首选工具当前主流版本为Fabric 3兼容 Python3。前置条件安装 fabricpip install fabric3核心用法批量执行运维任务from fabric import Connection from invoke import Responder # 1. 定义服务器列表可从配置文件读取 servers [ {host: 192.168.1.100, port: 22, user: root, password: your_server_password}, {host: 192.168.1.101, port: 22, user: root, password: your_server_password} ] # 2. 定义批量执行命令的函数 def batch_execute_command(servers, command): 批量在多台服务器上执行命令 for server in servers: print(f 开始处理服务器 {server[host]} ) try: # 创建连接 conn Connection( hostserver[host], portserver[port], userserver[user], connect_kwargs{password: server[password]} ) # 执行命令支持sudo提权若需要 sudo_responder Responder( patternr\[sudo\] password for .*:, responsef{server[password]}\n ) # 执行普通命令 result conn.run(command, hideFalse, warnTrue) if result.ok: print(f服务器 {server[host]} 命令执行成功输出\n{result.stdout}) else: print(f服务器 {server[host]} 命令执行失败错误\n{result.stderr}) except Exception as e: print(f服务器 {server[host]} 连接/执行失败{str(e)}) finally: print(f 结束处理服务器 {server[host]} \n) # 3. 定义批量上传文件的函数 def batch_upload_file(servers, local_file, remote_file): 批量上传文件到多台服务器 for server in servers: print(f 开始上传文件到服务器 {server[host]} ) try: conn Connection( hostserver[host], portserver[port], userserver[user], connect_kwargs{password: server[password]} ) # 上传文件 conn.put(local_file, remote_file) print(f文件已成功上传到服务器 {server[host]} 的 {remote_file}) except Exception as e: print(f服务器 {server[host]} 文件上传失败{str(e)}) finally: print(f 结束上传服务器 {server[host]} \n) # 4. 调用批量任务 if __name__ __main__: # 批量执行查看内存命令 batch_execute_command(servers, free -h) # 批量上传配置文件 batch_upload_file(servers, app.conf, /etc/app.conf)1.3 Ansible自动化配置与运维Python 脚本集成Ansible 是一款强大的 IT 自动化工具基于 SSH 实现无代理批量运维支持配置管理、应用部署、任务编排其核心功能可通过 Python 脚本调用实现更灵活的自动化流程。前置条件安装 Ansible# Ubuntu/Debian apt install ansible # CentOS/RHEL yum install ansible # 或通过pip安装 pip install ansible核心用法Python 脚本调用 Ansibleimport ansible.runner import ansible.playbook import ansible.inventory # 1. 定义Ansible配置 inventory ansible.inventory.Inventory([192.168.1.100, 192.168.1.101]) private_key_file /root/.ssh/id_rsa # 免密登录密钥推荐 remote_user root # 2. 执行单个命令runner方式 def ansible_execute_command(hosts, command): 用Ansible批量执行命令 runner ansible.runner.Runner( pattern*, module_namecommand, module_argscommand, inventoryinventory, remote_userremote_user, private_key_fileprivate_key_file ) # 执行并获取结果 result runner.run() if result: for host, host_result in result[contacted].items(): print(f 服务器 {host} ) if stdout in host_result: print(f执行成功\n{host_result[stdout]}) else: print(f执行失败\n{host_result[msg]}) # 处理未连接的服务器 for host in result[dark]: print(f服务器 {host} 无法连接) # 3. 执行Ansible Playbook更复杂的任务编排 def ansible_run_playbook(playbook_path): 用Ansible执行Playbook pb ansible.playbook.PlayBook( playbookplaybook_path, inventoryinventory, remote_userremote_user, private_key_fileprivate_key_file ) # 执行Playbook result pb.run() print(fPlaybook执行完成结果{result}) # 4. 调用示例 if __name__ __main__: # 批量执行磁盘查看命令 ansible_execute_command([192.168.1.100, 192.168.1.101], df -h) # 执行Playbook需提前编写deploy.yml # ansible_run_playbook(deploy.yml)补充简易 Ansible Playbook 示例deploy.yml- hosts: all remote_user: root tasks: - name: 安装nginx yum: name: nginx state: present - name: 启动nginx并设置开机自启 service: name: nginx state: started enabled: yes1.4 psutil系统监控与信息采集psutil是 Python 跨平台系统监控库支持获取 CPU、内存、磁盘、网络、进程等系统信息无需调用系统命令返回结构化数据是服务器巡检、监控工具开发的核心库。前置条件安装 psutilpip install psutil核心用法获取系统核心信息import psutil import datetime def get_system_info(): 采集服务器核心系统信息 :return: 系统信息字典 system_info {} # 1. 基本系统信息 system_info[boot_time] datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime(%Y-%m-%d %H:%M:%S) # 2. CPU信息 cpu_count psutil.cpu_count(logicalTrue) # 逻辑CPU数 cpu_percent psutil.cpu_percent(interval1, percpuTrue) # 每个CPU使用率间隔1秒 system_info[cpu] { logical_count: cpu_count, total_percent: psutil.cpu_percent(interval0), per_cpu_percent: cpu_percent } # 3. 内存信息 mem psutil.virtual_memory() system_info[memory] { total_gb: round(mem.total / (1024**3), 2), # 总内存GB used_gb: round(mem.used / (1024**3), 2), # 已使用内存GB free_gb: round(mem.free / (1024**3), 2), # 空闲内存GB used_percent: mem.percent # 内存使用率 } # 4. 磁盘信息仅获取挂载的本地磁盘 disk_info {} for partition in psutil.disk_partitions(allFalse): if partition.fstype: disk_usage psutil.disk_usage(partition.mountpoint) disk_info[partition.mountpoint] { total_gb: round(disk_usage.total / (1024**3), 2), used_gb: round(disk_usage.used / (1024**3), 2), free_gb: round(disk_usage.free / (1024**3), 2), used_percent: disk_usage.percent } system_info[disk] disk_info # 5. 网络信息获取总收发流量 net_io psutil.net_io_counters() system_info[network] { sent_gb: round(net_io.bytes_sent / (1024**3), 2), recv_gb: round(net_io.bytes_recv / (1024**3), 2) } return system_info # 调用示例打印系统信息 if __name__ __main__: sys_info get_system_info() print( 服务器系统信息 ) print(f开机时间{sys_info[boot_time]}) print(fCPU使用率{sys_info[cpu][total_percent]}%) print(f内存使用率{sys_info[memory][used_percent]}%) print(f磁盘使用率/{sys_info[disk][/][used_percent]}%)二、运维实战场景 1服务器批量巡检脚本场景需求批量巡检多台服务器的 CPU、内存、磁盘使用率设置阈值告警如 CPU 使用率 80%、内存 90%、磁盘 95%将巡检结果输出到日志文件并生成简易报告。完整实现代码import psutil import datetime import logging from fabric import Connection # 1. 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(server_inspection.log, encodingutf-8), logging.StreamHandler() ] ) logger logging.getLogger(__name__) # 2. 配置项 SERVERS [ {host: 192.168.1.100, port: 22, user: root, password: your_server_password}, {host: 192.168.1.101, port: 22, user: root, password: your_server_password} ] # 告警阈值 THRESHOLDS { cpu: 80, memory: 90, disk: 95 } # 巡检报告保存路径 REPORT_PATH server_inspection_report.txt # 3. 本地巡检函数若脚本在目标服务器运行 def local_inspection(): 本地服务器巡检 try: sys_info get_system_info() return {status: success, data: sys_info} except Exception as e: logger.error(f本地巡检失败{str(e)}) return {status: failed, error: str(e)} # 4. 远程巡检函数通过fabric远程调用psutil需目标服务器已安装psutil def remote_inspection(server): 远程服务器巡检 logger.info(f开始巡检远程服务器{server[host]}) try: # 连接远程服务器 conn Connection( hostserver[host], portserver[port], userserver[user], connect_kwargs{password: server[password]} ) # 远程执行Python脚本采集系统信息 remote_script import psutil import datetime def get_system_info(): system_info {} system_info[boot_time] datetime.datetime.fromtimestamp(psutil.boot_time()).strftime(%Y-%m-%d %H:%M:%S) system_info[cpu] {total_percent: psutil.cpu_percent(interval1)} mem psutil.virtual_memory() system_info[memory] {used_percent: mem.percent} disk_info {} for p in psutil.disk_partitions(allFalse): if p.fstype: du psutil.disk_usage(p.mountpoint) disk_info[p.mountpoint] {used_percent: du.percent} system_info[disk] disk_info return system_info print(get_system_info()) # 执行远程脚本并获取结果 result conn.run(fpython3 -c {remote_script}, hideTrue, warnTrue) if not result.ok: logger.error(f服务器 {server[host]} 执行脚本失败{result.stderr}) return {status: failed, host: server[host], error: result.stderr} # 解析结果简化处理实际可使用json序列化 import ast sys_info ast.literal_eval(result.stdout) logger.info(f服务器 {server[host]} 巡检成功) return {status: success, host: server[host], data: sys_info} except Exception as e: logger.error(f服务器 {server[host]} 巡检异常{str(e)}) return {status: failed, host: server[host], error: str(e)} # 5. 生成巡检报告 def generate_report(inspection_results): 生成巡检报告 now datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S) report [f# 服务器批量巡检报告 {now}, *50, ] for result in inspection_results: if result[status] success: host result[host] data result[data] report.append(f## 服务器{host}) report.append(f 开机时间{data[boot_time]}) report.append(f CPU使用率{data[cpu][total_percent]}% {[告警] if data[cpu][total_percent] THRESHOLDS[cpu] else }) report.append(f 内存使用率{data[memory][used_percent]}% {[告警] if data[memory][used_percent] THRESHOLDS[memory] else }) report.append( 磁盘使用率) for mount_point, disk_data in data[disk].items(): alarm [告警] if disk_data[used_percent] THRESHOLDS[disk] else report.append(f {mount_point}{disk_data[used_percent]}% {alarm}) else: report.append(f## 服务器{result[host]} [巡检失败]) report.append(f 错误信息{result[error]}) report.append() # 写入报告文件 with open(REPORT_PATH, w, encodingutf-8) as f: f.write(\n.join(report)) logger.info(f巡检报告已生成保存路径{REPORT_PATH}) # 6. 核心系统信息采集与前文一致简化版 def get_system_info(): system_info {} system_info[boot_time] datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime(%Y-%m-%d %H:%M:%S) system_info[cpu] {total_percent: psutil.cpu_percent(interval1)} mem psutil.virtual_memory() system_info[memory] {used_percent: mem.percent} disk_info {} for partition in psutil.disk_partitions(allFalse): if partition.fstype: disk_usage psutil.disk_usage(partition.mountpoint) disk_info[partition.mountpoint] {used_percent: disk_usage.percent} system_info[disk] disk_info return system_info # 7. 主函数批量巡检 def main(): logger.info( 开始服务器批量巡检 ) inspection_results [] # 遍历所有服务器进行巡检 for server in SERVERS: result remote_inspection(server) inspection_results.append(result) # 生成巡检报告 generate_report(inspection_results) logger.info( 服务器批量巡检结束 ) if __name__ __main__: main()使用说明修改SERVERS配置中的服务器 IP、账号、密码目标服务器需安装 Python3 和psutil可通过批量命令提前安装pip3 install psutil运行脚本后生成server_inspection.log巡检日志和server_inspection_report.txt巡检报告超过阈值的指标会标注[告警]可后续扩展邮件 / 钉钉告警功能。三、运维实战场景 2自动化部署脚本场景需求实现 Python/Web 项目的自动化部署核心流程「从 Git 拉取最新代码」→「安装 / 更新依赖」→「项目打包可选」→「停止旧服务」→「启动新服务」→「验证服务可用性」。完整实现代码import logging from fabric import Connection from datetime import datetime # 1. 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(deploy_automation.log, encodingutf-8), logging.StreamHandler() ] ) logger logging.getLogger(__name__) # 2. 部署配置 DEPLOY_CONFIG { server: { host: 192.168.1.100, port: 22, user: root, password: your_server_password }, project: { name: my_python_project, git_repo: https://github.com/your-name/your-project.git, deploy_path: /opt/projects, venv_path: /opt/venv, # 虚拟环境路径 requirements_file: requirements.txt, service_name: my_project.service # systemd服务名 } } # 3. 定义单个部署步骤 def git_pull_code(conn, project_config): 从Git拉取最新代码 project_path f{project_config[deploy_path]}/{project_config[name]} logger.info(f开始拉取代码项目路径{project_path}) # 检查项目目录是否存在不存在则克隆 result conn.run(ftest -d {project_path}, warnTrue) if result.failed: logger.info(项目目录不存在开始克隆仓库) conn.run(fgit clone {project_config[git_repo]} {project_path}, hideFalse) else: logger.info(项目目录已存在拉取最新代码) conn.run(fcd {project_path} git pull, hideFalse) logger.info(代码拉取完成) def install_dependencies(conn, project_config): 安装/更新项目依赖 project_path f{project_config[deploy_path]}/{project_config[name]} venv_pip f{project_config[venv_path]}/bin/pip requirements_path f{project_path}/{project_config[requirements_file]} logger.info(开始安装/更新项目依赖) result conn.run( f{venv_pip} install -r {requirements_path} --upgrade, hideFalse, warnTrue ) if result.ok: logger.info(依赖安装/更新完成) else: logger.error(f依赖安装失败{result.stderr}) raise Exception(依赖安装步骤失败) def stop_old_service(conn, project_config): 停止旧服务 logger.info(开始停止旧服务) result conn.run( fsystemctl stop {project_config[service_name]}, warnTrue, hideFalse ) if result.ok or Unit * does not exist in result.stderr: logger.info(旧服务已停止或服务未存在) else: logger.error(f停止旧服务失败{result.stderr}) raise Exception(停止旧服务步骤失败) def start_new_service(conn, project_config): 启动新服务 logger.info(开始启动新服务) # 重新加载systemd配置若服务文件有修改 conn.run(systemctl daemon-reload, hideFalse) # 启动服务并设置开机自启 result conn.run( fsystemctl start {project_config[service_name]} systemctl enable {project_config[service_name]}, hideFalse, warnTrue ) if result.ok: logger.info(新服务启动成功并设置开机自启) else: logger.error(f启动新服务失败{result.stderr}) raise Exception(启动新服务步骤失败) def verify_service(conn, project_config): 验证服务可用性示例检查服务状态访问接口 logger.info(开始验证服务可用性) # 1. 检查systemd服务状态 service_result conn.run( fsystemctl is-active {project_config[service_name]}, hideTrue, warnTrue ) if service_result.stdout.strip() ! active: logger.error(f服务未正常运行状态{service_result.stdout}) raise Exception(服务验证失败服务未激活) # 2. 可选访问项目接口验证示例curl访问本地端口 api_result conn.run( fcurl -s -w %{{http_code}} http://127.0.0.1:8000/health -o /dev/null, hideTrue, warnTrue ) if api_result.stdout.strip() 200: logger.info(服务接口验证成功返回200 OK) else: logger.warning(f服务接口验证返回非200状态码{api_result.stdout}) # 4. 主部署流程 def automated_deploy(): 自动化部署主流程 logger.info( 开始项目自动化部署 ) server_config DEPLOY_CONFIG[server] project_config DEPLOY_CONFIG[project] # 创建服务器连接 try: conn Connection( hostserver_config[host], portserver_config[port], userserver_config[user], connect_kwargs{password: server_config[password]} ) # 执行部署步骤按顺序编排 git_pull_code(conn, project_config) install_dependencies(conn, project_config) stop_old_service(conn, project_config) start_new_service(conn, project_config) verify_service(conn, project_config) logger.info( 项目自动化部署全部完成 ) return True except Exception as e: logger.error(f 自动化部署失败{str(e)} ) return False if __name__ __main__: automated_deploy()补充说明项目需使用systemd管理服务创建my_project.service放在/etc/systemd/system/目标服务器需安装 Git、Python3、虚拟环境可提前批量配置若为 Java 项目可修改步骤为「打包mvn package」→「替换 JAR 包」→「重启服务」可扩展回滚功能部署前备份旧代码 / 旧包部署失败时恢复。示例 systemd 服务文件my_project.service[Unit] DescriptionMy Python Project Service Afternetwork.target [Service] Userroot WorkingDirectory/opt/projects/my_python_project ExecStart/opt/venv/bin/python3 app.py Restarton-failure RestartSec5 [Install] WantedBymulti-user.target四、运维实战场景 3日志分析工具场景需求开发一款通用日志分析工具支持「按关键词过滤日志」→「统计指定报错信息出现次数」→「按时间范围筛选日志」→「生成分析报告」适用于 Nginx、Python、Java 等各类日志文件。完整实现代码import logging import re from datetime import datetime from collections import Counter # 1. 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[logging.StreamHandler()] ) logger logging.getLogger(__name__) # 2. 日志分析核心功能 class LogAnalyzer: def __init__(self, log_file_path): self.log_file_path log_file_path self.all_log_lines [] # 存储所有日志行 self.load_log_file() def load_log_file(self): 加载日志文件到内存大文件可改为逐行处理 logger.info(f开始加载日志文件{self.log_file_path}) try: with open(self.log_file_path, r, encodingutf-8, errorsignore) as f: self.all_log_lines [line.strip() for line in f if line.strip()] logger.info(f日志文件加载完成共 {len(self.all_log_lines)} 行有效日志) except FileNotFoundError: logger.error(f日志文件不存在{self.log_file_path}) raise except Exception as e: logger.error(f加载日志文件失败{str(e)}) raise def filter_by_keyword(self, keywords, exclude_keywordsNone): 按关键词过滤日志 :param keywords: 包含的关键词列表任意匹配 :param exclude_keywords: 排除的关键词列表任意匹配 :return: 过滤后的日志列表 if exclude_keywords is None: exclude_keywords [] filtered_lines [] for line in self.all_log_lines: # 检查是否包含任意关键词 include_flag any(keyword.lower() in line.lower() for keyword in keywords) # 检查是否排除任意关键词 exclude_flag any(exclude_keyword.lower() in line.lower() for exclude_keyword in exclude_keywords) if include_flag and not exclude_flag: filtered_lines.append(line) logger.info(f按关键词过滤完成共 {len(filtered_lines)} 行匹配日志) return filtered_lines def count_error_messages(self, error_patterns): 统计报错信息出现次数 :param error_patterns: 报错正则表达式列表 :return: 报错统计结果Counter error_messages [] for line in self.all_log_lines: for pattern in error_patterns: match re.search(pattern, line, re.IGNORECASE) if match: error_msg match.group(0) if match.group(0) else line[:100] # 截取前100字符 error_messages.append(error_msg) break error_counter Counter(error_messages) logger.info(f报错信息统计完成共识别 {len(error_counter)} 种不同报错) return error_counter def filter_by_time_range(self, start_time, end_time, time_patternr\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}): 按时间范围筛选日志 :param start_time: 开始时间字符串格式匹配time_pattern :param end_time: 结束时间字符串格式匹配time_pattern :param time_pattern: 日志中的时间格式正则 :return: 过滤后的日志列表 # 解析开始/结束时间 try: start_dt datetime.strptime(start_time, %Y-%m-%d %H:%M:%S) end_dt datetime.strptime(end_time, %Y-%m-%d %H:%M:%S) except ValueError: logger.error(时间格式错误需符合YYYY-MM-DD HH:MM:SS) raise filtered_lines [] for line in self.all_log_lines: # 提取日志中的时间 time_match re.search(time_pattern, line) if not time_match: continue try: log_dt datetime.strptime(time_match.group(0), %Y-%m-%d %H:%M:%S) if start_dt log_dt end_dt: filtered_lines.append(line) except ValueError: continue logger.info(f按时间范围过滤完成共 {len(filtered_lines)} 行匹配日志) return filtered_lines def generate_analysis_report(self, report_path, keywordsNone, error_patternsNone, time_rangeNone): 生成日志分析报告 :param report_path: 报告保存路径 :param keywords: 过滤关键词 :param error_patterns: 报错正则 :param time_range: 时间范围(start_time, end_time) if keywords is None: keywords [] if error_patterns is None: error_patterns [rerror|exception|fail|fatal, r500 Internal Server Error, r404 Not Found] if time_range is None: time_range (, ) logger.info(f开始生成日志分析报告{report_path}) report [ f# 日志分析报告, f生成时间{datetime.now().strftime(%Y-%m-%d %H:%M:%S)}, f日志文件{self.log_file_path}, f日志总行数{len(self.all_log_lines)}, *60, ] # 1. 关键词过滤结果若指定关键词 if keywords: keyword_lines self.filter_by_keyword(keywords) report.append(f## 一、关键词过滤结果关键词{,.join(keywords)}) report.append(f匹配行数{len(keyword_lines)}) report.append(前10条匹配日志) for i, line in enumerate(keyword_lines[:10]): report.append(f {i1}. {line}) report.append() # 2. 报错信息统计 error_counter self.count_error_messages(error_patterns) report.append(f## 二、报错信息统计) report.append(f报错类型数{len(error_counter)}) report.append(f报错总次数{sum(error_counter.values())}) report.append(报错Top10排名) for i, (error_msg, count) in enumerate(error_counter.most_common(10), 1): report.append(f {i}. 「{error_msg}」{count} 次) report.append() # 3. 时间范围过滤结果若指定时间范围 if all(time_range): time_lines self.filter_by_time_range(time_range[0], time_range[1]) report.append(f## 三、时间范围过滤结果{time_range[0]} - {time_range[1]}) report.append(f匹配行数{len(time_lines)}) report.append() # 4. 总结 report.append(f## 四、分析总结) if sum(error_counter.values()) 0: report.append(未检测到明显报错信息日志整体正常。) else: report.append(f检测到 {sum(error_counter.values())} 次报错需重点关注Top3报错类型。) # 写入报告文件 with open(report_path, w, encodingutf-8) as f: f.write(\n.join(report)) logger.info(f日志分析报告生成完成保存路径{report_path}) # 3. 主函数使用日志分析工具 def main(): # 配置项 LOG_FILE /var/log/nginx/access.log # 日志文件路径 REPORT_FILE log_analysis_report.txt # 初始化日志分析器 try: analyzer LogAnalyzer(LOG_FILE) # 生成分析报告可自定义关键词、报错正则、时间范围 analyzer.generate_analysis_report( report_pathREPORT_FILE, keywords[404, 500], time_range(2024-01-01 00:00:00, 2024-01-31 23:59:59) ) except Exception as e: logger.error(f日志分析失败{str(e)}) if __name__ __main__: main()使用说明修改LOG_FILE为目标日志文件路径Nginx、Python、Java 日志均可支持自定义keywords过滤关键词、error_patterns报错正则、time_range时间范围大日志文件可优化为「逐行处理」避免内存溢出生成的log_analysis_report.txt包含详细分析结果可直接用于运维排查。五、总结本文系统讲解了 Python 自动化 / 运维开发的核心库和 3 个落地实战场景核心要点总结如下核心运维库paramiko实现基础 SSH/SFTP 操作fabric简化批量运维ansible适合复杂配置编排psutil实现系统信息采集四者覆盖运维核心需求批量巡检脚本基于fabricpsutil实现多服务器指标采集设置阈值告警生成结构化报告解决重复巡检痛点自动化部署脚本按「拉取代码→安装依赖→启停服务→验证可用性」编排任务支持扩展回滚、多环境部署提升部署效率和一致性日志分析工具基于re正则和Counter统计支持关键词 / 时间过滤、报错统计生成可视化报告辅助快速排查问题。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询