I. Overview
1.1 Background Introduction
In operations and maintenance work, repetitive tasks such as log analysis, batch operations, monitoring and alerting, and resource cleanup take up 60%-80% of engineers' time. While these tasks are simple, executing them manually is inefficient and error-prone. Python, with its concise syntax, rich standard library, and extensive third-party modules, has become the language of choice for operations automation. This article shares 10 Python scripts that have been validated in production environments, helping operations engineers break free from repetitive labor.
1.2 Technical Characteristics
- High Development Efficiency: Python's concise syntax keeps the development cycle to roughly a third of that of equivalent Shell scripts, making it well suited to quickly implementing operational requirements.
- Mature Ecosystem: Libraries such as paramiko, requests, and psutil cover most Ops needs, so there is no need to reinvent the wheel.
- Cross-Platform Compatibility: The same script can run on Linux, Windows, and macOS, reducing maintenance costs.
- Easy Maintenance and Extension: Code is highly readable, facilitating team collaboration and feature iteration.
1.3 Applicable Scenarios
- Scenario 1: Operations teams managing 100+ servers, requiring batch configuration distribution, command execution, and file synchronization.
- Scenario 2: Business systems processing GB-level logs daily, requiring automated anomaly analysis, traffic statistics, and report generation.
- Scenario 3: Multi-cloud environment resource management, including automated cleanup and cost optimization for VMs, containers, and storage.
- Scenario 4: 24/7 monitoring scenarios requiring automated health checks, alert handling, and fault self-healing.
1.4 Environmental Requirements
| Component | Version Requirements | Description |
|---|---|---|
| Operating System | CentOS 7+ / Ubuntu 18.04+ | LTS versions recommended |
| Python | 3.8+ | 3.10+ recommended, supporting latest syntax features |
| pip | 20.0+ | For installing dependencies |
| Hardware Config | 2C4G+ | Adjust based on actual load |
II. Detailed Steps
2.1 Preparations
◆ 2.1.1 System Checks
bash
# Check Python version
python3 --version

# Check pip version
pip3 --version

# Check system resources
free -h
df -h
◆ 2.1.2 Installing Dependencies
bash
# Upgrade pip
pip3 install --upgrade pip

# Install common Ops libraries (pyyaml is needed for reading config.yml)
pip3 install paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client pyyaml

# Verify installation
pip3 list | grep -E "paramiko|requests|psutil"
2.2 Core Configuration
◆ 2.2.1 Configuring SSH Key Authentication
bash
# Generate SSH key pair
ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""

# Distribute public key to target server (example)
ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100
Note: Using key authentication instead of password login enhances security and supports batch operations. It’s recommended to create a dedicated key pair for Ops scripts for easier permission management and auditing.
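Before wiring the key into the scripts below, it can be worth confirming that paramiko can actually load it. A minimal sketch, assuming paramiko is already installed and the key pair was generated as above:
python
# Quick sanity check: confirm the dedicated Ops key loads in paramiko.
# Assumes the key pair generated above at ~/.ssh/ops_rsa.
import os
import paramiko

key_path = os.path.expanduser('~/.ssh/ops_rsa')
try:
    key = paramiko.RSAKey.from_private_key_file(key_path)
    print(f"Key OK: {key_path} ({key.get_bits()} bits)")
except (FileNotFoundError, paramiko.SSHException) as e:
    print(f"Key check failed: {e}")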
◆ 2.2.2 Configuration File Example
yaml
# Configuration file: config.yml
servers:
- host: 192.168.1.100
port: 22
user: root
key_file: ~/.ssh/ops_rsa
- host: 192.168.1.101
port: 22
user: root
key_file: ~/.ssh/ops_rsa
mysql:
host: 192.168.1.200
port: 3306
user: monitor
password: your_password
database: ops
redis:
host: 192.168.1.201
port: 6379
password: your_redis_password
db: 0
log:
level: INFO
file: /var/log/ops/automation.log
max_size: 100 # MB
backup_count: 10
Parameter Description:
- servers: Target server list, supports batch operations.
- mysql/redis: Database connection information, for storing execution results and status.
- log: Log configuration, rotation recommended to avoid disk filling.
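The scripts below read this file with PyYAML. A minimal loader sketch (the load_config helper is illustrative, not part of the scripts that follow); note that ~ in key_file is expanded here because paramiko does not expand it by itself:
python
# Illustrative helper: load config.yml and expand ~ in key_file paths.
import os
import yaml

def load_config(path='config.yml'):
    with open(path) as f:
        config = yaml.safe_load(f)
    for server in config.get('servers', []):
        server['key_file'] = os.path.expanduser(server['key_file'])
    return config

if __name__ == '__main__':
    cfg = load_config()
    print(f"{len(cfg['servers'])} servers, log level {cfg['log']['level']}")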
◆ 2.2.3 Log Configuration
python
# logging_config.py
import logging
from logging.handlers import RotatingFileHandler
def setup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
logger = logging.getLogger('ops_automation')
logger.setLevel(level)
# Rotating file handler
handler = RotatingFileHandler(
log_file,
maxBytes=100 * 1024 * 1024, # 100MB
backupCount=10
)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
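One practical caveat: RotatingFileHandler does not create the log directory, so /var/log/ops must exist and be writable before the first call. A minimal usage sketch:
python
# Usage sketch: make sure the log directory exists, then log as usual.
import os
from logging_config import setup_logger

os.makedirs('/var/log/ops', exist_ok=True)  # requires write permission
logger = setup_logger()
logger.info("ops automation logger initialized")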
2.3 Startup and Verification
◆ 2.3.1 Basic Tests
bash
# Verify that paramiko (used for SSH connections) imports correctly
python3 -c "import paramiko; print('paramiko OK')"
# Test config file reading
python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"
◆ 2.3.2 Functional Verification
bash
# Verify batch SSH execution (Script 1 example)
python3 batch_ssh_executor.py "uptime"

# Expected output
# [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
# [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18
III. Example Code and Configuration
3.1 Complete Configuration Examples
◆ 3.1.1 Script 1: Batch SSH Command Executor
python
#!/usr/bin/env python3
# File path: batch_ssh_executor.py
"""
Batch SSH Command Executor
Supports concurrent execution, result collection, exception handling
"""
import os
import paramiko
import yaml
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger
logger = setup_logger()
class SSHExecutor:
def __init__(self, config_file='config.yml'):
with open(config_file) as f:
self.config = yaml.safe_load(f)
self.servers = self.config['servers']
def execute_on_host(self, server, command, timeout=30):
"""Execute command on a single host"""
host = server['host']
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Use key authentication
            key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(server['key_file']))  # expand ~ in key paths
client.connect(
hostname=host,
port=server['port'],
username=server['user'],
pkey=key,
timeout=10
)
stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
result = {
'host': host,
'success': exit_code == 0,
'stdout': stdout.read().decode('utf-8', errors='ignore').strip(),
'stderr': stderr.read().decode('utf-8', errors='ignore').strip(),
'exit_code': exit_code
}
client.close()
logger.info(f"[{host}] Command executed, exit_code={exit_code}")
return result
except Exception as e:
logger.error(f"[{host}] Error: {str(e)}")
return {
'host': host,
'success': False,
'stdout': '',
'stderr': str(e),
'exit_code': -1
}
def execute_parallel(self, command, max_workers=10):
"""Execute commands concurrently"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self.execute_on_host, server, command): server
for server in self.servers
}
for future in as_completed(futures):
results.append(future.result())
return results
def print_results(self, results):
"""Format output results"""
success_count = sum(1 for r in results if r['success'])
print(f"\nExecution complete: Successful {success_count}/{len(results)}\n")
for result in sorted(results, key=lambda x: x['host']):
status = "SUCCESS" if result['success'] else "FAILED"
print(f"[{result['host']}] {status}")
if result['stdout']:
print(f" Output: {result['stdout']}")
if result['stderr']:
print(f" Error: {result['stderr']}")
print()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 batch_ssh_executor.py '<command>'")
sys.exit(1)
command = sys.argv[1]
executor = SSHExecutor()
results = executor.execute_parallel(command)
executor.print_results(results)
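The mysql section in config.yml is reserved for storing execution results, but Script 1 itself does not persist anything. A sketch of how that could be added with pymysql; the ssh_results table and its columns are assumptions, not something defined elsewhere in this article:
python
# Illustrative only: persist batch execution results to the mysql section of config.yml.
# The ssh_results table (host, command, exit_code, stdout) is an assumed schema.
import pymysql

def save_results(mysql_cfg, command, results):
    conn = pymysql.connect(
        host=mysql_cfg['host'], port=mysql_cfg['port'],
        user=mysql_cfg['user'], password=mysql_cfg['password'],
        database=mysql_cfg['database']
    )
    try:
        with conn.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO ssh_results (host, command, exit_code, stdout) "
                "VALUES (%s, %s, %s, %s)",
                [(r['host'], command, r['exit_code'], r['stdout']) for r in results]
            )
        conn.commit()
    finally:
        conn.close()

# Example: save_results(executor.config['mysql'], command, results)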
◆ 3.1.2 Script 2: Log Analysis and Alerting
python
#!/usr/bin/env python3
# File name: log_analyzer.py
"""
Log Analysis Tool
Features: Error statistics, anomaly detection, automatic alerts
"""
import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import requests
from logging_config import setup_logger
logger = setup_logger()
class LogAnalyzer:
def __init__(self, log_file):
self.log_file = log_file
self.error_patterns = {
'http_5xx': r'HTTP/\d\.\d"\s5\d{2}',
'exception': r'(Exception|Error|Fatal)',
'timeout': r'(timeout|timed out)',
'connection_refused': r'Connection refused',
'out_of_memory': r'(OutOfMemory|OOM|Cannot allocate memory)'
}
def parse_nginx_log(self, line):
"""Parse Nginx log format"""
pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
match = re.match(pattern, line)
if match:
return {
'ip': match.group(1),
'time': match.group(2),
'request': match.group(3),
'status': int(match.group(4)),
'size': int(match.group(5)),
'referer': match.group(6),
'user_agent': match.group(7)
}
return None
def analyze(self, time_window=60):
"""Analyze logs from the last N minutes"""
now = datetime.now()
cutoff_time = now - timedelta(minutes=time_window)
stats = {
'total_requests': 0,
'error_count': defaultdict(int),
'status_codes': Counter(),
'top_ips': Counter(),
'slow_requests': []
}
with open(self.log_file, 'r') as f:
for line in f:
entry = self.parse_nginx_log(line)
if not entry:
continue
# Time filtering
log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
if log_time.replace(tzinfo=None) < cutoff_time:
continue
stats['total_requests'] += 1
stats['status_codes'][entry['status']] += 1
stats['top_ips'][entry['ip']] += 1
# Error detection
for error_type, pattern in self.error_patterns.items():
if re.search(pattern, line):
stats['error_count'][error_type] += 1
# 5xx error recording
if 500 <= entry['status'] < 600:
stats['slow_requests'].append({
'time': entry['time'],
'request': entry['request'],
'status': entry['status']
})
return stats
def check_alert_conditions(self, stats):
"""Check alert conditions"""
alerts = []
# 5xx error rate > 5%
if stats['total_requests'] > 0:
error_5xx = sum(count for code, count in stats['status_codes'].items()
if 500 <= code < 600)
error_rate = error_5xx / stats['total_requests']
if error_rate > 0.05:
alerts.append({
'level': 'critical',
'message': f'5xx error rate: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
})
# OOM errors
if stats['error_count']['out_of_memory'] > 0:
alerts.append({
'level': 'critical',
'message': f'OOM errors detected: {stats["error_count"]["out_of_memory"]} times'
})
# Connection timeout
if stats['error_count']['timeout'] > 100:
alerts.append({
'level': 'warning',
'message': f'Timeout errors abnormal: {stats["error_count"]["timeout"]} times'
})
return alerts
def send_alert(self, alerts, webhook_url):
"""Send alerts to WeCom/DingTalk"""
if not alerts:
return
message = "【Log Alert】\n" + "\n".join(
f"[{a['level'].upper()}] {a['message']}" for a in alerts
)
payload = {
"msgtype": "text",
"text": {"content": message}
}
try:
response = requests.post(webhook_url, json=payload, timeout=5)
if response.status_code == 200:
logger.info("Alert sent successfully")
else:
logger.error(f"Failed to send alert: {response.status_code}")
except Exception as e:
logger.error(f"Alert sending exception: {str(e)}")
if __name__ == '__main__':
analyzer = LogAnalyzer('/var/log/nginx/access.log')
stats = analyzer.analyze(time_window=5)
print(f"Total requests: {stats['total_requests']}")
print(f"Status code distribution: {dict(stats['status_codes'])}")
print(f"Top 10 IPs: {stats['top_ips'].most_common(10)}")
print(f"Error statistics: {dict(stats['error_count'])}")
alerts = analyzer.check_alert_conditions(stats)
if alerts:
print("\nAlerts triggered:")
for alert in alerts:
print(f" [{alert['level']}] {alert['message']}")
# Send alert (replace with actual webhook URL)
# analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')
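To run the analysis continuously instead of on demand, the schedule library installed in 2.1.2 can drive it. A sketch, assuming the webhook URL is provided via an OPS_WEBHOOK_URL environment variable (an assumption, not something the script defines):
python
# Illustrative scheduler for log_analyzer.py: analyze the last 5 minutes, every 5 minutes.
# OPS_WEBHOOK_URL is an assumed environment variable holding the WeCom/DingTalk webhook.
import os
import time
import schedule
from log_analyzer import LogAnalyzer

WEBHOOK_URL = os.environ.get('OPS_WEBHOOK_URL')

def run_analysis():
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)
    alerts = analyzer.check_alert_conditions(stats)
    if alerts and WEBHOOK_URL:
        analyzer.send_alert(alerts, WEBHOOK_URL)

schedule.every(5).minutes.do(run_analysis)
while True:
    schedule.run_pending()
    time.sleep(10)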
◆ 3.1.3 Script 3: System Resource Monitoring
python
#!/usr/bin/env python3
# File name: system_monitor.py
"""
System Resource Monitor
Monitors CPU, memory, disk, network, supports Prometheus integration
"""
import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger
logger = setup_logger()
class SystemMonitor:
def __init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
self.pushgateway_url = pushgateway_url
self.job_name = job_name
self.registry = CollectorRegistry()
# Define metrics
self.cpu_gauge = Gauge('system_cpu_percent', 'CPU usage', registry=self.registry)
self.memory_gauge = Gauge('system_memory_percent', 'Memory usage', registry=self.registry)
self.disk_gauge = Gauge('system_disk_percent', 'Disk usage',
['mountpoint'], registry=self.registry)
self.network_gauge = Gauge('system_network_bytes', 'Network traffic',
['interface', 'direction'], registry=self.registry)
def collect_metrics(self):
"""Collect system metrics"""
metrics = {}
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
metrics['cpu'] = cpu_percent
self.cpu_gauge.set(cpu_percent)
# Memory
memory = psutil.virtual_memory()
metrics['memory'] = {
'percent': memory.percent,
'total': memory.total,
'available': memory.available,
'used': memory.used
}
self.memory_gauge.set(memory.percent)
# Disk
metrics['disk'] = {}
for partition in psutil.disk_partitions():
try:
usage = psutil.disk_usage(partition.mountpoint)
metrics['disk'][partition.mountpoint] = {
'percent': usage.percent,
'total': usage.total,
'used': usage.used,
'free': usage.free
}
self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
except PermissionError:
continue
# Network
net_io = psutil.net_io_counters(pernic=True)
metrics['network'] = {}
for interface, stats in net_io.items():
metrics['network'][interface] = {
'bytes_sent': stats.bytes_sent,
'bytes_recv': stats.bytes_recv
}
self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)
return metrics
def check_thresholds(self, metrics):
"""Check threshold alerts"""
alerts = []
if metrics['cpu'] > 80:
alerts.append(f"CPU usage too high: {metrics['cpu']:.1f}%")
if metrics['memory']['percent'] > 85:
alerts.append(f"Memory usage too high: {metrics['memory']['percent']:.1f}%")
for mount, stats in metrics['disk'].items():
if stats['percent'] > 90:
alerts.append(f"Disk space low: {mount} ({stats['percent']:.1f}%)")
return alerts
def push_metrics(self):
"""Push metrics to Pushgateway"""
try:
push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
logger.info("Metrics pushed successfully")
except Exception as e:
logger.error(f"Failed to push metrics: {str(e)}")
def run(self, interval=60):
"""Continuous monitoring"""
logger.info(f"Starting monitor, collection interval: {interval} seconds")
while True:
try:
metrics = self.collect_metrics()
alerts = self.check_thresholds(metrics)
if alerts:
logger.warning("Alerts triggered: " + "; ".join(alerts))
self.push_metrics()
time.sleep(interval)
except KeyboardInterrupt:
logger.info("Monitoring stopped")
break
except Exception as e:
logger.error(f"Monitoring exception: {str(e)}")
time.sleep(interval)
if __name__ == '__main__':
monitor = SystemMonitor()
# Single collection
metrics = monitor.collect_metrics()
print(f"CPU: {metrics['cpu']:.1f}%")
print(f"Memory: {metrics['memory']['percent']:.1f}%")
print("Disk:")
for mount, stats in metrics['disk'].items():
print(f" {mount}: {stats['percent']:.1f}%")
# Continuous monitoring (uncomment to enable)
# monitor.run(interval=60)
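Pushgateway suits short-lived jobs; for a long-running monitor like this one, Prometheus can also scrape the process directly. A sketch of that pull-based alternative using prometheus_client's built-in HTTP server (port 9100 is an arbitrary choice):
python
# Illustrative pull-based alternative: expose the registry for Prometheus to scrape
# instead of pushing to a Pushgateway. Port 9100 is an arbitrary example.
import time
from prometheus_client import start_http_server
from system_monitor import SystemMonitor

monitor = SystemMonitor()
start_http_server(9100, registry=monitor.registry)
while True:
    monitor.collect_metrics()  # refreshes the gauges that Prometheus scrapes
    time.sleep(60)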
◆ 3.1.4 Script 4: MySQL Slow Query Analysis
python
#!/usr/bin/env python3
# File name: mysql_slow_query_analyzer.py
"""
MySQL Slow Query Analyzer
Parses slow query log, generates optimization suggestions
"""
import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger
logger = setup_logger()
class SlowQueryAnalyzer:
def __init__(self, slow_log_file, db_config):
self.slow_log_file = slow_log_file
self.db_config = db_config
self.queries = []
def parse_slow_log(self):
"""Parse slow query log"""
current_query = {}
with open(self.slow_log_file, 'r') as f:
for line in f:
# Time line
if line.startswith('# Time:'):
if current_query:
self.queries.append(current_query)
current_query = {'time': line.split(':', 1)[1].strip()}
# User@Host line
elif line.startswith('# User@Host:'):
match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
if match:
current_query['user'] = match.group(1)
current_query['host'] = match.group(3)
# Query_time line
elif line.startswith('# Query_time:'):
match = re.search(
r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
line
)
if match:
current_query['query_time'] = float(match.group(1))
current_query['lock_time'] = float(match.group(2))
current_query['rows_sent'] = int(match.group(3))
current_query['rows_examined'] = int(match.group(4))
# SQL statement
elif not line.startswith('#') and line.strip():
current_query['sql'] = current_query.get('sql', '') + line.strip() + ' '
if current_query:
self.queries.append(current_query)
logger.info(f"Parsing complete, total {len(self.queries)} slow queries")
def analyze(self):
"""Analyze slow queries"""
stats = {
'total': len(self.queries),
'avg_query_time': 0,
'max_query_time': 0,
'top_queries': [],
'table_scan': []
}
if not self.queries:
return stats
# Basic statistics
total_time = sum(q['query_time'] for q in self.queries)
stats['avg_query_time'] = total_time / len(self.queries)
stats['max_query_time'] = max(q['query_time'] for q in self.queries)
# Top 10 time-consuming queries
sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
stats['top_queries'] = sorted_queries[:10]
# Full table scan detection (rows_examined > 10000)
stats['table_scan'] = [
q for q in self.queries
if q.get('rows_examined', 0) > 10000
]
return stats
def get_explain_plan(self, sql):
"""Get EXPLAIN execution plan"""
try:
conn = pymysql.connect(**self.db_config)
cursor = conn.cursor()
cursor.execute(f"EXPLAIN {sql}")
result = cursor.fetchall()
cursor.close()
conn.close()
return result
except Exception as e:
logger.error(f"EXPLAIN failed: {str(e)}")
return None
def generate_report(self, stats):
"""Generate analysis report"""
report = []
report.append("=" * 80)
report.append("MySQL Slow Query Analysis Report")
report.append("=" * 80)
report.append(f"Total slow queries: {stats['total']}")
report.append(f"Average query time: {stats['avg_query_time']:.2f} seconds")
report.append(f"Max query time: {stats['max_query_time']:.2f} seconds")
report.append("")
report.append("Top 10 time-consuming queries:")
for i, query in enumerate(stats['top_queries'], 1):
report.append(f"\n{i}. Query time: {query['query_time']:.2f} seconds")
report.append(f" Rows examined: {query.get('rows_examined', 0)}")
report.append(f" SQL: {query.get('sql', '')[:200]}")
if stats['table_scan']:
report.append(f"\nFound {len(stats['table_scan'])} full table scan queries")
for query in stats['table_scan'][:5]:
report.append(f" - {query.get('sql', '')[:100]}")
return "\n".join(report)
if __name__ == '__main__':
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'test'
}
analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
analyzer.parse_slow_log()
stats = analyzer.analyze()
print(analyzer.generate_report(stats))
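get_explain_plan is defined above but never called in the example. A short follow-up sketch that runs EXPLAIN on the slowest statements (restricted to SELECTs here, which is a choice of this sketch rather than a requirement):
python
# Illustrative follow-up: EXPLAIN the top slow queries parsed above.
# Assumes the same db_config and slow log path as the __main__ block.
from mysql_slow_query_analyzer import SlowQueryAnalyzer

db_config = {'host': 'localhost', 'user': 'root',
             'password': 'your_password', 'database': 'test'}
analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
analyzer.parse_slow_log()
stats = analyzer.analyze()

for i, query in enumerate(stats['top_queries'][:3], 1):
    sql = query.get('sql', '').strip()
    if not sql.lower().startswith('select'):
        continue  # this sketch only EXPLAINs read queries
    print(f"\n#{i} EXPLAIN for: {sql[:80]}")
    for row in analyzer.get_explain_plan(sql) or []:
        print(f"  {row}")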
◆ 3.1.5 Script 5: File Synchronization Tool
python
#!/usr/bin/env python3
# File name: file_sync.py
"""
File Synchronization Tool
Supports incremental sync, resumable transfer, verification
"""
import os
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger
logger = setup_logger()
class FileSync:
def __init__(self, ssh_config):
self.ssh_config = ssh_config
self.client = None
self.sftp = None
def connect(self):
"""Establish SSH connection"""
try:
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(self.ssh_config['key_file']))  # expand ~ in key paths
self.client.connect(
hostname=self.ssh_config['host'],
port=self.ssh_config['port'],
username=self.ssh_config['user'],
pkey=key
)
self.sftp = self.client.open_sftp()
logger.info(f"Connection successful: {self.ssh_config['host']}")
except Exception as e:
logger.error(f"Connection failed: {str(e)}")
raise
def disconnect(self):
"""Close connection"""
if self.sftp:
self.sftp.close()
if self.client:
self.client.close()
def calculate_md5(self, file_path):
"""Calculate file MD5"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def remote_file_exists(self, remote_path):
"""Check if remote file exists"""
try:
self.sftp.stat(remote_path)
return True
except FileNotFoundError:
return False
def sync_file(self, local_path, remote_path, check_md5=True):
"""Sync a single file"""
try:
# Ensure remote directory exists
remote_dir = os.path.dirname(remote_path)
try:
self.sftp.stat(remote_dir)
except FileNotFoundError:
self._create_remote_dir(remote_dir)
# MD5 verification
need_upload = True
if check_md5 and self.remote_file_exists(remote_path):
local_md5 = self.calculate_md5(local_path)
# Remote MD5 calculation (requires command execution)
stdin, stdout, stderr = self.client.exec_command(f"md5sum {remote_path}")
remote_md5 = stdout.read().decode().split()[0]
if local_md5 == remote_md5:
logger.info(f"File unchanged, skipping: {local_path}")
need_upload = False
if need_upload:
self.sftp.put(local_path, remote_path)
logger.info(f"Upload successful: {local_path} -> {remote_path}")
return True
return False
except Exception as e:
logger.error(f"Sync failed {local_path}: {str(e)}")
return False
def _create_remote_dir(self, remote_dir):
"""Recursively create remote directory"""
dirs = []
while remote_dir != '/':
dirs.append(remote_dir)
remote_dir = os.path.dirname(remote_dir)
for dir_path in reversed(dirs):
try:
self.sftp.stat(dir_path)
except FileNotFoundError:
self.sftp.mkdir(dir_path)
logger.info(f"Created directory: {dir_path}")
def sync_directory(self, local_dir, remote_dir, exclude_patterns=None):
"""Sync entire directory"""
exclude_patterns = exclude_patterns or []
synced_count = 0
skipped_count = 0
for root, dirs, files in os.walk(local_dir):
# Calculate relative path
rel_path = os.path.relpath(root, local_dir)
remote_root = os.path.join(remote_dir, rel_path).replace('\\', '/')
for file in files:
# Exclusion rules
if any(pattern in file for pattern in exclude_patterns):
continue
local_file = os.path.join(root, file)
remote_file = os.path.join(remote_root, file).replace('\\', '/')
if self.sync_file(local_file, remote_file):
synced_count += 1
else:
skipped_count += 1
logger.info(f"Sync complete: Uploaded {synced_count} files, skipped {skipped_count}")
if __name__ == '__main__':
ssh_config = {
'host': '192.168.1.100',
'port': 22,
'user': 'root',
'key_file': '~/.ssh/ops_rsa'
}
sync = FileSync(ssh_config)
sync.connect()
# Sync single file
# sync.sync_file('/local/config.yml', '/remote/config.yml')
# Sync directory
sync.sync_directory(
'/local/app',
'/remote/app',
exclude_patterns=['.git', '.pyc', '__pycache__']
)
sync.disconnect()
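If sync_directory raises, the disconnect() call in the example above is never reached. A slightly more defensive usage sketch:
python
# Defensive usage sketch: always close the SFTP/SSH session, even on errors.
from file_sync import FileSync

ssh_config = {'host': '192.168.1.100', 'port': 22,
              'user': 'root', 'key_file': '~/.ssh/ops_rsa'}
sync = FileSync(ssh_config)
sync.connect()
try:
    sync.sync_directory('/local/app', '/remote/app',
                        exclude_patterns=['.git', '.pyc', '__pycache__'])
finally:
    sync.disconnect()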
3.2 Practical Application Cases
◆ Case 1: Automated Certificate Renewal Check
Scenario: Manage SSL certificates for 100+ domains, need to detect certificates expiring within 30 days and send alerts.
Implementation Code:
python
#!/usr/bin/env python3
# File name: ssl_cert_checker.py
import ssl
import socket
from datetime import datetime, timedelta
import requests
class SSLCertChecker:
def __init__(self, domains, alert_days=30):
self.domains = domains
self.alert_days = alert_days
def check_cert_expiry(self, domain, port=443):
"""Check certificate expiration time"""
try:
context = ssl.create_default_context()
with socket.create_connection((domain, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
cert = ssock.getpeercert()
# Parse expiration time
expire_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
days_left = (expire_date - datetime.now()).days
return {
'domain': domain,
'expire_date': expire_date,
'days_left': days_left,
'issuer': dict(x[0] for x in cert['issuer'])
}
except Exception as e:
return {
'domain': domain,
'error': str(e)
}
def check_all(self):
"""Check all domains"""
results = []
alerts = []
for domain in self.domains:
result = self.check_cert_expiry(domain)
results.append(result)
if 'days_left' in result and result['days_left'] < self.alert_days:
alerts.append(f"{domain} certificate will expire in {result['days_left']} days")
return results, alerts
# Usage example
domains = ['example.com', 'api.example.com', 'www.example.com']
checker = SSLCertChecker(domains)
results, alerts = checker.check_all()
for result in results:
if 'days_left' in result:
print(f"{result['domain']}: {result['days_left']} days remaining")
else:
print(f"{result['domain']}: Check failed - {result['error']}")
if alerts:
print("\nAlerts:")
for alert in alerts:
print(f" - {alert}")
Execution Result:
text
example.com: 85 days remaining
api.example.com: 12 days remaining
www.example.com: 45 days remaining

Alerts:
 - api.example.com certificate will expire in 12 days
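The script imports requests but the example only prints the alerts; a sketch of pushing them through the same WeCom/DingTalk-style webhook format used in Script 2 (the URL is a placeholder):
python
# Illustrative: forward certificate alerts to a WeCom/DingTalk-style webhook.
# The webhook URL below is a placeholder, as in Script 2.
import requests

def send_cert_alerts(alerts, webhook_url):
    if not alerts:
        return
    payload = {
        "msgtype": "text",
        "text": {"content": "【Certificate Alert】\n" + "\n".join(alerts)}
    }
    requests.post(webhook_url, json=payload, timeout=5)

# Example:
# send_cert_alerts(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')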
◆ Case 2: Docker Container Resource Cleanup
Scenario: Regularly clean up containers that have been stopped for more than 7 days, as well as unused images and volumes, to free up disk space.
Implementation Code:
python
#!/usr/bin/env python3
# File name: docker_cleanup.py
import subprocess
import json
from datetime import datetime, timedelta
class DockerCleaner:
def __init__(self, dry_run=True):
self.dry_run = dry_run
def get_stopped_containers(self, days=7):
"""Get containers stopped for more than N days"""
cutoff_time = datetime.now() - timedelta(days=days)
cmd = "docker ps -a --format '{{json .}}'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
stopped_containers = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
container = json.loads(line)
if container['State'] != 'exited':
continue
# Get container details
inspect_cmd = f"docker inspect {container['ID']}"
inspect_result = subprocess.run(inspect_cmd, shell=True, capture_output=True, text=True)
detail = json.loads(inspect_result.stdout)[0]
finished_at = datetime.fromisoformat(detail['State']['FinishedAt'].split('.')[0])
if finished_at < cutoff_time:
stopped_containers.append({
'id': container['ID'],
'name': container['Names'],
'finished_at': finished_at
})
return stopped_containers
def remove_containers(self, containers):
"""Remove containers"""
for container in containers:
cmd = f"docker rm {container['id']}"
if self.dry_run:
print(f"[DRY RUN] {cmd}")
else:
subprocess.run(cmd, shell=True)
print(f"Removed container: {container['name']}")
def prune_images(self):
"""Clean up unused images"""
cmd = "docker image prune -a -f"
if self.dry_run:
print(f"[DRY RUN] {cmd}")
else:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
def prune_volumes(self):
"""Clean up unused volumes"""
cmd = "docker volume prune -f"
if self.dry_run:
print(f"[DRY RUN] {cmd}")
else:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
def cleanup(self, container_days=7):
"""Execute cleanup"""
print(f"Starting cleanup (DRY RUN: {self.dry_run})")
# Clean up containers
containers = self.get_stopped_containers(container_days)
print(f"\nFound {len(containers)} containers stopped for more than {container_days} days")
self.remove_containers(containers)
# Clean up images
print("\nCleaning up unused images...")
self.pr