技术文档库
文件管理
技术文档

GoldenDB 备份深度解析

15 章节 1 表格 docx 自动转换

深入剖析 GoldenDB 分布式数据库的备份机制与原理,涵盖全量备份、增量备份、日志归档等核心概念。

AIO 备份平台 GoldenDB 备份恢复深度解析

——源码级流程剖析 · 数据管道全链路 · 错误处理与回滚机制

基于 AIO v5.5.2 源码分析 10.7.16.216 备份一体机 编制日期:2026年6月

1. 目 录

第一章 平台架构深度解析

第二章 全量备份全链路剖析

第三章 XtraBackup 命令深度拆解

第四章 QoS 传输管道机制

第五章 ZFS 快照生命周期

第六章 增量备份 LSN 链条管理

第七章 元数据备份一致性保障

第八章 GTM 日志备份机制

第九章 恢复/挂载全链路剖析

第十章 错误处理与回滚机制

第十一章 Agent 通信协议

第十二章 端口管理与并发控制

附录 A 关键代码路径速查

附录 B 完整命令参考

2. 第一章 平台架构深度解析

2.1 1.1 架构全景图

2.2 1.2 三层架构的职责边界

AIO 平台采用三层解耦架构,每层有明确的职责边界和通信协议:

CDM 控制面(Flask + uWSGI + gevent)

职责:接收用户请求 → 参数校验 → 持久化任务状态 → 通过 SAL 触发 Airflow DAG → 管理策略/资源/元数据 → 前端展示

通信方式:REST API(v1/v2/v3)→ APScheduler(后台定时任务)→ Airflow(通过 SAL 调用)

关键组件:Alembic 数据库迁移(126+ 版本)、Vue.js SPA 前端、Redis 缓存

Airflow 编排面(CeleryExecutor)

职责:DAG 调度 → 任务分发 → Worker 执行 → 日志收集 → 回调 CDM

通信方式:Airflow Scheduler → Celery Broker(Redis)→ Celery Worker → 回调 API

关键组件:自定义 Operator(AIOPythonOperator/AIOBashOperator/AIOSSHOperator)、共享执行底座

共享执行底座(aio_tasks + aio_public_module + aio_lib)

职责:Operator 封装 → 远程执行(SSH/RPC)→ Shell 适配 → 数据库类型抽象 → 回调机制

三个库的分工:aio_tasks(任务工厂)、aio_public_module(operator/shell/remote)、aio_lib(command/crypto/nettools/storage)

2.3 1.3 CDM → Airflow 的触发链路

当用户在 Web 界面点击"立即备份"时,完整的调用链路为:

1. 前端发送 POST /api/v3/backup/trigger 请求

2. CDM API 层校验参数(资源是否存在、策略是否有效、权限是否足够)

3. Services 层调用 Airflow SAL(aio_sal.enc)

4. SAL 通过 Airflow REST API 触发 DAG Run,传入 conf 参数

5. Airflow Scheduler 分发任务到 Celery Worker

6. Worker 执行 DAG 中的每个 Task

7. 执行完成后通过 callback API 回调 CDM 更新任务状态

2.4 1.4 关键配置文件链

配置的生成和传递链路:

cfg/aio.env (手工配置)
  ↓ init.cdm.sh / init.airflow.sh
cfg/cdm.runtime.env + cfg/airflow.runtime.env (生成)
  ↓
/etc/profile.d/cdm.runtime.sh + airflow.runtime.sh (导出)
  ↓
CDM/Airflow 服务启动时加载

3. 第二章 全量备份全链路剖析

3.1 2.1 流程全景图

3.2 2.2 DAG 入口:tiny_goldendb

全量备份的 DAG 入口是 tiny_goldendb,它是一个"薄 DAG"——定义了任务编排顺序,但实际逻辑全部委托给 aio_tasks 模块。

# tiny_goldendb.py 核心结构
dag = DAG(dag_id="tiny_goldendb", ...)

(
  backup_policy(dag)  # BranchPythonOperator: 根据 dag_type 分支
  >> [
    full_backup(dag, "full_backup_policy", backup_trigger),
    inc_backup(dag, "inc_backup_policy", backup_trigger),
    log_backup(dag, "log_backup_policy", backup_trigger),
  ]
)

backup_policy 是一个 BranchPythonOperator,它根据 dag_run.conf 中的 dag_type 参数决定走哪个分支:full_backup / inc_backup / log_backup。

3.3 2.3 全量备份 DAG:goldendb_full_backup_agent

全量备份的完整 DAG 定义在 backup_full_agent_goldendb_task.py 中,任务链为:

select_agent_or_ssh # BranchPythonOperator: 选择 Agent 或 SSH 模式 ↓ (Agent 模式) worker_and_resource_env_check # 检查 Worker 和源端环境 ↓ get_goldendb_mysql_size # 获取源端数据大小 ↓ create_volume_on_storage # 在 ZFS 存储池创建/扩容卷 ↓ get_source_mysql_conf # 下载源端 my.cnf ↓ running_backup_command # 执行 XtraBackup 备份(核心!) ↓ backup_data_prepare # Prepare 备份数据 ↓ backup_goldendb_metadata # 备份 GoldenDB 元数据 ↓ create_snapshot # 创建 ZFS 快照 ↓ record_backup_meta # 记录备份元数据到 CDM

3.4 2.4 深度剖析:worker_and_resource_env_check

这是备份前的环境检查环节,代码位于 backup_goldendb_agent.py 的 worker_and_resource_env_check_callable 函数:

Worker 侧检查(本地)

1. 验证 XtraBackup 可执行:xtrabackup{version} --version
2. 验证 RPC 工具可用:{LOCAL_RPC} --version

3. 验证 lsof 工具可用:{AIO_AIRFLOW_TOOLS_LSOF} -v

4. 验证 lz4 压缩工具可用:lz4 --version
5. 验证 xbstream 可用:xbstream{version} --help
6. 启动本地 RPC 服务:LocalRpcServer().start()

源端资源检查(通过 RPC 远程执行)

1. 获取源端系统架构:arch(返回 x86_64 或 aarch64)

2. 验证源端 RPC 工具可用:{rpc_path}/rpc --version
3. 验证源端 XtraBackup 可用:{remote_xtrabackup} --version
4. 推送架构信息到 Xcom:ti.xcom_push(key="sys_arch", value=sys_arch)

数据库连接检查

1. 通过 GoldenDBSerialize 获取 Manager 和 Data 节点连接信息

2. 测试 Manager 节点 MySQL 连接:can_connect_to_mysql(manager_ip, port)

3. 测试 Data 节点 MySQL 连接:can_connect_to_mysql(data_ip, port)

3.5 2.5 深度剖析:get_goldendb_mysql_size

获取源端数据大小的逻辑(get_mysql_size 函数):

1. 连接 MySQL (SHOW VARIABLES LIKE 'datadir') 2. 通过 RPC 在源端执行 du -s <datadir> 获取数据目录大小 3. 连接 MySQL (SHOW VARIABLES LIKE 'innodb_undo_directory') 4. 通过 RPC 在源端执行 du -s <undodir> 获取 undo 目录大小 5. 返回 datadir_m_size + undodir_m_size (MB)

这个大小用于后续创建 ZFS 卷时的空间预分配。

3.6 2.6 深度剖析:running_backup_command(核心!)

这是整个备份流程的核心环节,代码位于 backup_goldendb_agent.py 的 running_backup_command_callback 函数:

Step 1: 清理旧备份目录

full_dir, full = os.path.split(backup_folder)
run_local_command(f"cd {full_dir} && rm -rf {full} && mkdir -p {full}", check=False)

处理重试场景:如果之前的备份失败留下了残留数据,先清理再重建目录。check=False 表示目录不存在也不报错。

Step 2: 构建 QoS 接收命令

start_qos_cmd = (
    f"{AIO_AIRFLOW_TOOLS_QOS} --server "
    + "--port={port} | lz4 -d -B4 | "
    + f"xbstream -x -C '{backup_folder}'"
)

QoS 服务启动后会:监听指定端口 → 接收数据 → lz4 解压 → xbstream 解包 → 写入备份目录

Step 3: 启动 QoS 服务(端口分配)

start_qos_service_callback 函数的执行逻辑:

1. volume_busy_check(): 检查目标目录是否被其他备份进程占用
   → 如果占用,kill 占用进程

2. 获取文件锁: fcntl.lockf(lock_fd, fcntl.LOCK_EX)
   → 防止多个并发任务同时分配端口

3. 读取已使用端口: open(UNUSE_PORT_FILE)
   → 获取当前占用的端口列表

4. 遍历端口范围 12000-13001:
   → 检查每个端口是否被占用 (ps -ef | grep port)
   → 找到第一个可用端口

5. 启动 QoS 服务:
   subprocess.Popen(start_qos_cmd, shell=True)
   sleep(5) 等待服务就绪

6. 记录已使用端口: 写入 USED_PORT_FILE

7. 释放文件锁: fcntl.lockf(lock_fd, fcntl.LOCK_UN)

8. 返回 (qos_service_port, process_object)

Step 4: 构建并执行 XtraBackup 命令

command = (
    f"sh -c 'set -o pipefail && sudo {remote_pxb} "
    f"--defaults-file={conf[node_mysql_cnf_path]} "
    f"--host={conf[node_ipaddr]} "
    f"--user={conf[node_db_username]} "
    f"--port={conf[node_db_port]} "
    f"--password={conf[node_db_passwd]} "
    f"--datadir={conf[data_dir]} "
    f"--parallel={conf[xtrabackup_parallel]} "
    "--slave_info --backup --no-server-version-check --stream=xbstream | "
    "lz4 -B4 | "
    f"qos --host={conf[sys_dn_ipaddr]} --port={qos_service_port}")

关键点:

• set -o pipefail:管道中任何命令失败都会导致整个管道失败

• sudo:XtraBackup 需要 root 权限读取数据文件
• --slave_info:记录从库 Binlog 位置,用于搭建从库
• --stream=xbstream:流式输出,不写本地文件

• 整个命令通过 RPC 在 GoldenDB 源端执行

• 数据流向:XtraBackup → lz4 压缩 → QoS 传输 → Worker 端接收

Step 5: 错误处理与回滚

如果备份命令执行失败,会触发回滚逻辑:

except Exception as e:
    logger.error(f"运行备份命令失败: {e}")
    data = {
        "revert": True,
        "backup_folder": backup_folder,
        "volume_name": conf["full_volume_name"],
        "latest_snapshot_name": conf["latest_snapshot_name"],
    }
    revert_volume_when_backup_failed(**data)

回滚操作会:删除已创建的 ZFS 卷 → 恢复到上一个快照 → 清理残留文件

Step 6: 停止 QoS 服务

finally:
    logger.info("Stop Qos Service")
    stop_qos_service_callback(qos_service_port)

无论成功还是失败,都会停止 QoS 服务并释放端口。

4. 第三章 XtraBackup 命令深度拆解

4.1 3.1 命令结构图

4.2 3.2 全量备份命令完整拆解

sudo xtrabackup \
  --defaults-file=/data/zxdb4/etc/my.cnf \     ← GoldenDB DN 实例配置
  --host=10.7.16.65 \                           ← 源端 DN 节点 IP
  --user=xtrabackup \                           ← 数据库备份用户
  --port=5504 \                                  ← DN 数据端口
  --password=xxx \                               ← 数据库密码
  --datadir=/data/zxdb4/data \                  ← InnoDB 数据目录
  --parallel=4 \                                 ← 并行拷贝线程数
  --slave_info \                                 ← 记录从库 Binlog 位置
  --backup \                                     ← 执行备份操作
  --no-server-version-check \                   ← 跳过版本兼容检查
  --stream=xbstream \                            ← 流式输出 xbstream 格式
  | lz4 -B4 \                                    ← LZ4 压缩(块大小 4KB)
  | qos --host=10.7.16.216 --port=12001          ← QoS 传输到 Worker

4.3 3.3 增量备份命令差异

增量备份相比全量,多了两个关键参数:

--incremental \                                ← 标记为增量备份
  --incremental-lsn=15188961605 \                ← 基于指定 LSN 做增量

LSN 的来源:从全量备份目录的 xtrabackup_checkpoints 文件中读取 to_lsn 字段。

# 读取 LSN 的 Shell 命令
echo `cat /volmountpoint/aiopool/<vol>/full/xtrabackup_checkpoints \
  | grep "to_lsn" | awk -F "=" '{print $2}'`

4.4 3.4 SSH 模式的命令差异

当 Agent 不可用时,AIO 支持通过 SSH 直接执行备份。SSH 模式的命令通过 AIOSSHOperator 发送:

# SSH 模式:通过 SSH 在源端执行,数据通过 SSH 管道回传
sudo xtrabackup --defaults-file=... --stream=xbstream \
  | lz4 -B4 \
  | ssh -p <port> <user>@<ip> 'lz4 -d -B4 | sudo xbstream -x -C <dir>'

对比 Agent 模式(QoS)和 SSH 模式:

• Agent 模式:通过 RPC 下发命令,QoS 传输数据 → 更安全、端口可控

• SSH 模式:通过 SSH 下发命令,SSH 管道传输数据 → 更简单、但依赖 SSH 稳定性

5. 第四章 QoS 传输管道机制

5.1 4.1 管道架构图

5.2 4.2 QoS vs NC 对比

AIO 早期使用 NC(netcat)进行数据传输,后升级为 QoS(Quality of Service)工具:

NC 模式(已废弃)

# Worker 端启动接收
nc -l -p 12001 | lz4 -d -B4 | xbstream -x -C '<backup_dir>'

# 源端发送
xtrabackup --stream=xbstream | lz4 -B4 | nc <worker_ip> 12001

缺点:无端口管理、无进程守护、端口冲突频繁、无传输限速

QoS 模式(当前使用)

# Worker 端启动接收
qos --server --port=12001 | lz4 -d -B4 | xbstream -x -C '<backup_dir>'

# 源端发送
xtrabackup --stream=xbstream | lz4 -B4 | qos --host=<worker_ip> --port=12001

优势:端口管理(文件锁)、进程守护、传输限速、自动重试

5.3 4.3 端口分配机制深度剖析

QoS 端口分配是一个精心设计的并发安全机制:

1. 文件锁保护:
   lock_fd = open(NC_LOCK_FILE, "w+")
   fcntl.lockf(lock_fd, fcntl.LOCK_EX)  # 排他锁

2. 端口范围:
   PORT_RANGE_START = 12000
   PORT_RANGE_END = 13001
   共 1001 个可用端口

3. 已使用端口记录:
   文件: scripts/used_port.json
   格式: 每行一个端口号

4. 端口检查:
   ps -ef | grep "<port>" | grep -v grep
   如果有进程占用,跳过该端口

5. 端口释放:
   备份完成后从 used_port.json 中移除
   释放文件锁

5.4 4.4 数据压缩策略

AIO 使用 LZ4 压缩(而非 ZSTD),原因:

• LZ4 压缩速度极快(~700 MB/s 压缩,~4000 MB/s 解压)

• 压缩比 2:1~3:1,足够节省网络带宽

• CPU 开销极低,不影响数据库性能

• -B4 参数:块大小 4KB,适合流式传输

6. 第五章 ZFS 快照生命周期

6.1 5.1 ZFS 在 AIO 中的角色

ZFS 在 AIO 中承担三重角色:

1. 存储引擎:ZVol(ZFS Volume)作为块设备存储备份数据

2. 快照保护:每次备份完成后创建快照,确保数据一致性

3. 空间管理:自动压缩、去重、精简配置

6.2 5.2 卷创建流程

# 创建 ZFS 卷(AIO 自动执行)
zfs create -V <size>G -o volblocksize=<block_size> aiopool/<volume_name>

# 挂载到备份目录
mount /dev/zvol/aiopool/<volume_name> /volmountpoint/aiopool/<volume_name>

卷创建前会检查存储池可用空间:如果可用空间 < 总空间的 20%,自动报错拒绝创建。

6.3 5.3 快照创建流程

# 备份完成后执行
sync  # 强制刷盘,确保数据一致性
zfs snapshot aiopool/<volume_name>@<execution_batch>

# execution_batch 是批次号,用于标识每次备份
# 快照几乎不占额外空间(只记录差异)

6.4 5.4 快照回滚(恢复时)

# 回滚到指定快照
zfs rollback aiopool/<volume_name>@<snapshot_name>

# 注意:回滚会丢失快照之后的所有修改

6.5 5.5 快照删除(清理时)

# 删除指定快照
zfs destroy aiopool/<volume_name>@<snapshot_name>

# AIO 会根据保留策略自动清理过期快照

7. 第六章 增量备份 LSN 链条管理

7.1 6.1 LSN 链条图

7.2 6.2 LSN 读取机制

增量备份的第一步是读取全量备份的 LSN,这一步在 Airflow DAG 中定义为:

# get_full_backup_lsn 任务
bash_command = f"""
  echo `cat {FULL_BACKUP_FOLDER}/xtrabackup_checkpoints \
    | grep "to_lsn" | awk -F "=" '{{print $2}}'`
"""

# 读取结果通过 Xcom 传递给后续任务
# 后续任务通过 ti.xcom_pull(task_ids="get_full_backup_lsn") 获取

7.3 6.3 xtrabackup_checkpoints 文件解析

# 全量备份的 checkpoints
backup_type = full-backuped
from_lsn = 0                    ← 起始 LSN(全量从 0 开始)
to_lsn = 15188961605            ← 结束 LSN
last_lsn = 15188961790          ← 最新 LSN(含备份后变化)

# 增量备份的 checkpoints
backup_type = incremental
from_lsn = 15188961605          ← 必须等于全量的 to_lsn
to_lsn = 15188962100            ← 增量结束 LSN
last_lsn = 15188962300

7.4 6.4 增量备份的 Prepare 链

增量备份恢复时需要按顺序合并,Prepare 链为:

# Step 1: Prepare 全量(加 --apply-log-only)
xtrabackup --prepare --use-memory=1GB --apply-log-only \
  --target-dir=<full_dir>

# Step 2: Apply 增量(加 --apply-log-only)
xtrabackup --prepare --use-memory=1GB --apply-log-only \
  --target-dir=<full_dir> \
  --incremental-dir=<incr_dir>

# Step 3: 最终 Prepare(不加 --apply-log-only)
xtrabackup --prepare --use-memory=1GB \
  --target-dir=<full_dir> \
  --incremental-dir=<incr_dir>
为什么最后一步不能加 --apply-log-only?因为不加这个参数,XtraBackup 会在最后执行 "roll back uncommitted transactions",确保所有未提交的事务被回滚。如果加了,这个步骤会被跳过,数据可能不一致。

8. 第七章 元数据备份一致性保障

8.1 7.1 元数据备份流程图

GoldenDB 元数据备份的 DAG 为 goldendb_meta_backup,任务链为:

source_prepare → worker_prepare → backup_meta → record

8.2 7.2 元数据备份深度剖析

代码位于 backup_goldendb_agent.py 的 get_metadata_callback 函数:

Step 1: SSH 到 Manager 节点

gdb_serialize = GoldenDBSerialize(conf)
with RemoteClient(**gdb_serialize.manager.manager_conn) as mgr_client:
    # 获取 Manager 用户的 home 目录
    pwd_cmd = f"sudo su - {manager_username} -c 'cd ~ && pwd'"
    result = mgr_client.run(pwd_cmd)
    manager_pwd = result.stdout.strip()

Step 2: 调用 GoldenDB 官方脚本

remote_dir_path = os.path.join(manager_pwd, "activeio/backup_metadata", str(node_id))
    command = (
        f"sudo su - {manager_username} -c 'mkdir -p {remote_dir_path}' && "
        f"sudo su - {manager_username} -c \
        'python ~/bin/backup_metadata.py \
        --cluster_id={cluster_id} --backup_path={remote_dir_path}'"
    )
    mgr_client.run(command)

Step 3: 下载元数据文件

file_list = [
        "DBPasswdInfos.dat",      # 数据库密码信息
        "DictionaryInfos.dat",    # 数据字典
        "DictionaryInfos.idx",    # 数据字典索引
        "TableIndexInfos.dat",    # 表索引信息
        "TableIndexInfos.idx",    # 表索引索引
    ]
    with RemoteClient(**gdb_serialize.manager.data_conn) as data_client:
        for file in file_list:
            data_client.get(
                remote=os.path.join(remote_dir_path, file),
                local=os.path.join(local_metadata_path, file)
            )

Step 4: 打包压缩

tar_command = (
        f"cd {local_dir_path} && "
        f"tar -zcvf {execution_batch}.tar.gz ./{execution_batch}/* && "
        f"mv -f {execution_batch}.tar.gz {execution_batch}"
    )
    run_local_command(tar_command)

9. 第八章 GTM 日志备份机制

9.1 8.1 GTM 日志备份流程图

GTM 日志备份的 DAG 为 goldendb_gtmlog_backup,流程为:

checkRemoteServer → get_active_tran_filename # 获取活动事务文件名和大小 → get_data_total_size # 计算总数据大小 → worker_prepare # Worker 环境准备 → create_dir # 创建存储目录 → get_active_tran_data # 下载活动事务数据 → dummy_callback_task # 成功回调

9.2 8.2 活动事务文件获取

代码位于 gtmlog_goldendb.py 的 get_active_tran_filename_callback 函数:

1. 通过 GoldenDBSerialize 获取 Manager 连接信息
2. SSH 到 Manager 节点
3. 获取活跃事务路径和归档路径
4. 列出目录中的文件及大小
5. 过滤出需要备份的文件
6. 计算总文件大小
7. 推送到 Xcom: ti.xcom_push(key="active_file_size", value=file_size)

9.3 8.3 数据下载

get_active_tran_data_callback 函数通过 SSH/SCP 从 Manager 节点下载活动事务数据到 Worker 本地目录。

10. 第九章 恢复/挂载全链路剖析

10.1 9.1 恢复流程图

10.2 9.2 两种恢复方式

完整恢复(Physical Restore)

将备份数据恢复到新的 GoldenDB 实例,用于灾难恢复。

1. 选择备份快照
2. 创建恢复卷(基于 ZFS 快照)
3. Prepare 备份数据
   xtrabackup --prepare --target-dir=<backup_dir>
4. 停止目标 MySQL 实例
5. 清理数据目录
   rm -rf /data/zxX/data/*
6. 拷贝备份数据
   xtrabackup --copy-back --target-dir=<backup_dir> --datadir=<data_dir>
7. 修改权限
   chown -R mysql:mysql <data_dir>
8. 恢复 GoldenDB 元数据
   python ~/bin/restore_metadata.py ...
9. 启动 MySQL 实例
10. 验证数据完整性

挂载恢复(Mount)

将备份卷挂载为只读,用于数据查询和验证,不启动 MySQL 实例。

1. 选择备份快照 2. 创建可写卷(基于 ZFS 快照) 3. 通过 iSCSI 挂载到目标主机 4. 数据目录归位(软链接) 5. 可选:启动只读实例

10.3 9.3 clear_mount 流程

挂载使用完毕后,需要清理挂载点:

goldendb_clear_mount DAG: 1. 卸载 iSCSI 卷 2. 删除临时数据目录 3. 恢复原始数据目录 4. 清理 ZFS 卷

11. 第十章 错误处理与回滚机制

11.1 10.1 Airflow 级别的错误处理

每个 DAG 都配置了 on_failure_callback:

# 全量备份失败回调
on_failure_callback=lambda context: backup_trigger_callback("full", "failed", context)

# 增量备份失败回调
on_failure_callback=lambda context: backup_trigger_callback("inc", "failed", context)

# 日志备份失败回调
on_failure_callback=lambda context: sub_crontab_callback("failed", "gtmlog", context)

11.2 10.2 任务级别的回滚

在 backup_full_agent_goldendb_task.py 中,running_backup_command 任务配置了 revert 机制:

revert_backup_command_failed_rollback = lambda dag, backup_folder: AIOPythonOperator( task_id="revert_backup_command_failed_rollback", python_callable=revert_volume_when_backup_failed, op_kwargs={"revert": False}, revert_op_kwargs={ "revert": '{{ dag_run.conf["revert_after_fail"] }}', "latest_snapshot_name": '{{ dag_run.conf["latest_snapshot_name"] }}', "backup_folder": backup_folder, "volume_name": '{{ dag_run.conf["full_volume_name"] }}', }, )

revert_volume_when_backup_failed 函数的逻辑:

1. 检查 revert 参数是否为 True

2. 如果需要回滚:

a. 删除已创建的 ZFS 快照

b. 回滚 ZFS 卷到上一个快照

c. 清理备份目录中的残留文件

3. 记录回滚日志

11.3 10.3 备份命令内部的错误处理

# running_backup_command_callback 中的错误处理
try:
    qos_service_port = start_qos_service_callback(...)
    run_remote_command(command, rpc_conn, output_cmd=False)
except Exception as e:
    logger.error(f"运行备份命令失败: {e}")
    # 触发回滚
    revert_volume_when_backup_failed(
        revert=True, backup_folder=..., volume_name=..., ...
    )
finally:
    # 无论成功失败,都停止 QoS 服务
    stop_qos_service_callback(qos_service_port)

11.4 10.4 set -o pipefail 的作用

备份命令使用 set -o pipefail 确保管道中任何命令失败都会被捕获:

# 没有 pipefail 时:
xtrabackup ... | lz4 ... | qos ...
# 如果 xtrabackup 失败,lz4 和 qos 仍然会执行(因为管道返回最后一个命令的退出码)

# 有 pipefail 时:
set -o pipefail && xtrabackup ... | lz4 ... | qos ...
# 如果 xtrabackup 失败,整个管道立即返回失败退出码

12. 第十一章 Agent 通信协议

12.1 11.1 RDB Agent 架构图

12.2 11.2 RPC 通信机制

AIO 通过 RPC(Remote Procedure Call)与 GoldenDB 源端的 Agent 通信:

# RPC 连接信息
rpc_conn = {
    "host": conf["node_ipaddr"],      # 源端 IP
    "port": conf["node_agent_install_port"]  # Agent 端口
}

# 通过 RPC 执行远程命令
run_remote_command("xtrabackup --version", rpc_conn)

run_remote_command 的实现:

def run_remote_command(cmd, conn, check=True, ...): # 构建 RPC 命令 cmd = f"{LOCAL_RPC} -h {conn['host']} -p {conn['port']} -c \" {cmd} \" # 本地执行 RPC 客户端 result = run_local_command(cmd, check, ...) if RPC_CONNECT_ERROR in result.stderr: raise Exception(f"RPC 服务连接异常...") return result

12.3 11.3 Agent 安装路径

# Agent 工具安装路径
/opt/aio/airflow/tools/mysql/xtrabackup/8.0-linux-x86_64/xtrabackup
/opt/aio/airflow/tools/qos/x86_64/qos
/opt/aio/airflow/tools/rpc/x86_64/rpc

# GoldenDB 源端脚本
/opt/aio/scripts/goldendb/check_goldendb.sh

13. 第十二章 端口管理与并发控制

13.1 12.1 端口分配流程

13.2 12.2 并发控制机制

当多个备份任务并发执行时,端口分配通过文件锁实现互斥:

# 并发控制流程
1. 打开锁文件: lock_fd = open("nc_lock.lock", "w+")
2. 获取排他锁: fcntl.lockf(lock_fd, fcntl.LOCK_EX)
3. 读取已使用端口: open("used_port.json")
4. 遍历端口范围 12000-13001
5. 检查每个端口是否被占用
6. 分配第一个可用端口
7. 记录到 used_port.json
8. 释放锁: fcntl.lockf(lock_fd, fcntl.LOCK_UN)

13.3 12.3 端口冲突处理

如果端口被占用,AIO 会:

1. 检查占用进程是否是之前的备份残留

2. 如果是残留进程,kill -9 清理

3. 重新检查端口可用性

4. 如果端口范围耗尽,抛出异常

13.4 12.4 端口释放机制

# 备份完成后释放端口
def stop_qos_service_callback(qos_service_port):
    # 1. 杀死 QoS 进程
    process_list = check_port_process(qos_service_port)
    for process in process_list:
        run_local_command(f"kill {process}", check=False)

    # 2. 从 used_port.json 中移除
    with open(UNUSE_PORT_FILE, "r") as fp:
        used_ports = fp.read().strip().split("\n")
    used_ports.remove(qos_service_port)
    with open(UNUSE_PORT_FILE, "w") as fp:
        fp.write("\n".join(used_ports))

14. 附录 A 关键代码路径速查

airflow/airflow/dags/tiny_goldendb.pyDAG 入口backup_policy >> [full, inc, log]
aio_tasks/backup_full_agent_goldendb_task.py全量备份 DAGrunning_backup_command, create_volume
aio_tasks/backup_inc_agent_goldendb_task.py增量备份 DAGrunning_inc_backup_command
aio_tasks/backup_goldendb_task.py备份任务工厂GoldenDBOperator, runBackupCommand
aio_tasks/dependencies/backup_goldendb_agent.py核心备份逻辑running_backup_command_callback
aio_tasks/operators/backup.py备份 OperatorAIOFullBackupSSHOperator
aio_tasks/dependencies/config.py配置常量POOL_NAME, TOOLS_PATH
aio_tasks/dependencies/gtmlog_goldendb.pyGTM 日志备份get_active_tran_filename_callback
aio_tasks/dependencies/callback.py回调机制sub_crontab_callback
scripts/goldendb/check_goldendb.sh环境检测check_goldendb.sh
cfg/aio.env平台配置AIO_HOME, AIO_DB_HOSTNAME
docs/goldendb_agent_说明文档.mdAgent 文档安装、权限、原理

15. 附录 B 完整命令参考

# === 全量备份(Agent 模式) ===
sh -c 'set -o pipefail && sudo xtrabackup \
  --defaults-file=<cnf> --host=<IP> --user=<USER> --port=<PORT> \
  --password=<PWD> --datadir=<DATADIR> --parallel=<N> \
  --slave_info --backup --no-server-version-check --stream=xbstream \
  | lz4 -B4 | qos --host=<DN_IP> --port=<PORT>'

# === 增量备份(Agent 模式) ===
sh -c 'set -o pipefail && sudo xtrabackup \
  --defaults-file=<cnf> --host=<IP> --incremental \
  --incremental-lsn=<LSN> --slave_info \
  --user=<USER> --port=<PORT> --password=<PWD> \
  --datadir=<DATADIR> --parallel=<N> \
  --backup --no-server-version-check --stream=xbstream \
  | lz4 -B4 | qos --host=<DN_IP> --port=<PORT>'

# === 全量备份(SSH 模式) ===
sh -c 'set -o pipefail && sudo xtrabackup \
  --defaults-file=<cnf> --host=<IP> --user=<USER> --port=<PORT> \
  --password=<PWD> --datadir=<DATADIR> --parallel=<N> \
  --slave_info --backup --stream=xbstream \
  | lz4 -B4 | ssh -p <PORT> <USER>@<IP> \
  'lz4 -d -B4 | sudo xbstream -x -C <DIR>'

# === Worker 端接收(QoS) ===
qos --server --port=<PORT> | lz4 -d -B4 | xbstream -x -C '<DIR>'

# === Worker 端接收(NC,已废弃) ===
nc -l -p <PORT> | lz4 -d -B4 | xbstream -x -C '<DIR>'

# === Prepare 全量 ===
xtrabackup --prepare --use-memory=1GB --target-dir=<FULL_DIR>

# === Prepare 增量(中间步骤) ===
xtrabackup --prepare --use-memory=1GB --apply-log-only \
  --target-dir=<FULL_DIR> --incremental-dir=<INCR_DIR>

# === Prepare 增量(最终步骤) ===
xtrabackup --prepare --use-memory=1GB \
  --target-dir=<FULL_DIR> --incremental-dir=<INCR_DIR>

# === 恢复 ===
xtrabackup --copy-back --target-dir=<FULL_DIR> --datadir=<DATADIR>

# === 元数据备份 ===
python ~/bin/backup_metadata.py --cluster_id=<ID> --backup_path=<PATH>

# === ZFS 快照 ===
sync && zfs snapshot aiopool/<VOL>@<BATCH>

# === ZFS 回滚 ===
zfs rollback aiopool/<VOL>@<SNAPSHOT>

# === 环境检测 ===
bash /opt/aio/scripts/goldendb/check_goldendb.sh
本文档由系统自动从 docx 文件转换生成 · 如有排版问题请访问 文件管理 下载原始文档