10 Python Scripts to Automate 80% of Repetitive Work

leo 12/12/2025

I. Overview

1.1 Background Introduction

In operations and maintenance work, a large number of repetitive tasks occupy 60%-80% of engineers’ time: log analysis, batch operations, monitoring and alerting, resource cleanup, etc. While these tasks are simple, manual execution is inefficient and error-prone. Python, with its concise syntax, rich standard library, and third-party modules, has become the language of choice for operations automation. This article shares 10 Python scripts that have been validated in production environments, helping operations engineers break free from repetitive labor.

1.2 Technical Characteristics

  • High Development Efficiency: Python's concise syntax cuts development time to roughly one-third of an equivalent shell script, making it well suited to quickly implementing operational requirements.
  • Mature Ecosystem: Proven Ops libraries such as paramiko, requests, and psutil cover most common needs, so you rarely have to reinvent the wheel.
  • Cross-Platform Compatibility: The same script can run on Linux, Windows, and macOS, reducing maintenance costs.
  • Easy Maintenance and Extension: Code is highly readable, facilitating team collaboration and feature iteration.

1.3 Applicable Scenarios

  • Scenario 1: Operations teams managing 100+ servers, requiring batch configuration distribution, command execution, and file synchronization.
  • Scenario 2: Business systems processing GB-level logs daily, requiring automated anomaly analysis, traffic statistics, and report generation.
  • Scenario 3: Multi-cloud environment resource management, including automated cleanup and cost optimization for VMs, containers, and storage.
  • Scenario 4: 24/7 monitoring scenarios requiring automated health checks, alert handling, and fault self-healing.

1.4 Environmental Requirements

Component        | Version Requirements      | Description
Operating System | CentOS 7+ / Ubuntu 18.04+ | LTS versions recommended
Python           | 3.8+                      | 3.10+ recommended, supports latest syntax features
pip              | 20.0+                     | For installing dependencies
Hardware Config  | 2C4G+                     | Adjust based on actual load

II. Detailed Steps

2.1 Preparations

◆ 2.1.1 System Checks

bash

# Check Python version
python3 --version

# Check pip version
pip3 --version

# Check system resources
free -h
df -h

◆ 2.1.2 Installing Dependencies

bash

# Upgrade pip
pip3 install --upgrade pip

# Install common Ops libraries (pyyaml is needed for reading config.yml)
pip3 install paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client pyyaml

# Verify installation
pip3 list | grep -E "paramiko|requests|psutil"

2.2 Core Configuration

◆ 2.2.1 Configuring SSH Key Authentication

bash

# Generate SSH key pair
ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""

# Distribute public key to target server (example)
ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100

Note: Using key authentication instead of password login enhances security and supports batch operations. It’s recommended to create a dedicated key pair for Ops scripts for easier permission management and auditing.
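
If ssh-copy-id is unavailable, or you want to drive key distribution from the same config.yml used by the scripts, a minimal Python sketch like the one below can install the public key on every listed host. It assumes the hosts still accept password login for the initial push; the script name and helper are illustrative and not one of the 10 scripts.

python

#!/usr/bin/env python3
# distribute_key.py -- illustrative sketch, not one of the 10 scripts
import os
import getpass
import yaml
import paramiko

def distribute_key(config_file='config.yml', pub_key_file='~/.ssh/ops_rsa.pub'):
    """Append the Ops public key to authorized_keys on every server in config.yml."""
    with open(os.path.expanduser(pub_key_file)) as f:
        pub_key = f.read().strip()
    password = getpass.getpass('Initial SSH password: ')

    with open(config_file) as f:
        servers = yaml.safe_load(f)['servers']

    for server in servers:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(server['host'], port=server['port'],
                       username=server['user'], password=password, timeout=10)
        # Install the key only if it is not already present
        cmd = (
            "mkdir -p ~/.ssh && chmod 700 ~/.ssh && "
            f"grep -qF '{pub_key}' ~/.ssh/authorized_keys 2>/dev/null || "
            f"echo '{pub_key}' >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys"
        )
        _, stdout, _ = client.exec_command(cmd)
        stdout.channel.recv_exit_status()  # wait for the command to finish
        client.close()
        print(f"[{server['host']}] public key installed")

if __name__ == '__main__':
    distribute_key()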

◆ 2.2.2 Configuration File Example

yaml

# Configuration file: config.yml
servers:
  - host: 192.168.1.100
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
  - host: 192.168.1.101
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa

mysql:
  host: 192.168.1.200
  port: 3306
  user: monitor
  password: your_password
  database: ops

redis:
  host: 192.168.1.201
  port: 6379
  password: your_redis_password
  db: 0

log:
  level: INFO
  file: /var/log/ops/automation.log
  max_size: 100 # MB
  backup_count: 10

Parameter Description:

  • servers: Target server list, supports batch operations.
  • mysql/redis: Database connection information, for storing execution results and status.
  • log: Logging configuration; enable rotation so logs do not fill the disk.
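
For reference, the scripts below read this file with PyYAML. A minimal loading sketch (assuming config.yml sits in the working directory):

python

# Minimal sketch of how config.yml is consumed
import yaml

with open('config.yml') as f:
    config = yaml.safe_load(f)

for server in config['servers']:
    print(f"target: {server['user']}@{server['host']}:{server['port']}")

print(f"log file: {config['log']['file']} (max {config['log']['max_size']} MB)")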

◆ 2.2.3 Log Configuration

python

# logging_config.py
import logging
from logging.handlers import RotatingFileHandler

def setup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
    logger = logging.getLogger('ops_automation')
    logger.setLevel(level)

    # Avoid attaching duplicate handlers if setup_logger() is called more than once
    if logger.handlers:
        return logger

    # Rotating file handler
    handler = RotatingFileHandler(
        log_file,
        maxBytes=100 * 1024 * 1024,  # 100MB
        backupCount=10
    )

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger
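
A quick sanity check of the logger (assuming /var/log/ops exists and is writable by the current user):

python

# Example usage of setup_logger
from logging_config import setup_logger

logger = setup_logger()
logger.info("automation framework initialized")
logger.warning("disk usage above threshold on /data")
# Entries are appended to /var/log/ops/automation.log, rotated at 100 MB, keeping 10 backups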

2.3 Startup and Verification

◆ 2.3.1 Basic Tests

bash

# Verify that the paramiko SSH library is importable
python3 -c "import paramiko; print('paramiko OK')"

# Test config file reading
python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"

◆ 2.3.2 Functional Verification

bash

# Verify batch SSH execution (Script 1 example)
python3 batch_ssh_executor.py "uptime"

# Expected output
# [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
# [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18

III. Example Code and Configuration

3.1 Complete Configuration Examples

◆ 3.1.1 Script 1: Batch SSH Command Executor

python

#!/usr/bin/env python3
# File path: batch_ssh_executor.py
"""
Batch SSH Command Executor
Supports concurrent execution, result collection, exception handling
"""

import os
import sys
import paramiko
import yaml
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger

logger = setup_logger()

class SSHExecutor:
    def __init__(self, config_file='config.yml'):
        with open(config_file) as f:
            self.config = yaml.safe_load(f)
        self.servers = self.config['servers']

    def execute_on_host(self, server, command, timeout=30):
        """Execute command on a single host"""
        host = server['host']
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Use key authentication (expand ~ so paths like ~/.ssh/ops_rsa resolve)
            key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(server['key_file']))
            client.connect(
                hostname=host,
                port=server['port'],
                username=server['user'],
                pkey=key,
                timeout=10
            )

            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            exit_code = stdout.channel.recv_exit_status()

            result = {
                'host': host,
                'success': exit_code == 0,
                'stdout': stdout.read().decode('utf-8', errors='ignore').strip(),
                'stderr': stderr.read().decode('utf-8', errors='ignore').strip(),
                'exit_code': exit_code
            }

            client.close()
            logger.info(f"[{host}] Command executed, exit_code={exit_code}")
            return result

        except Exception as e:
            logger.error(f"[{host}] Error: {str(e)}")
            return {
                'host': host,
                'success': False,
                'stdout': '',
                'stderr': str(e),
                'exit_code': -1
            }

    def execute_parallel(self, command, max_workers=10):
        """Execute commands concurrently"""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute_on_host, server, command): server
                for server in self.servers
            }

            for future in as_completed(futures):
                results.append(future.result())

        return results

    def print_results(self, results):
        """Format output results"""
        success_count = sum(1 for r in results if r['success'])
        print(f"\nExecution complete: Successful {success_count}/{len(results)}\n")

        for result in sorted(results, key=lambda x: x['host']):
            status = "SUCCESS" if result['success'] else "FAILED"
            print(f"[{result['host']}] {status}")
            if result['stdout']:
                print(f"  Output: {result['stdout']}")
            if result['stderr']:
                print(f"  Error: {result['stderr']}")
            print()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python3 batch_ssh_executor.py '<command>'")
        sys.exit(1)

    command = sys.argv[1]
    executor = SSHExecutor()
    results = executor.execute_parallel(command)
    executor.print_results(results)
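
Beyond the command-line entry point, the class can also be used programmatically, for example to collect the hosts where a command failed. A short sketch, assuming the config.yml from section 2.2.2:

python

# Programmatic use of SSHExecutor (illustrative)
from batch_ssh_executor import SSHExecutor

executor = SSHExecutor('config.yml')
results = executor.execute_parallel("systemctl is-active nginx", max_workers=20)

failed_hosts = [r['host'] for r in results if not r['success']]
if failed_hosts:
    print("nginx is not active on:", ", ".join(failed_hosts))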

◆ 3.1.2 Script 2: Log Analysis and Alerting

python

#!/usr/bin/env python3
# File name: log_analyzer.py
"""
Log Analysis Tool
Features: Error statistics, anomaly detection, automatic alerts
"""

import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import requests
from logging_config import setup_logger

logger = setup_logger()

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = log_file
        self.error_patterns = {
            'http_5xx': r'HTTP/\d\.\d"\s5\d{2}',
            'exception': r'(Exception|Error|Fatal)',
            'timeout': r'(timeout|timed out)',
            'connection_refused': r'Connection refused',
            'out_of_memory': r'(OutOfMemory|OOM|Cannot allocate memory)'
        }

    def parse_nginx_log(self, line):
        """Parse Nginx log format"""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
        match = re.match(pattern, line)
        if match:
            return {
                'ip': match.group(1),
                'time': match.group(2),
                'request': match.group(3),
                'status': int(match.group(4)),
                'size': int(match.group(5)),
                'referer': match.group(6),
                'user_agent': match.group(7)
            }
        return None

    def analyze(self, time_window=60):
        """Analyze logs from the last N minutes"""
        now = datetime.now()
        cutoff_time = now - timedelta(minutes=time_window)

        stats = {
            'total_requests': 0,
            'error_count': defaultdict(int),
            'status_codes': Counter(),
            'top_ips': Counter(),
            'slow_requests': []
        }

        with open(self.log_file, 'r') as f:
            for line in f:
                entry = self.parse_nginx_log(line)
                if not entry:
                    continue

                # Time filtering
                log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
                if log_time.replace(tzinfo=None) < cutoff_time:
                    continue

                stats['total_requests'] += 1
                stats['status_codes'][entry['status']] += 1
                stats['top_ips'][entry['ip']] += 1

                # Error detection
                for error_type, pattern in self.error_patterns.items():
                    if re.search(pattern, line):
                        stats['error_count'][error_type] += 1

                # 5xx error recording
                if 500 <= entry['status'] < 600:
                    stats['slow_requests'].append({
                        'time': entry['time'],
                        'request': entry['request'],
                        'status': entry['status']
                    })

        return stats

    def check_alert_conditions(self, stats):
        """Check alert conditions"""
        alerts = []

        # 5xx error rate > 5%
        if stats['total_requests'] > 0:
            error_5xx = sum(count for code, count in stats['status_codes'].items()
                            if 500 <= code < 600)
            error_rate = error_5xx / stats['total_requests']
            if error_rate > 0.05:
                alerts.append({
                    'level': 'critical',
                    'message': f'5xx error rate: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
                })

        # OOM errors
        if stats['error_count']['out_of_memory'] > 0:
            alerts.append({
                'level': 'critical',
                'message': f'OOM errors detected: {stats["error_count"]["out_of_memory"]} times'
            })

        # Connection timeout
        if stats['error_count']['timeout'] > 100:
            alerts.append({
                'level': 'warning',
                'message': f'Timeout errors abnormal: {stats["error_count"]["timeout"]} times'
            })

        return alerts

    def send_alert(self, alerts, webhook_url):
        """Send alerts to WeCom/DingTalk"""
        if not alerts:
            return

        message = "【Log Alert】\n" + "\n".join(
            f"[{a['level'].upper()}] {a['message']}" for a in alerts
        )

        payload = {
            "msgtype": "text",
            "text": {"content": message}
        }

        try:
            response = requests.post(webhook_url, json=payload, timeout=5)
            if response.status_code == 200:
                logger.info("Alert sent successfully")
            else:
                logger.error(f"Failed to send alert: {response.status_code}")
        except Exception as e:
            logger.error(f"Alert sending exception: {str(e)}")

if __name__ == '__main__':
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)

    print(f"Total requests: {stats['total_requests']}")
    print(f"Status code distribution: {dict(stats['status_codes'])}")
    print(f"Top 10 IPs: {stats['top_ips'].most_common(10)}")
    print(f"Error statistics: {dict(stats['error_count'])}")

    alerts = analyzer.check_alert_conditions(stats)
    if alerts:
        print("\nAlerts triggered:")
        for alert in alerts:
            print(f"  [{alert['level']}] {alert['message']}")

    # Send alert (replace with actual webhook URL)
    # analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')
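
To run the analysis periodically without cron, the schedule library installed in section 2.1.2 can drive it from a long-running process. A sketch; the 5-minute interval and the placeholder webhook key are assumptions:

python

# Periodic log analysis with the schedule library (illustrative)
import time
import schedule
from log_analyzer import LogAnalyzer

def analyze_and_alert():
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)
    alerts = analyzer.check_alert_conditions(stats)
    if alerts:
        analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')

schedule.every(5).minutes.do(analyze_and_alert)

while True:
    schedule.run_pending()
    time.sleep(10)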

◆ 3.1.3 Script 3: System Resource Monitoring

python

#!/usr/bin/env python3
# File name: system_monitor.py
"""
System Resource Monitor
Monitors CPU, memory, disk, network, supports Prometheus integration
"""

import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger

logger = setup_logger()

class SystemMonitor:
    def __init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
        self.pushgateway_url = pushgateway_url
        self.job_name = job_name
        self.registry = CollectorRegistry()

        # Define metrics
        self.cpu_gauge = Gauge('system_cpu_percent', 'CPU usage', registry=self.registry)
        self.memory_gauge = Gauge('system_memory_percent', 'Memory usage', registry=self.registry)
        self.disk_gauge = Gauge('system_disk_percent', 'Disk usage',
                                ['mountpoint'], registry=self.registry)
        self.network_gauge = Gauge('system_network_bytes', 'Network traffic',
                                   ['interface', 'direction'], registry=self.registry)

    def collect_metrics(self):
        """Collect system metrics"""
        metrics = {}

        # CPU
        cpu_percent = psutil.cpu_percent(interval=1)
        metrics['cpu'] = cpu_percent
        self.cpu_gauge.set(cpu_percent)

        # Memory
        memory = psutil.virtual_memory()
        metrics['memory'] = {
            'percent': memory.percent,
            'total': memory.total,
            'available': memory.available,
            'used': memory.used
        }
        self.memory_gauge.set(memory.percent)

        # Disk
        metrics['disk'] = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                metrics['disk'][partition.mountpoint] = {
                    'percent': usage.percent,
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free
                }
                self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
            except PermissionError:
                continue

        # Network
        net_io = psutil.net_io_counters(pernic=True)
        metrics['network'] = {}
        for interface, stats in net_io.items():
            metrics['network'][interface] = {
                'bytes_sent': stats.bytes_sent,
                'bytes_recv': stats.bytes_recv
            }
            self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
            self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)

        return metrics

    def check_thresholds(self, metrics):
        """Check threshold alerts"""
        alerts = []

        if metrics['cpu'] > 80:
            alerts.append(f"CPU usage too high: {metrics['cpu']:.1f}%")

        if metrics['memory']['percent'] > 85:
            alerts.append(f"Memory usage too high: {metrics['memory']['percent']:.1f}%")

        for mount, stats in metrics['disk'].items():
            if stats['percent'] > 90:
                alerts.append(f"Disk space low: {mount} ({stats['percent']:.1f}%)")

        return alerts

    def push_metrics(self):
        """Push metrics to Pushgateway"""
        try:
            push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
            logger.info("Metrics pushed successfully")
        except Exception as e:
            logger.error(f"Failed to push metrics: {str(e)}")

    def run(self, interval=60):
        """Continuous monitoring"""
        logger.info(f"Starting monitor, collection interval: {interval} seconds")
        while True:
            try:
                metrics = self.collect_metrics()
                alerts = self.check_thresholds(metrics)

                if alerts:
                    logger.warning("Alerts triggered: " + "; ".join(alerts))

                self.push_metrics()
                time.sleep(interval)

            except KeyboardInterrupt:
                logger.info("Monitoring stopped")
                break
            except Exception as e:
                logger.error(f"Monitoring exception: {str(e)}")
                time.sleep(interval)

if __name__ == '__main__':
    monitor = SystemMonitor()

    # Single collection
    metrics = monitor.collect_metrics()
    print(f"CPU: {metrics['cpu']:.1f}%")
    print(f"Memory: {metrics['memory']['percent']:.1f}%")
    print("Disk:")
    for mount, stats in metrics['disk'].items():
        print(f"  {mount}: {stats['percent']:.1f}%")

    # Continuous monitoring (uncomment to enable)
    # monitor.run(interval=60)
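
If a Pushgateway is not available, prometheus_client can expose the same registry over HTTP for Prometheus to scrape instead (pull model). A minimal sketch; the choice of port 8000 is an assumption:

python

# Pull-model alternative: expose metrics over HTTP instead of pushing (illustrative)
import time
from prometheus_client import start_http_server
from system_monitor import SystemMonitor

monitor = SystemMonitor()
start_http_server(8000, registry=monitor.registry)  # scrape http://<host>:8000/metrics

while True:
    monitor.collect_metrics()  # refresh gauge values each cycle
    time.sleep(60)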

◆ 3.1.4 Script 4: MySQL Slow Query Analysis

python

#!/usr/bin/env python3
# File name: mysql_slow_query_analyzer.py
"""
MySQL Slow Query Analyzer
Parses slow query log, generates optimization suggestions
"""

import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger

logger = setup_logger()

class SlowQueryAnalyzer:
    def __init__(self, slow_log_file, db_config):
        self.slow_log_file = slow_log_file
        self.db_config = db_config
        self.queries = []

    def parse_slow_log(self):
        """Parse slow query log"""
        current_query = {}

        with open(self.slow_log_file, 'r') as f:
            for line in f:
                # Time line
                if line.startswith('# Time:'):
                    if current_query:
                        self.queries.append(current_query)
                    current_query = {'time': line.split(':', 1)[1].strip()}

                # User@Host line
                elif line.startswith('# User@Host:'):
                    match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
                    if match:
                        current_query['user'] = match.group(1)
                        current_query['host'] = match.group(3)

                # Query_time line
                elif line.startswith('# Query_time:'):
                    match = re.search(
                        r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
                        line
                    )
                    if match:
                        current_query['query_time'] = float(match.group(1))
                        current_query['lock_time'] = float(match.group(2))
                        current_query['rows_sent'] = int(match.group(3))
                        current_query['rows_examined'] = int(match.group(4))

                # SQL statement
                elif not line.startswith('#') and line.strip():
                    current_query['sql'] = current_query.get('sql', '') + line.strip() + ' '

        if current_query:
            self.queries.append(current_query)

        logger.info(f"Parsing complete, total {len(self.queries)} slow queries")

    def analyze(self):
        """Analyze slow queries"""
        stats = {
            'total': len(self.queries),
            'avg_query_time': 0,
            'max_query_time': 0,
            'top_queries': [],
            'table_scan': []
        }

        if not self.queries:
            return stats

        # Basic statistics
        total_time = sum(q['query_time'] for q in self.queries)
        stats['avg_query_time'] = total_time / len(self.queries)
        stats['max_query_time'] = max(q['query_time'] for q in self.queries)

        # Top 10 time-consuming queries
        sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
        stats['top_queries'] = sorted_queries[:10]

        # Full table scan detection (rows_examined > 10000)
        stats['table_scan'] = [
            q for q in self.queries
            if q.get('rows_examined', 0) > 10000
        ]

        return stats

    def get_explain_plan(self, sql):
        """Get EXPLAIN execution plan"""
        try:
            conn = pymysql.connect(**self.db_config)
            cursor = conn.cursor()

            cursor.execute(f"EXPLAIN {sql}")
            result = cursor.fetchall()

            cursor.close()
            conn.close()

            return result
        except Exception as e:
            logger.error(f"EXPLAIN failed: {str(e)}")
            return None

    def generate_report(self, stats):
        """Generate analysis report"""
        report = []
        report.append("=" * 80)
        report.append("MySQL Slow Query Analysis Report")
        report.append("=" * 80)
        report.append(f"Total slow queries: {stats['total']}")
        report.append(f"Average query time: {stats['avg_query_time']:.2f} seconds")
        report.append(f"Max query time: {stats['max_query_time']:.2f} seconds")
        report.append("")

        report.append("Top 10 time-consuming queries:")
        for i, query in enumerate(stats['top_queries'], 1):
            report.append(f"\n{i}. Query time: {query['query_time']:.2f} seconds")
            report.append(f"   Rows examined: {query.get('rows_examined', 0)}")
            report.append(f"   SQL: {query.get('sql', '')[:200]}")

        if stats['table_scan']:
            report.append(f"\nFound {len(stats['table_scan'])} full table scan queries")
            for query in stats['table_scan'][:5]:
                report.append(f"  - {query.get('sql', '')[:100]}")

        return "\n".join(report)

if __name__ == '__main__':
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'your_password',
        'database': 'test'
    }

    analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
    analyzer.parse_slow_log()
    stats = analyzer.analyze()

    print(analyzer.generate_report(stats))
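
The get_explain_plan helper can then be pointed at the worst offenders. Note that EXPLAIN may fail for statements that depend on a different default database or session state, so treat this as a best-effort sketch:

python

# Run EXPLAIN on the top slow queries (illustrative)
from mysql_slow_query_analyzer import SlowQueryAnalyzer

db_config = {'host': 'localhost', 'user': 'root', 'password': 'your_password', 'database': 'test'}
analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
analyzer.parse_slow_log()
stats = analyzer.analyze()

for query in stats['top_queries'][:3]:
    sql = query.get('sql', '').strip()
    if not sql.lower().startswith('select'):
        continue  # only EXPLAIN plain SELECT statements here
    print(f"\nSQL: {sql[:120]}")
    plan = analyzer.get_explain_plan(sql)
    if plan:
        for row in plan:
            print(f"  {row}")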

◆ 3.1.5 Script 5: File Synchronization Tool

python

#!/usr/bin/env python3
# File name: file_sync.py
"""
File Synchronization Tool
Supports incremental sync, resumable transfer, verification
"""

import os
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger

logger = setup_logger()

class FileSync:
    def __init__(self, ssh_config):
        self.ssh_config = ssh_config
        self.client = None
        self.sftp = None

    def connect(self):
        """Establish SSH connection"""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Use key authentication (expand ~ so paths like ~/.ssh/ops_rsa resolve)
            key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(self.ssh_config['key_file']))
            self.client.connect(
                hostname=self.ssh_config['host'],
                port=self.ssh_config['port'],
                username=self.ssh_config['user'],
                pkey=key
            )

            self.sftp = self.client.open_sftp()
            logger.info(f"Connection successful: {self.ssh_config['host']}")

        except Exception as e:
            logger.error(f"Connection failed: {str(e)}")
            raise

    def disconnect(self):
        """Close connection"""
        if self.sftp:
            self.sftp.close()
        if self.client:
            self.client.close()

    def calculate_md5(self, file_path):
        """Calculate file MD5"""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def remote_file_exists(self, remote_path):
        """Check if remote file exists"""
        try:
            self.sftp.stat(remote_path)
            return True
        except FileNotFoundError:
            return False

    def sync_file(self, local_path, remote_path, check_md5=True):
        """Sync a single file"""
        try:
            # Ensure remote directory exists
            remote_dir = os.path.dirname(remote_path)
            try:
                self.sftp.stat(remote_dir)
            except FileNotFoundError:
                self._create_remote_dir(remote_dir)

            # MD5 verification
            need_upload = True
            if check_md5 and self.remote_file_exists(remote_path):
                local_md5 = self.calculate_md5(local_path)
                # Remote MD5 calculation (requires command execution)
                stdin, stdout, stderr = self.client.exec_command(f"md5sum {remote_path}")
                remote_md5 = stdout.read().decode().split()[0]

                if local_md5 == remote_md5:
                    logger.info(f"File unchanged, skipping: {local_path}")
                    need_upload = False

            if need_upload:
                self.sftp.put(local_path, remote_path)
                logger.info(f"Upload successful: {local_path} -> {remote_path}")
                return True

            return False

        except Exception as e:
            logger.error(f"Sync failed {local_path}: {str(e)}")
            return False

    def _create_remote_dir(self, remote_dir):
        """Recursively create remote directory"""
        dirs = []
        while remote_dir != '/':
            dirs.append(remote_dir)
            remote_dir = os.path.dirname(remote_dir)

        for dir_path in reversed(dirs):
            try:
                self.sftp.stat(dir_path)
            except FileNotFoundError:
                self.sftp.mkdir(dir_path)
                logger.info(f"Created directory: {dir_path}")

    def sync_directory(self, local_dir, remote_dir, exclude_patterns=None):
        """Sync entire directory"""
        exclude_patterns = exclude_patterns or []
        synced_count = 0
        skipped_count = 0

        for root, dirs, files in os.walk(local_dir):
            # Calculate relative path
            rel_path = os.path.relpath(root, local_dir)
            remote_root = os.path.join(remote_dir, rel_path).replace('\\', '/')

            for file in files:
                # Exclusion rules
                if any(pattern in file for pattern in exclude_patterns):
                    continue

                local_file = os.path.join(root, file)
                remote_file = os.path.join(remote_root, file).replace('\\', '/')

                if self.sync_file(local_file, remote_file):
                    synced_count += 1
                else:
                    skipped_count += 1

        logger.info(f"Sync complete: Uploaded {synced_count} files, skipped {skipped_count}")

if __name__ == '__main__':
    ssh_config = {
        'host': '192.168.1.100',
        'port': 22,
        'user': 'root',
        'key_file': '~/.ssh/ops_rsa'
    }

    sync = FileSync(ssh_config)
    sync.connect()

    # Sync single file
    # sync.sync_file('/local/config.yml', '/remote/config.yml')

    # Sync directory
    sync.sync_directory(
        '/local/app',
        '/remote/app',
        exclude_patterns=['.git', '.pyc', '__pycache__']
    )

    sync.disconnect()
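
To push the same directory to every host in config.yml, the class can simply be looped over the server list. A sketch reusing the configuration from section 2.2.2:

python

# Sync one directory to all servers defined in config.yml (illustrative)
import yaml
from file_sync import FileSync

with open('config.yml') as f:
    servers = yaml.safe_load(f)['servers']

for server in servers:
    sync = FileSync(server)
    sync.connect()
    try:
        sync.sync_directory('/local/app', '/remote/app',
                            exclude_patterns=['.git', '.pyc', '__pycache__'])
    finally:
        sync.disconnect()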

3.2 Practical Application Cases

◆ Case 1: Automated Certificate Renewal Check

Scenario: Managing SSL certificates for 100+ domains; certificates expiring within 30 days must be detected automatically and alerted on.

Implementation Code:

python

#!/usr/bin/env python3
# File name: ssl_cert_checker.py

import ssl
import socket
from datetime import datetime, timedelta
import requests

class SSLCertChecker:
    def __init__(self, domains, alert_days=30):
        self.domains = domains
        self.alert_days = alert_days

    def check_cert_expiry(self, domain, port=443):
        """Check certificate expiration time"""
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()

            # Parse expiration time
            expire_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
            days_left = (expire_date - datetime.now()).days

            return {
                'domain': domain,
                'expire_date': expire_date,
                'days_left': days_left,
                'issuer': dict(x[0] for x in cert['issuer'])
            }

        except Exception as e:
            return {
                'domain': domain,
                'error': str(e)
            }

    def check_all(self):
        """Check all domains"""
        results = []
        alerts = []

        for domain in self.domains:
            result = self.check_cert_expiry(domain)
            results.append(result)

            if 'days_left' in result and result['days_left'] < self.alert_days:
                alerts.append(f"{domain} certificate will expire in {result['days_left']} days")

        return results, alerts

# Usage example
domains = ['example.com', 'api.example.com', 'www.example.com']
checker = SSLCertChecker(domains)
results, alerts = checker.check_all()

for result in results:
    if 'days_left' in result:
        print(f"{result['domain']}: {result['days_left']} days remaining")
    else:
        print(f"{result['domain']}: Check failed - {result['error']}")

if alerts:
    print("\nAlerts:")
    for alert in alerts:
        print(f"  - {alert}")

Execution Result:

text

example.com: 85 days remaining
api.example.com: 12 days remaining
www.example.com: 45 days remaining

Alerts:
  - api.example.com certificate will expire in 12 days
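
The alerts list can be forwarded to the same WeCom/DingTalk webhook format used in Script 2. A minimal sketch; send_cert_alerts is an illustrative helper, and the key=xxx URL is the same placeholder as before:

python

# Forward certificate alerts to a WeCom/DingTalk webhook (illustrative)
import requests

def send_cert_alerts(alerts, webhook_url):
    """Push the alerts produced by SSLCertChecker.check_all() to a chat webhook."""
    if not alerts:
        return
    message = "【SSL Certificate Alert】\n" + "\n".join(f"- {a}" for a in alerts)
    requests.post(webhook_url,
                  json={"msgtype": "text", "text": {"content": message}},
                  timeout=5)

# send_cert_alerts(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')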

◆ Case 2: Docker Container Resource Cleanup

Scenario: Regularly remove containers that have been stopped for more than 7 days, along with unused images and volumes, to free up disk space.

Implementation Code:

python

#!/usr/bin/env python3
# File name: docker_cleanup.py

import subprocess
import json
from datetime import datetime, timedelta

class DockerCleaner:
    def __init__(self, dry_run=True):
        self.dry_run = dry_run

    def get_stopped_containers(self, days=7):
        """Get containers stopped for more than N days"""
        cutoff_time = datetime.now() - timedelta(days=days)

        cmd = "docker ps -a --format '{{json .}}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        stopped_containers = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue

            container = json.loads(line)
            if container['State'] != 'exited':
                continue

            # Get container details
            inspect_cmd = f"docker inspect {container['ID']}"
            inspect_result = subprocess.run(inspect_cmd, shell=True, capture_output=True, text=True)
            detail = json.loads(inspect_result.stdout)[0]

            finished_at = datetime.fromisoformat(detail['State']['FinishedAt'].split('.')[0])
            if finished_at < cutoff_time:
                stopped_containers.append({
                    'id': container['ID'],
                    'name': container['Names'],
                    'finished_at': finished_at
                })

        return stopped_containers

    def remove_containers(self, containers):
        """Remove containers"""
        for container in containers:
            cmd = f"docker rm {container['id']}"
            if self.dry_run:
                print(f"[DRY RUN] {cmd}")
            else:
                subprocess.run(cmd, shell=True)
                print(f"Removed container: {container['name']}")

    def prune_images(self):
        """Clean up unused images"""
        cmd = "docker image prune -a -f"
        if self.dry_run:
            print(f"[DRY RUN] {cmd}")
        else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            print(result.stdout)

    def prune_volumes(self):
        """Clean up unused volumes"""
        cmd = "docker volume prune -f"
        if self.dry_run:
            print(f"[DRY RUN] {cmd}")
        else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            print(result.stdout)

    def cleanup(self, container_days=7):
        """Execute cleanup"""
        print(f"Starting cleanup (DRY RUN: {self.dry_run})")

        # Clean up containers
        containers = self.get_stopped_containers(container_days)
        print(f"\nFound {len(containers)} containers stopped for more than {container_days} days")
        self.remove_containers(containers)

        # Clean up images
        print("\nCleaning up unused images...")
        self.prune_images()

        # Clean up volumes
        print("\nCleaning up unused volumes...")
        self.prune_volumes()
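
A minimal invocation sketch consistent with the class above: run once in dry-run mode to review what would be removed, then re-run with dry_run=False to apply.

python

# Example invocation (illustrative)
if __name__ == '__main__':
    # Preview what would be removed
    DockerCleaner(dry_run=True).cleanup(container_days=7)

    # After reviewing the output, perform the actual cleanup:
    # DockerCleaner(dry_run=False).cleanup(container_days=7)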