摘要

本文详细介绍DolphinDB与OPC-UA协议的集成方法,实现工业设备的标准化数据采集。从OPC-UA协议基础到DolphinDB插件使用,从节点浏览到数据订阅,逐步带领读者构建完整的工业数据采集系统。同时提供智能制造场景的实战案例和最佳实践,帮助读者实现工业设备与数据平台的无缝对接。本文适合从事工业物联网和智能制造的工程师阅读。


一、OPC-UA协议概述

1.1 什么是OPC-UA

OPC-UA(Open Platform Communications Unified Architecture)是工业自动化领域的标准通信协议:

OPC-UA架构

PLC控制器

OPC-UA Server

传感器

数控机床

机器人

OPC-UA Client
DolphinDB

数据存储

实时分析

监控告警

1.2 OPC-UA核心概念

概念 说明 示例
Server 数据提供方 PLC、SCADA系统
Client 数据消费方 DolphinDB、MES系统
Node 数据节点 温度传感器节点
Namespace 命名空间 设备命名空间
Subscription 订阅 数据变化订阅

1.3 OPC-UA优势

优势 说明
平台独立 跨操作系统、跨语言
安全可靠 内置安全机制
信息模型 标准化数据模型
互操作性 工业标准协议

二、DolphinDB OPC-UA插件

2.1 插件安装

# 下载OPC-UA插件
cd /opt/dolphindb/server/plugins
git clone https://github.com/dolphindb/DolphinDBPlugin.git

# 编译插件(需要open62541库)
cd DolphinDBPlugin/opcua
mkdir build && cd build
cmake ..
make

# 插件文件
# libPluginOPCUA.so

2.2 加载插件

// 加载OPC-UA插件
loadPlugin("/opt/dolphindb/server/plugins/opcua/libPluginOPCUA.so")

// 查看插件函数
getFunctionList()

2.3 插件函数

函数 说明
opcua::connect 连接OPC-UA服务器
opcua::browse 浏览节点
opcua::read 读取节点值
opcua::write 写入节点值
opcua::subscribe 订阅数据变化
opcua::disconnect 断开连接

三、连接OPC-UA服务器

3.1 基本连接

// 加载插件
loadPlugin("/opt/dolphindb/server/plugins/opcua/libPluginOPCUA.so")

// 连接OPC-UA服务器
conn = opcua::connect("opc.tcp://localhost:4840")

// 查看连接状态
print("OPC-UA连接成功")

3.2 带认证连接

// 用户名密码认证
conn = opcua::connect(
    "opc.tcp://opc-server:4840",
    "username",
    "password"
)

// 证书认证
conn = opcua::connect(
    "opc.tcp://opc-server:4840",
    "/certs/client.crt",
    "/certs/client.key"
)

3.3 安全模式

// 安全模式设置
conn = opcua::connect(
    "opc.tcp://opc-server:4840",
    "username",
    "password",
    "SignAndEncrypt"  // 安全模式: None, Sign, SignAndEncrypt
)

四、浏览节点

4.1 浏览根节点

// 浏览根节点
nodes = opcua::browse(conn, "i=84")  // RootFolder

// 查看节点列表
nodes

4.2 浏览对象节点

// 浏览Objects文件夹
objects = opcua::browse(conn, "i=85")  // ObjectsFolder

// 浏览设备节点
devices = opcua::browse(conn, "ns=2;s=Devices")

// 浏览传感器节点
sensors = opcua::browse(conn, "ns=2;s=Devices.Sensors")

4.3 节点标识符

标识符类型 格式 示例
整数标识 i=xxx i=2258
字符串标识 ns=x;s=xxx ns=2;s=Temperature
GUID标识 g=xxx g=09087e75-8e5e-499b-954f...

4.4 节点属性

// 读取节点属性
attr = opcua::readAttribute(conn, "ns=2;s=Temperature", "Value")

// 可读取的属性
// Value - 节点值
// DataType - 数据类型
// Description - 描述
// DisplayName - 显示名称
// NodeClass - 节点类

五、读取数据

5.1 读取单个节点

// 读取温度值
temp = opcua::read(conn, "ns=2;s=Temperature")
print("温度: " + string(temp))

// 读取设备状态
status = opcua::read(conn, "ns=2;s=DeviceStatus")
print("状态: " + string(status))

5.2 批量读取

// 批量读取多个节点
nodes = [
    "ns=2;s=Temperature",
    "ns=2;s=Humidity",
    "ns=2;s=Pressure",
    "ns=2;s=Vibration"
]

values = opcua::read(conn, nodes)
values

5.3 历史数据读取

// 读取历史数据(如果服务器支持)
startTime = now() - 3600000  // 1小时前
endTime = now()

history = opcua::readHistory(
    conn,
    "ns=2;s=Temperature",
    startTime,
    endTime
)
history

六、写入数据

6.1 写入单个节点

// 写入设定值
opcua::write(conn, "ns=2;s=SetPoint", 25.5)

// 写入设备参数
opcua::write(conn, "ns=2;s=DeviceConfig", `{"mode":"auto","interval":1000}`)

6.2 批量写入

// 批量写入
nodes = ["ns=2;s=SetPoint1", "ns=2;s=SetPoint2", "ns=2;s=SetPoint3"]
values = [25.0, 50.0, 100.0]

opcua::write(conn, nodes, values)

七、订阅数据变化

7.1 创建订阅

// 创建流表接收数据
share streamTable(1:0, `node_id`timestamp`value, 
                  [STRING, TIMESTAMP, DOUBLE]) as opcua_stream

// 订阅节点变化
opcua::subscribe(
    conn,
    ["ns=2;s=Temperature", "ns=2;s=Humidity", "ns=2;s=Pressure"],
    opcua_stream,
    1000  // 采样间隔(ms)
)

// 查看订阅数据
select count(*) from opcua_stream

7.2 订阅回调处理

// 自定义回调处理
def processOpcuaData(msg) {
    // 解析数据
    nodeId = msg.nodeId
    value = msg.value
    timestamp = msg.sourceTimestamp
    
    // 数据处理逻辑
    if (nodeId == "ns=2;s=Temperature" and value > 35) {
        print("温度告警: " + string(value))
    }
    
    return table(nodeId as node_id, timestamp, value)
}

// 创建带回调的订阅
share streamTable(1:0, `node_id`timestamp`value, 
                  [STRING, TIMESTAMP, DOUBLE]) as processed_stream

opcua::subscribe(conn, "ns=2;s=Temperature", processed_stream, 1000, processOpcuaData)

7.3 订阅管理

// 查看订阅状态
opcua::getSubscriptionStatus(conn)

// 取消订阅
opcua::unsubscribe(conn, subscriptionId)

// 修改订阅参数
opcua::modifySubscription(conn, subscriptionId, 500)  // 修改采样间隔

八、工业物联网实战案例

8.1 智能工厂数据采集

// ========== 1. 创建数据表 ==========

// 设备实时数据流表
share streamTable(1:0,
    `device_id`node_id`timestamp`value`quality,
    [STRING, STRING, TIMESTAMP, DOUBLE, INT]
) as device_stream

// 设备状态表
share streamTable(1:0,
    `device_id`timestamp`status`temperature`speed`power,
    [STRING, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE]
) as status_stream

// ========== 2. 连接OPC-UA服务器 ==========

loadPlugin("/opt/dolphindb/server/plugins/opcua/libPluginOPCUA.so")

// 连接多个PLC
plc1 = opcua::connect("opc.tcp://plc1:4840", "admin", "password")
plc2 = opcua::connect("opc.tcp://plc2:4840", "admin", "password")
plc3 = opcua::connect("opc.tcp://plc3:4840", "admin", "password")

// ========== 3. 定义节点映射 ==========

// 设备节点配置
deviceNodes = table([
    "ns=2;s=Device1.Temperature" as node_id,
    "ns=2;s=Device1.Speed" as node_id,
    "ns=2;s=Device1.Power" as node_id,
    "ns=2;s=Device1.Status" as node_id
])

// ========== 4. 订阅数据 ==========

// 订阅PLC1数据
opcua::subscribe(plc1, deviceNodes.node_id, device_stream, 1000)

// 订阅PLC2数据
opcua::subscribe(plc2, deviceNodes.node_id, device_stream, 1000)

// 订阅PLC3数据
opcua::subscribe(plc3, deviceNodes.node_id, device_stream, 1000)

print("OPC-UA数据采集已启动")

8.2 实时数据处理

// ========== 5. 创建处理引擎 ==========

// 数据转换引擎
def transformDeviceData(msg) {
    // 解析节点ID
    parts = split(msg.node_id, ".")
    deviceId = parts[0]
    metric = parts[1]
    
    return table(
        deviceId as device_id,
        msg.node_id as node_id,
        msg.timestamp as timestamp,
        msg.value as value,
        msg.quality as quality
    )
}

// 创建时间序列引擎
status_engine = createTimeSeriesEngine(
    "status_engine",
    60000,  // 1分钟
    60000,
    <[
        avg(value) as avg_value,
        max(value) as max_value,
        min(value) as min_value,
        count(*) as sample_count
    ]>,
    device_stream,
    `device_id,
    `timestamp
)

// 订阅处理
subscribeTable(, "device_stream", "transform", -1, transformDeviceData, true)

8.3 数据持久化

// ========== 6. 持久化到分布式表 ==========

// 创建分布式数据库
db = database("dfs://factory_data", COMPO,
    [RANGE, 2024.01.01..2024.12.31, VALUE, `Device1`Device2`Device3])

// 创建分布式表
schema = table(1:0,
    `device_id`node_id`timestamp`value`quality,
    [STRING, STRING, TIMESTAMP, DOUBLE, INT])
db.createPartitionedTable(schema, `device_data, `timestamp`device_id)

// 订阅持久化
def persistData(msg) {
    loadTable("dfs://factory_data", "device_data").append!(msg)
}

subscribeTable(, "device_stream", "persist", -1, persistData, true, 10000, true)

九、OPC-UA服务器配置

9.1 Prosys OPC-UA Simulator

# 下载并安装Prosys OPC-UA Simulator
# https://www.prosysopc.com/products/opc-ua-simulation-server/

# 配置模拟节点
# 1. 启动Prosys OPC-UA Simulator
# 2. 添加模拟节点(温度、湿度、压力等)
# 3. 设置数据变化规则
# 4. 启动服务器(默认端口4840)

9.2 KEPServerEX配置

# 安装KEPServerEX
# 配置设备通道
# 1. 创建通道(Channel)
# 2. 添加设备(Device)
# 3. 配置标签(Tag)
# 4. 启用OPC-UA服务器接口

9.3 开源OPC-UA服务器

// 使用Python创建简单OPC-UA服务器
// pip install opcua

from opcua import Server

server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840")

# 注册命名空间
idx = server.register_namespace("http://example.org")

# 创建对象
myobj = server.nodes.objects.add_object(idx, "MyDevice")

# 创建变量
temp = myobj.add_variable(idx, "Temperature", 25.0)
temp.set_writable()

# 启动服务器
server.start()

十、性能优化

10.1 批量操作

// 批量读取优化
def batchRead(conn, nodes, batchSize=100) {
    results = table(1:0, `node_id`value, [STRING, DOUBLE])
    
    for (i in 0..(size(nodes)/batchSize)) {
        batch = nodes[(i*batchSize) : min((i+1)*batchSize, size(nodes))]
        values = opcua::read(conn, batch)
        results.append!(values)
    }
    
    return results
}

10.2 订阅优化

高频数据

中频数据

低频数据

OPC-UA Server

订阅策略

采样间隔100ms

采样间隔1000ms

采样间隔10000ms

实时流处理

分钟级聚合

小时级统计

10.3 连接池

// 连接池管理
def createConnectionPool(servers, poolSize=5) {
    pool = dict(STRING, ANY)
    
    for (server in servers) {
        connections = array(ANY, poolSize)
        for (i in 0..poolSize) {
            connections[i] = opcua::connect(server.url, server.user, server.pwd)
        }
        pool[server.name] = connections
    }
    
    return pool
}

// 使用连接池
servers = [
    table("plc1" as name, "opc.tcp://plc1:4840" as url, "admin" as user, "pwd" as pwd),
    table("plc2" as name, "opc.tcp://plc2:4840" as url, "admin" as user, "pwd" as pwd)
]

pool = createConnectionPool(servers)

十一、监控与运维

11.1 连接监控

// 检查连接状态
def checkConnection(conn) {
    try {
        opcua::read(conn, "i=2258")  // 读取服务器状态
        return true
    } catch(ex) {
        return false
    }
}

// 定时检查
scheduleJob("checkOpcua", "检查OPC-UA连接", 
    def() { 
        if (not checkConnection(plc1)) {
            print("PLC1连接断开")
        }
    },
    00:00, 2024.01.01, 2030.12.31, 'M')  // 每分钟检查

11.2 数据质量监控

// 监控数据质量
def monitorDataQuality(stream) {
    select node_id,
           count(*) as total,
           sum(iif(quality == 0, 1, 0)) as good_count,
           sum(iif(quality != 0, 1, 0)) as bad_count
    from stream
    where timestamp > now() - 3600000
    group by node_id
}

十二、最佳实践

12.1 节点命名规范

规范 格式 示例
设备节点 ns=2;s={Device}.{Metric} ns=2;s=Device1.Temperature
区域节点 ns=2;s={Area}.{Device}.{Metric} ns=2;s=WorkshopA.Device1.Temperature
功能节点 ns=2;s={Function}.{SubFunction} ns=2;s=Control.SetPoint

12.2 错误处理

// 健壮的数据采集
def robustRead(conn, nodeId, retries=3) {
    for (i in 0..retries) {
        try {
            return opcua::read(conn, nodeId)
        } catch(ex) {
            if (i == retries - 1) {
                print("读取失败: " + nodeId + ", 错误: " + ex)
                return NULL
            }
            sleep(1000)  // 等待1秒重试
        }
    }
    return NULL
}

12.3 安全建议

建议 说明
启用加密 使用SignAndEncrypt模式
证书管理 定期更新证书
访问控制 最小权限原则
审计日志 记录所有操作

十三、总结

本文详细介绍了DolphinDB与OPC-UA协议的集成方法。核心要点如下:

  1. OPC-UA协议:工业自动化标准通信协议
  2. 插件使用:连接、浏览、读写、订阅的完整流程
  3. 数据采集:实时数据订阅和历史数据读取
  4. 工业应用:智能工厂数据采集系统实战
  5. 性能优化:批量操作、订阅优化、连接池
  6. 运维监控:连接监控、数据质量监控

思考题

  1. 如何设计OPC-UA节点结构以支持灵活的数据访问?
  2. 如何保证OPC-UA数据采集的可靠性?
  3. 在大规模设备接入场景下如何优化性能?

参考资料

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐