InfluxDB 数据库迁移与增量数据同步实战

一、项目背景

在企业级应用中,数据库迁移是一项常见的运维任务。本文将详细介绍如何将 InfluxDB 数据库从一台服务器(A服务器)迁移到另一台服务器(B服务器),并重点讲解迁移过程中时间段增量数据同步的实现方案。

1.1 迁移场景

  • 源服务器(A服务器): 172.16.231.218,运行 InfluxDB 服务,端口 8087
  • 目标服务器(B服务器): 172.16.231.219,部署新的 InfluxDB 服务,端口 8086
  • 涉及数据库: objiot(物联网设备数据)、accdetect(加速度检测数据)

1.2 数据同步需求

由于迁移过程中业务系统仍在运行,需要同步迁移前后的数据差异:

同步时间范围:2025-05-15 16:00:00 —— 2025-05-18 11:00:00

二、技术实现方案

2.1 整体架构

┌─────────────────┐         备份         ┌─────────────────┐
│   A服务器        │ ──────────────────> │   备份文件       │
│ (172.16.231.218)│                     │   (/mnt/nas/)   │
└─────────────────┘                     └────────┬────────┘
                                                 │ scp
                                                 ▼
┌─────────────────┐         恢复         ┌─────────────────┐
│   B服务器        │ <───────────────── │   备份文件       │
│ (172.16.231.219)│                     └─────────────────┘
└────────┬────────┘
         │
         │ 增量同步(时间段数据)
         ▼
┌─────────────────┐
│  C#同步工具      │
│ (InfluxdbTool)  │
└─────────────────┘

2.2 技术选型

技术组件 版本 用途
InfluxDB 2.x 时序数据库
.NET 10.0 同步工具开发框架
InfluxDB.Client Latest InfluxDB .NET SDK
Serilog Latest 日志框架

三、关键代码解析

3.1 数据模型定义

3.1.1 DeviceState(终端状态)
[Measurement("DeviceState")]
public class DeviceState
{
    [Column(IsTimestamp = true)]
    public DateTime CreaTime { get; set; }

    [Column(IsTag = true)]
    public string Key { get; set; }

    [Column("Value")]
    public string Value { get; set; }
}

关键点说明

  • [Measurement] 特性指定 InfluxDB 中的表名
  • [Column(IsTimestamp = true)] 标记时间戳字段
  • [Column(IsTag = true)] 标记索引标签字段
3.1.2 HGRealtimeData(终端实时数据)
[Measurement("HGRealtimeData")]
public class HGRealtimeData
{
    [Column(IsTimestamp = true)]
    public DateTime CreaTime { get; set; }

    [Column(IsTag = true)]
    public string Key { get; set; }

    [Column("Value")]
    public string Value { get; set; }
}

3.2 主程序入口

static void Main(string[] args)
{
    try
    {
        Dijing.SerilogExt.InitLog.SetLog(Dijing.SerilogExt.RunModeEnum.Debug);

        // 定义同步时间段
        var beginTime = DateTime.Parse("2025-05-15 16:00:00");
        var endTime = DateTime.Parse("2025-05-18 11:00:00");

        // 初始化源端和目标端配置
        var influxFromOption = InitInfluxdbFromOption();
        var influxToOption = InitInfluxdbToOption();

        // 同步 DeviceState 数据
        SyncMeasurement<DeviceState>("DeviceState", beginTime, endTime, influxFromOption, influxToOption);

        // 同步 HGRealtimeData 数据
        SyncMeasurement<HGRealtimeData>("HGRealtimeData", beginTime, endTime, influxFromOption, influxToOption);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "数据迁移失败");
    }
}

3.3 InfluxDB 配置初始化

private static InfluxOption InitInfluxdbFromOption()
{
    return new InfluxOption
    {
        Url = "http://<SOURCE_SERVER>:8086",      // 源服务器地址
        Token = "<YOUR_API_TOKEN>",
        Bucket = "objiot",
        Org = "ruiyun"
    };
}

private static InfluxOption InitInfluxdbToOption()
{
    return new InfluxOption
    {
        Url = "http://<TARGET_SERVER>:8086",      // 目标服务器地址
        Token = "<YOUR_API_TOKEN>",
        Bucket = "objiot",
        Org = "ruiyun"
    };
}

3.4 增量同步核心逻辑

// 获取指定时间段内的所有设备Key
private static List<string> GetInlfuxdbKeys(string measurement, DateTime beginTime, DateTime endTime, InfluxOption influxOption)
{
    var keys = Dijing.InfluxdbExt.Influxdb2Helper.GetKeys(measurement, beginTime, endTime, influxOption, 1);
    return keys ?? new List<string>();
}

// 查询指定Key的DeviceState数据
private static List<DeviceState> GetDeviceStateList(string key, DateTime beginTime, DateTime endTime, InfluxOption influxOption)
{
    return Dijing.InfluxdbExt.Influxdb2Helper.GetAll<DeviceState>(key, beginTime, endTime, influxOption);
}

// 批量插入DeviceState数据
private static void BatchInsertDeviceState(List<DeviceState> list, InfluxOption influxOption)
{
    Dijing.InfluxdbExt.Influxdb2Helper.Writes<DeviceState>(list, influxOption);
}

四、操作步骤

4.1 数据库备份(A服务器)

# 备份 objiot 数据库
influx backup /mnt/nas --bucket objiot -t <YOUR_API_TOKEN>

# 备份 accdetect 数据库
influx backup /mnt/nas/accalarm --bucket accdetect -t <YOUR_API_TOKEN>

4.2 传输备份文件

scp -r /mnt/nas/influxdbbackup20260515/* root@<TARGET_SERVER_IP>:/mnt/nas/influxdbbackup20260515/
scp -r /mnt/nas/accalarm/* root@<TARGET_SERVER_IP>:/mnt/nas/accalarm/

4.3 数据库恢复(B服务器)

# 恢复 objiot 数据库
influx restore --bucket objiot --new-bucket objiot --org ruiyun /mnt/nas/influxdbbackup20260515 -t <YOUR_API_TOKEN>

# 恢复 accdetect 数据库
influx restore --bucket accdetect --new-bucket accdetect --org ruiyun /mnt/nas/accalarm/ -t <YOUR_API_TOKEN>

4.4 增量数据同步

# 配置端口转发(将远程端口映射到本地)
ssh -L 8087:127.0.0.1:8086 root@172.16.231.218
ssh -L 8086:127.0.0.1:8086 root@172.16.231.219

# 运行同步工具
dotnet run --project InfluxdbTool.csproj

五、服务健康检测

# 检测 A 服务器
curl -v http://172.16.231.218:8086/ping

# 检测 B 服务器
curl -v http://172.16.231.219:8086/ping

正常响应

HTTP/1.1 204 No Content
Content-Type: application/json

六、注意事项

6.1 数据一致性

  • 停机窗口:在增量同步完成前,避免写入新数据到源数据库
  • 时间精度:确保源端和目标端的系统时间一致(建议使用 NTP 同步)
  • 重复数据:增量同步可能产生重复数据,建议在应用层做去重处理

6.2 网络安全

  • Token 管理:API Token 具有管理员权限,需妥善保管
  • 传输加密:建议使用 HTTPS 或 SSH 隧道进行数据传输
  • 访问控制:限制 InfluxDB 服务只允许内网访问

6.3 性能优化

  • 批量操作:使用批量写入 API 提高写入效率
  • 时间段分片:对于超大数据量,建议按时间段分片同步
  • 索引优化:在目标库创建必要的标签索引

七、实际应用效果

7.1 迁移结果

指标 数值
迁移数据库数量 2 个
同步数据时间段 ~69 小时
DeviceState 记录数 ~50,000 条
HGRealtimeData 记录数 ~120,000 条
同步成功率 100%

7.2 日志示例

[INF] 开始获取InfluxDB DeviceState keys...
[INF] 获取DeviceState keys完成,共获取到 156 个key
[INF] 开始查询DeviceState,key=device_001
[INF] 查询完成,key=device_001,共获取到 328 条数据
[INF] 开始批量插入DeviceState数据,数量=328
[INF] 批量插入DeviceState数据完成
...
[INF] key总数: 156

附录:配置参数汇总

参数
A 服务器地址 <SOURCE_SERVER_IP>
B 服务器地址 <TARGET_SERVER_IP>
组织名称 orgname
API Token <YOUR_API_TOKEN>
数据库名称 objiot, accdetect

总结:本文详细介绍了 InfluxDB 数据库迁移的完整流程,重点讲解了基于 C# 实现的时间段增量数据同步方案。通过合理的备份策略和增量同步机制,可以在保证数据一致性的前提下,实现零停机或最小停机时间的数据库迁移。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐