1、数据库备份脚本
#!/usr/bin/env python3
"""
MySQL数据库自动备份脚本
支持定时备份、压缩、保留策略、日志记录
"""
import os
import sys
import subprocess
import datetime
import time
import gzip
import shutil
import logging
from logging.handlers import RotatingFileHandler
import json
import argparse
from pathlib import Path
class MySQLBackup:
    """Automated MySQL backup helper.

    Wraps ``mysqldump`` to produce per-database SQL dumps with optional
    gzip compression, retention-based cleanup of old backups, and
    rotating-file + console logging.
    """

    def __init__(self, config_file=None):
        """Initialize defaults, logging, optional config file, and tool checks.

        :param config_file: optional path to a JSON config file whose keys
            deep-merge over the built-in defaults below.
        :raises RuntimeError: if ``mysqldump`` or ``mysql`` is not installed.
        """
        # Built-in defaults; any subset can be overridden by the config file.
        self.config = {
            'mysql': {
                'host': 'localhost',
                'port': 3306,
                'user': 'root',
                'password': '123456',  # prefer a config file or env var in production
                'database': 'fdmtek-aps'
            },
            'backup': {
                'backup_dir': '/data1/backups/mysql',
                'retention_days': 30,  # keep backups for 30 days
                'compress': False,
                'compress_level': 6,   # gzip level 1-9
                'timeout': 300         # mysqldump timeout (seconds)
            },
            'logging': {
                'log_file': '/var/log/mysql_backup.log',
                'log_level': 'INFO',
                'max_size_mb': 10,
                'backup_count': 5
            }
        }
        # Set up logging first so load_config() can report problems.
        self.setup_logging()
        if config_file and os.path.exists(config_file):
            self.load_config(config_file)
            # BUGFIX: re-apply logging setup so 'logging' options supplied in
            # the config file actually take effect (the first setup_logging()
            # call necessarily ran with the defaults only).
            self.setup_logging()
        # Verify required command-line tools before doing any work.
        self.check_prerequisites()

    def load_config(self, config_file):
        """Load a JSON config file and deep-merge it into ``self.config``.

        :param config_file: path to a UTF-8 encoded JSON file.
        :raises Exception: re-raises any read/parse error after logging it.
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                user_config = json.load(f)

            # Recursively merge nested dicts instead of replacing whole
            # sections, so a partial config file keeps the other defaults.
            def deep_update(target, source):
                for key, value in source.items():
                    if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                        deep_update(target[key], value)
                    else:
                        target[key] = value

            deep_update(self.config, user_config)
            self.logger.info(f"已加载配置文件: {config_file}")
        except Exception as e:
            self.logger.error(f"加载配置文件失败: {e}")
            raise

    def setup_logging(self):
        """Configure a rotating file handler plus a console handler.

        Safe to call more than once: existing handlers are cleared first.
        """
        log_config = self.config['logging']
        # Ensure the log directory exists.
        log_dir = os.path.dirname(log_config['log_file'])
        os.makedirs(log_dir, exist_ok=True)

        self.logger = logging.getLogger('MySQLBackup')
        self.logger.setLevel(getattr(logging, log_config['log_level']))
        # Drop previously attached handlers to avoid duplicate log lines.
        self.logger.handlers.clear()

        # Rotating file handler: size-capped with a bounded number of backups.
        file_handler = RotatingFileHandler(
            log_config['log_file'],
            maxBytes=log_config['max_size_mb'] * 1024 * 1024,
            backupCount=log_config['backup_count'],
            encoding='utf-8'
        )
        console_handler = logging.StreamHandler()

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def check_prerequisites(self):
        """Verify that the required client tools exist on PATH.

        :raises RuntimeError: if any tool is missing.
        """
        required_tools = ['mysqldump', 'mysql']
        for tool in required_tools:
            # shutil.which is portable (no dependency on a `which` binary)
            # and avoids spawning a subprocess per tool.
            if shutil.which(tool) is None:
                self.logger.error(f"未找到必要工具: {tool}")
                raise RuntimeError(f"请先安装 {tool}")
        self.logger.info("所有必要工具已就绪")

    def create_backup_dir(self):
        """Create the backup directory if needed and return its path.

        :raises Exception: re-raises any filesystem error after logging it.
        """
        backup_dir = self.config['backup']['backup_dir']
        try:
            os.makedirs(backup_dir, exist_ok=True)
            self.logger.debug(f"备份目录: {backup_dir}")
            return backup_dir
        except Exception as e:
            self.logger.error(f"创建备份目录失败: {e}")
            raise

    def backup_database(self):
        """Dump the configured database with ``mysqldump``.

        :return: path of the backup file (possibly ``.gz``) on success,
            ``False`` on any failure (callers check truthiness).
        """
        mysql_config = self.config['mysql']
        backup_config = self.config['backup']

        backup_dir = self.create_backup_dir()

        # Timestamped file name keeps multiple backups per day distinct.
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = os.path.join(
            backup_dir,
            f"{mysql_config['database']}_{timestamp}.sql"
        )

        # --single-transaction gives a consistent snapshot for InnoDB
        # without locking tables; routines/triggers/events are included.
        cmd = [
            'mysqldump',
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            '--add-drop-database',
            '--databases', mysql_config['database'],
            '--host', mysql_config['host'],
            '--port', str(mysql_config['port']),
            '--user', mysql_config['user'],
            '--result-file', backup_file
        ]

        # Pass the password via MYSQL_PWD so it never appears in the
        # process list / logged command line.
        env = os.environ.copy()
        if mysql_config['password']:
            env['MYSQL_PWD'] = mysql_config['password']

        try:
            self.logger.info(f"开始备份数据库: {mysql_config['database']}")
            self.logger.debug(f"备份命令: {' '.join(cmd)}")

            start_time = time.time()
            result = subprocess.run(
                cmd,
                env=env,
                capture_output=True,
                text=True,
                timeout=backup_config['timeout']
            )
            elapsed_time = time.time() - start_time

            if result.returncode != 0:
                self.logger.error(f"备份失败: {result.stderr}")
                return False

            # mysqldump can exit 0 yet write nothing on some misconfigurations.
            if not os.path.exists(backup_file):
                self.logger.error("备份文件未生成")
                return False

            file_size = os.path.getsize(backup_file)
            self.logger.info(f"备份成功: {backup_file} ({file_size / 1024:.2f} KB)")
            self.logger.info(f"备份耗时: {elapsed_time:.2f}秒")

            if backup_config['compress']:
                backup_file = self.compress_backup(backup_file, backup_config['compress_level'])

            # Apply the retention policy after each successful backup.
            self.clean_old_backups(backup_dir)
            return backup_file
        except subprocess.TimeoutExpired:
            self.logger.error("备份操作超时")
            return False
        except Exception as e:
            self.logger.error(f"备份过程中发生错误: {e}")
            return False

    def compress_backup(self, backup_file, compress_level=6):
        """Gzip ``backup_file`` in place and delete the original.

        :param backup_file: path of the plain ``.sql`` dump.
        :param compress_level: gzip level 1-9 (higher = smaller/slower).
        :return: path of the ``.gz`` file, or the original path on failure.
        """
        try:
            compressed_file = f"{backup_file}.gz"
            self.logger.info(f"开始压缩备份文件: {backup_file}")
            # BUGFIX: record the original size BEFORE compressing/removing;
            # the previous code measured the compressed file and labelled it
            # "original", so the compression ratio was never computable.
            original_size = os.path.getsize(backup_file)
            with open(backup_file, 'rb') as f_in:
                with gzip.open(compressed_file, 'wb', compresslevel=compress_level) as f_out:
                    shutil.copyfileobj(f_in, f_out)
            # Remove the uncompressed dump only after a successful write.
            os.remove(backup_file)
            compressed_size = os.path.getsize(compressed_file)
            self.logger.info(f"压缩完成: {compressed_file}")
            self.logger.debug(
                f"compressed {original_size} -> {compressed_size} bytes "
                f"({compressed_size / max(original_size, 1):.1%})"
            )
            return compressed_file
        except Exception as e:
            self.logger.error(f"压缩备份文件失败: {e}")
            return backup_file

    def clean_old_backups(self, backup_dir):
        """Delete backups of this database older than the retention window.

        Errors are logged but never propagated: cleanup failure must not
        turn a successful backup into a failure.
        """
        retention_days = self.config['backup']['retention_days']
        cutoff_time = time.time() - (retention_days * 24 * 3600)
        try:
            deleted_files = []
            for item in os.listdir(backup_dir):
                item_path = os.path.join(backup_dir, item)
                # Only touch files that look like our own backups.
                if not (item.startswith(self.config['mysql']['database']) and
                        (item.endswith('.sql') or item.endswith('.sql.gz'))):
                    continue
                if os.path.getmtime(item_path) < cutoff_time:
                    os.remove(item_path)
                    deleted_files.append(item)
            if deleted_files:
                self.logger.info(f"已清理过期备份文件: {len(deleted_files)}个")
                self.logger.debug(f"清理的文件: {', '.join(deleted_files)}")
        except Exception as e:
            self.logger.error(f"清理旧备份失败: {e}")

    def test_mysql_connection(self):
        """Run ``SELECT 1`` against the server to verify connectivity.

        :return: True on success, False otherwise.
        """
        mysql_config = self.config['mysql']
        cmd = [
            'mysql',
            '--host', mysql_config['host'],
            '--port', str(mysql_config['port']),
            '--user', mysql_config['user'],
            '--execute', 'SELECT 1'
        ]
        # Same MYSQL_PWD trick as backup_database: keep the password out of argv.
        env = os.environ.copy()
        if mysql_config['password']:
            env['MYSQL_PWD'] = mysql_config['password']
        try:
            result = subprocess.run(
                cmd,
                env=env,
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                self.logger.info("MySQL连接测试成功")
                return True
            else:
                self.logger.error(f"MySQL连接测试失败: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"MySQL连接测试异常: {e}")
            return False

    def list_backups(self):
        """List backup files for the configured database, sorted by name.

        :return: list of dicts with ``name``, ``size`` (bytes),
            ``mtime`` (datetime) and ``path`` keys; empty list if the
            backup directory does not exist.
        """
        backup_dir = self.config['backup']['backup_dir']
        if not os.path.exists(backup_dir):
            self.logger.warning("备份目录不存在")
            return []
        backups = []
        for item in sorted(os.listdir(backup_dir)):
            item_path = os.path.join(backup_dir, item)
            if os.path.isfile(item_path) and item.startswith(self.config['mysql']['database']):
                stat = os.stat(item_path)
                backups.append({
                    'name': item,
                    'size': stat.st_size,
                    'mtime': datetime.datetime.fromtimestamp(stat.st_mtime),
                    'path': item_path
                })
        return backups
def main():
    """Command-line entry point: parse flags and dispatch to MySQLBackup."""
    arg_parser = argparse.ArgumentParser(description='MySQL数据库备份工具')
    arg_parser.add_argument('--config', '-c', default='config.json',
                            help='配置文件路径 (默认: config.json)')
    arg_parser.add_argument('--test', action='store_true',
                            help='测试MySQL连接')
    arg_parser.add_argument('--list', action='store_true',
                            help='列出所有备份文件')
    arg_parser.add_argument('--run', action='store_true',
                            help='立即执行备份')
    arg_parser.add_argument('--daemon', action='store_true',
                            help='以守护进程方式运行,按配置定时备份')
    opts = arg_parser.parse_args()

    try:
        tool = MySQLBackup(opts.config)

        if opts.test:
            tool.test_mysql_connection()
        elif opts.list:
            found = tool.list_backups()
            if not found:
                print("未找到备份文件")
            else:
                print(f"\n找到 {len(found)} 个备份文件:")
                print("-" * 80)
                for entry in found:
                    print(f"{entry['mtime']:%Y-%m-%d %H:%M:%S} | "
                          f"{entry['size']/1024:8.1f} KB | {entry['name']}")
        elif opts.run:
            produced = tool.backup_database()
            if produced:
                print(f"备份成功: {produced}")
            else:
                print("备份失败,请查看日志")
                sys.exit(1)
        elif opts.daemon:
            # Scheduling is deliberately delegated to systemd/cron.
            print("守护进程模式,请使用systemd或cron进行定时执行")
        else:
            arg_parser.print_help()
    except Exception as exc:
        print(f"程序执行失败: {exc}")
        sys.exit(1)
if __name__ == "__main__":
    main()

2、配置文件 config.json
{
"mysql": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "123456",
"database": "fdmtek-aps"
},
"backup": {
"backup_dir": "/data1/backups/mysql",
"retention_days": 30,
"compress": true,
"compress_level": 6,
"timeout": 600
},
"logging": {
"log_file": "/var/log/mysql_backup.log",
"log_level": "INFO",
"max_size_mb": 10,
"backup_count": 5
}
}

3、验证脚本 verify_backup.py
#!/usr/bin/env python3
import subprocess
import sys
import os
def verify_backup(backup_file):
    """Verify a backup file produced by the backup script.

    Gzip archives are integrity-checked by fully decompressing them with
    the stdlib ``gzip`` module; plain SQL dumps are checked for a
    mysqldump/mariadb-dump header in their first 10 lines.

    BUGFIXES vs. the original:
    - no ``shell=True`` with an interpolated path (broke on spaces and was
      shell-injectable) — no subprocess is used at all;
    - accepts the standard ``-- MySQL dump`` header too (the old
      ``grep 'MariaDB dump'`` rejected every genuine MySQL dump).

    :param backup_file: path to a ``.sql`` or ``.sql.gz`` file.
    :return: True if the file looks like a valid backup, False otherwise.
    """
    import gzip  # local import: keeps this script's top-level imports untouched

    if not os.path.exists(backup_file):
        return False
    try:
        if backup_file.endswith('.gz'):
            # Decompress everything; a truncated/corrupt stream raises.
            with gzip.open(backup_file, 'rb') as f:
                while f.read(1 << 20):
                    pass
            return True
        with open(backup_file, 'r', encoding='utf-8', errors='replace') as f:
            for _, line in zip(range(10), f):
                if 'MySQL dump' in line or 'MariaDB dump' in line:
                    return True
        return False
    except OSError:
        # Covers gzip.BadGzipFile (an OSError subclass) and read failures.
        return False
if __name__ == "__main__":
if len(sys.argv) > 1:
backup_file = sys.argv[1]
if verify_backup(backup_file):
print(f"✓ 备份文件有效: {backup_file}")
else:
print(f"✗ 备份文件损坏: {backup_file}")
sys.exit(1)
else:
print("使用方法: python verify_backup.py <backup_file>")python4、使用说明#
# 1. 修改config.json中的数据库配置
# 2. 创建备份目录
sudo mkdir -p /data1/backups/mysql
sudo chmod 755 /data1/backups/mysql
# 3. 测试连接
python3 mysql_backup.py --config config.json --test
# 4. 手动执行一次备份
python3 mysql_backup.py --config config.json --run

服务器定时执行脚本:
# 编辑crontab
crontab -e
# 添加以下内容,每天凌晨2点执行备份
0 2 * * * /usr/bin/python3 /data1/backups/mysql_backup/mysql_backup.py --config /data1/backups/mysql_backup/config.json --run >> /var/log/mysql_backup_cron.log 2>&1
# 查看crontab
crontab -l