Fast-DDS Transport 层架构详解

源码分析文档 | 版本 v1.0 | 最后更新:2026-05-16


📋 目录


第一部分:基础概念

1.1 什么是 DDS?

DDS (Data Distribution Service) 是由 OMG (Object Management Group) 制定的数据分发服务标准,专为实时系统设计的发布-订阅通信中间件。

核心特点

发布-订阅模式:解耦生产者和消费者
去中心化架构:无单点故障
QoS 策略丰富:支持可靠性、持久性、时序等 23+ 种服务质量策略
语言无关:支持 C++、Java、Python、C# 等多种语言
跨平台:Windows、Linux、macOS、RTOS 等

典型应用场景
  • 🤖 机器人系统:ROS 2 的默认中间件
  • 🚗 自动驾驶:车辆传感器数据分发
  • 🏭 工业自动化:PLC 与控制系统的实时通信
  • 🛰️ 航空航天:卫星地面站数据链路
  • 🏥 医疗设备:监护仪数据实时传输
DDS 核心概念模型
┌─────────────┐         Topic          ┌─────────────┐
│  Publisher  │ ─────────────────────► │  Subscriber │
│             │    (Named Data Stream) │             │
│  DataWriter │                        │ DataReader  │
└──────┬──────┘                        └──────┬──────┘
       │                                      │
       │         DomainParticipant            │
       └──────────────────────────────────────┘
                  (Communication Context)
  • DomainParticipant:通信上下文,类似"聊天室"
  • Topic:命名数据流,类似"话题频道"
  • Publisher/DataWriter:数据生产者
  • Subscriber/DataReader:数据消费者

1.2 什么是 RTPS?

RTPS (Real-Time Publish-Subscribe) 是 DDS 标准的有线协议,定义了数据如何在网络上传输。

RTPS vs DDS 的关系
┌─────────────────────────────────────┐
│         DDS (API 标准)               │  ← 应用程序使用
│  - 定义编程接口                       │
│  - 定义 QoS 策略                     │
└──────────────┬──────────────────────┘
               │ 实现
┌──────────────▼──────────────────────┐
│        RTPS (传输协议)               │  ← 网络传输层
│  - 定义消息格式                      │
│  - 定义发现机制                      │
│  - 定义可靠性机制                    │
└──────────────┬──────────────────────┘
               │ 承载
┌──────────────▼──────────────────────┐
│      UDP / TCP / SHM (物理传输)      │  ← 操作系统网络栈
└─────────────────────────────────────┘
RTPS 核心特性

🔹 基于 UDP:默认使用 UDP 进行高效传输
🔹 发现机制:自动发现网络中的参与者(PDP)和端点(EDP)
🔹 可靠性:通过 HEARTBEAT/ACKNACK 机制实现可靠传输
🔹 分片支持:大数据自动分片传输(DATA_FRAG)
🔹 历史缓存:支持 late-joiner 获取历史数据

RTPS 消息类型
消息类型 用途 可靠性
DATA 传输实际数据 可选可靠
DATA_FRAG 分片数据传输 可选可靠
HEARTBEAT 心跳(_writer 通知 _reader) 不可靠
ACKNACK 确认/否定确认(_reader 响应) 不可靠
GAP 通知数据已删除 不可靠
INFO_SRC/DST 源/目的信息 不可靠

1.3 Fast-DDS 简介

Fast-DDS(原名 Fast-RTPS)是由 eProsima 公司开发的高性能 C++ DDS 实现。

项目背景
  • 🏢 开发商:eProsima(西班牙中间件公司)
  • 📅 起始时间:2010 年
  • 📜 许可证:Apache 2.0(开源)
  • 🌟 GitHub Stars:4,000+
  • 🎯 质量等级:ROS 2 Quality Level 1(最高级)
为什么选择 Fast-DDS?

高性能:优化的内存管理和零拷贝技术
ROS 2 默认中间件:所有 ROS 2 LTS 版本的官方推荐
完整标准支持:DDS 1.4、RTPS 2.5、DDS-Security 1.1
多传输支持:UDP、TCP、共享内存、自定义传输
活跃社区:持续更新,商业支持可用

版本演进
版本 发布时间 重要特性
v2.x 2020 稳定版,ROS 2 Foxy/Galactic
v3.0 2023 重命名为 Fast-DDS,API 改进
v3.4 2024 以太网传输、IP 移动性支持
v3.5 2025 当前最新版本

1.4 核心术语表

在深入 Transport 层之前,需要理解以下核心概念:

网络相关术语
术语 英文 说明
Locator Locator 网络地址抽象,包含 {kind, port, address}
通道 Channel 逻辑通信路径(如 UDP 的一个端口)
端点 Endpoint Writer 或 Reader 的统称
GUID Globally Unique Identifier 全局唯一标识符,标识 Participant 或 Endpoint
GuidPrefix GuidPrefix GUID 的前 12 字节,标识 Participant
传输相关术语
术语 英文 说明
Transport Transport 传输层实现(UDP/TCP/SHM)
SenderResource SenderResource 发送资源的 RAII 封装
ReceiverResource ReceiverResource 接收资源的 RAII 封装
NetworkFactory NetworkFactory 管理多个 Transport 实例的工厂
MessageReceiver MessageReceiver RTPS 消息接收器,解析 RTPS 协议
Locator 结构详解
struct Locator_t {
    int32_t kind;      // 传输类型:UDPv4=0, UDPv6=1, TCPv4=2, ...
    uint32_t port;     // 端口号
    octet address[16]; // IP 地址(IPv4 用前 4 字节,IPv6 用全部)
};

示例

Locator(kind=UDPv4, port=7412, address=192.168.1.100)

第二部分:Transport 层架构

2.1 整体架构层次

Fast-DDS 采用分层架构设计,从上到下共 6 层:

┌─────────────────────────────────────────────────┐
│          DDS API Layer (用户层)                    │
│  DomainParticipant / Publisher / Subscriber      │
│  DataWriter / DataReader                         │
│  ✓ 应用程序直接使用的 API                         │
└──────────────┬──────────────────────────────────┘
               │ 调用
┌──────────────▼──────────────────────────────────┐
│         RTPS Layer (协议层)                       │
│  RTPSParticipantImpl                             │
│  RTPSWriter / RTPSReader                         │
│  MessageReceiver (消息接收器)                     │
│  ✓ 实现 RTPS 协议逻辑                             │
│  ✓ 处理发现、可靠性、历史缓存                     │
└──────────────┬──────────────────────────────────┘
               │ 使用
┌──────────────▼──────────────────────────────────┐
│     NetworkFactory (网络工厂层)                   │
│  - 管理多个 TransportInterface 实例              │
│  - 创建 SenderResource / ReceiverResource        │
│  - Locator 转换与选择                            │
│  ✓ 传输层的统一入口                               │
└──────────────┬──────────────────────────────────┘
               │ 抽象
┌──────────────▼──────────────────────────────────┐
│    Transport Interface Layer (传输抽象层)         │
│  TransportInterface (抽象基类)                   │
│  ├─ UDPTransportInterface                        │
│  │   ├─ UDPv4Transport                           │
│  │   └─ UDPv6Transport                           │
│  ├─ TCPTransportInterface                        │
│  │   ├─ TCPv4Transport                           │
│  │   └─ TCPv6Transport                           │
│  └─ SharedMemoryTransport                        │
│  ✓ 定义传输层的统一接口                           │
│  ✓ 支持插件式扩展新传输类型                       │
└──────────────┬──────────────────────────────────┘
               │ 封装
┌──────────────▼──────────────────────────────────┐
│    Channel Resource Layer (通道资源层)            │
│  UDPChannelResource / TCPChannelResource         │
│  - 封装底层 Socket                               │
│  - 管理接收线程                                   │
│  ✓ 每个监听端口一个独立线程                       │
└──────────────┬──────────────────────────────────┘
               │ 调用
┌──────────────▼──────────────────────────────────┐
│    OS Network Layer (操作系统网络层)              │
│  ASIO Library (Boost.Asio)                      │
│  UDP Socket / TCP Socket / Shared Memory         │
│  ✓ 跨平台网络 I/O 抽象                           │
└─────────────────────────────────────────────────┘
各层职责说明
层级 主要职责 关键类
DDS API 提供用户友好的发布-订阅接口 DomainParticipant, DataWriter
RTPS 实现 RTPS 协议逻辑 RTPSWriter, MessageReceiver
NetworkFactory 管理传输实例,创建资源 NetworkFactory
Transport Interface 定义传输抽象接口 TransportInterface
Channel Resource 封装 Socket 和线程 UDPChannelResource
OS Network 操作系统网络调用 Boost.Asio

2.2 核心抽象接口

2.2.1 TransportInterface(传输接口)

TransportInterface 是传输层的核心抽象基类,所有具体传输(UDP/TCP/SHM)都必须继承并实现此接口。

文件位置include/fastdds/rtps/transport/TransportInterface.hpp

class TransportInterface {
public:
    virtual ~TransportInterface() = default;

    // ===== 生命周期管理 =====
    
    /**
     * 初始化传输层
     * @param properties 可选的属性策略
     * @param max_msg_size_no_frag 可选的最大消息大小(避免分片)
     * @return 初始化是否成功
     */
    virtual bool init(
        const PropertyPolicy* properties = nullptr,
        const uint32_t& max_msg_size_no_frag = 0) = 0;

    // ===== 发送相关接口 =====
    
    /**
     * 打开发送通道
     * @param sender_resource_list 输出:创建的发送资源列表
     * @param locator 目标定位器
     * @return 是否成功打开
     */
    virtual bool OpenOutputChannel(
        SendResourceList& sender_resource_list,
        const Locator&) = 0;

    /**
     * 批量打开发送通道(用于多个 locator)
     */
    virtual bool OpenOutputChannels(
        SendResourceList& sender_resource_list,
        const LocatorSelectorEntry& locator_selector_entry);

    // ===== 接收相关接口 =====
    
    /**
     * 打开接收通道
     * @param locator 本地监听定位器
     * @param receiver 接收回调接口(上层传入)
     * @param max_message_size 最大消息大小
     * @return 是否成功打开
     */
    virtual bool OpenInputChannel(
        const Locator&,
        TransportReceiverInterface*,
        uint32_t) = 0;

    /**
     * 关闭接收通道
     */
    virtual bool CloseInputChannel(const Locator&) = 0;

    // ===== Locator 管理接口 =====
    
    /**
     * 检查 locator 是否被此传输支持
     */
    virtual bool IsLocatorSupported(const Locator&) const = 0;

    /**
     * 检查 locator 是否允许使用
     */
    virtual bool is_locator_allowed(const Locator&) const = 0;

    /**
     * 检查 locator 是否可达
     */
    virtual bool is_locator_reachable(const Locator_t& locator) = 0;

    /**
     * 将远程 locator 转换为本地优化的 locator
     * (例如:检测到是本地通信时,转换为 localhost)
     */
    virtual Locator RemoteToMainLocal(const Locator& remote) const = 0;

    /**
     * Locator 选择算法
     * (在多 locator 场景下选择最优路径)
     */
    virtual void select_locators(LocatorSelector& selector) const = 0;

    // ===== 配置接口 =====
    
    /**
     * 获取传输配置描述符
     */
    virtual TransportDescriptorInterface* get_configuration() = 0;

    /**
     * 添加默认输出 locator
     */
    virtual void AddDefaultOutputLocator(LocatorList& defaultList) = 0;
};

关键设计思想

  1. Locator 抽象:用 Locator 统一表示通信端点,屏蔽底层 IP/端口细节
  2. 通道概念:一个"通道"对应一个逻辑通信路径(如 UDP 的一个端口)
  3. 资源管理:通过 SenderResourceReceiverResource 实现 RAII 管理
  4. 插件式扩展:新增传输类型只需继承 TransportInterface 并实现虚函数

2.2.2 TransportReceiverInterface(接收者接口)

TransportReceiverInterface 定义了数据传输到上层的回调接口

文件位置include/fastdds/rtps/transport/TransportReceiverInterface.hpp

class TransportReceiverInterface {
public:
    virtual ~TransportReceiverInterface() = default;

    /**
     * 当接收到数据时,传输层调用此方法
     * 
     * @param data 原始数据指针
     * @param size 数据大小(字节)
     * @param local_locator 本地接收地址(哪个 socket 收到的)
     * @param remote_locator 远程发送地址(谁发的)
     */
    virtual void OnDataReceived(
        const octet* data,
        const uint32_t size,
        const Locator& local_locator,
        const Locator& remote_locator) = 0;
};

这是传输层向上层传递数据的关键接口!

实际实现MessageReceiver 类实现了此接口,负责解析 RTPS 消息。


2.2.3 SenderResource(发送资源)

SenderResource 封装了发送操作的函数对象,采用 RAII 模式管理发送通道。

文件位置include/fastdds/rtps/transport/SenderResource.hpp

class SenderResource {
public:
    /**
     * 发送数据到目标 locator
     * 
     * @param buffers 数据缓冲区列表(支持 scatter-gather I/O)
     * @param total_bytes 总字节数
     * @param destination_locators_begin 目标 locator 迭代器起始
     * @param destination_locators_end 目标 locator 迭代器结束
     * @param max_blocking_time_point 最大阻塞时间点
     * @param transport_priority 传输优先级(用于 QoS)
     * @return 是否发送成功
     */
    bool send(
        const std::vector<NetworkBuffer>& buffers,
        const uint32_t& total_bytes,
        LocatorsIterator* destination_locators_begin,
        LocatorsIterator* destination_locators_end,
        const std::chrono::steady_clock::time_point& max_blocking_time_point,
        int32_t transport_priority);

    /**
     * 获取传输类型
     */
    int32_t kind() const;

protected:
    // 传输类型(UDPv4/UDPv6/TCP 等)
    int32_t transport_kind_;

    // 发送数据的 lambda 函数(由具体传输层注入)
    std::function<bool(
        const std::vector<NetworkBuffer>&,
        uint32_t,
        LocatorsIterator*,
        LocatorsIterator*,
        const std::chrono::steady_clock::time_point&,
        int32_t)> send_lambda_;
};

设计亮点

  • 使用 std::function 实现策略模式,不同传输注入不同的发送逻辑
  • 支持 scatter-gather I/O(多个 buffer 一次性发送)
  • 支持超时控制(max_blocking_time_point

第三部分:数据通路详解

3.1 接收数据通路(从网络到应用)

网络数据包到达
    ↓
┌─────────────────────────────────┐
│ 1. OS Kernel (UDP/TCP Socket)   │
│    - 网卡驱动接收数据包          │
│    - 内核协议栈处理              │
│    - 放入 Socket 接收缓冲区      │
└──────────────┬──────────────────┘
               ↓ wake up
┌─────────────────────────────────┐
│ 2. ASIO Library                 │
│    socket.receive_from()        │
│    - 从内核缓冲区拷贝数据        │
│    - 返回发送方 endpoint         │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 3. UDPChannelResource           │
│    perform_listen_operation()   │
│    - 独立接收线程(每端口一个)   │
│    - 阻塞式接收循环              │
│    - 调用 Receive() 获取数据     │
│    - 填充 message_buffer()       │
└──────────────┬──────────────────┘
               ↓ 回调
┌─────────────────────────────────┐
│ 4. TransportReceiverInterface   │
│    OnDataReceived()             │
│    - 实际实现:MessageReceiver  │
│    - 传入原始字节流              │
│    - 传入 local/remote locator   │
└──────────────┬──────────────────┘
               ↓ 解析
┌─────────────────────────────────┐
│ 5. MessageReceiver              │
│    processCDRMsg()              │
│    - 解析 RTPS 头部              │
│      • Protocol Version         │
│      • Vendor ID                │
│      • GuidPrefix               │
│    - 循环解析子消息 (Submessage) │
│      • DATA / DATA_FRAG         │
│      • HEARTBEAT / ACKNACK      │
│      • GAP / INFO_SRC/DST       │
│    - 路由到对应的 Reader/Writer  │
└──────────────┬──────────────────┘
               ↓ 处理
┌─────────────────────────────────┐
│ 6. RTPSReader/RTPSWriter        │
│    - 处理 DATA 消息              │
│      • 验证序列号                │
│      • 添加到 History Cache      │
│      • 触发 ACKNACK 响应         │
│    - 处理 HEARTBEAT 消息         │
│      • 检测缺失数据              │
│      • 发送 ACKNACK              │
│    - 处理 ACKNACK 消息           │
│      • 标记已确认的数据          │
│      • 触发重传(如有必要)       │
└──────────────┬──────────────────┘
               ↓ 交付
┌─────────────────────────────────┐
│ 7. DDS DataReader               │
│    take()/read()                │
│    - 从 History Cache 读取       │
│    - 反序列化 CDR 数据           │
│    - 转换为用户定义类型           │
│    - 交付给用户应用              │
│    - 调用用户注册的 Listener     │
└─────────────────────────────────┘
关键代码位置

步骤 3 - UDPChannelResource 接收线程
文件:src/cpp/rtps/transport/UDPChannelResource.cpp

void UDPChannelResource::perform_listen_operation(Locator input_locator)
{
    Locator remote_locator;
    
    // 接收线程主循环
    while (alive()) {
        // 获取消息缓冲区
        auto& msg = message_buffer();
        
        // 阻塞接收(直到有数据或 socket 关闭)
        if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) {
            continue;  // 接收失败,继续下一次循环
        }
        
        // 通过回调传递给上层(MessageReceiver)
        if (message_receiver() != nullptr) {
            message_receiver()->OnDataReceived(
                msg.buffer,      // 原始数据
                msg.length,      // 数据长度
                input_locator,   // 本地接收地址
                remote_locator   // 远程发送地址
            );
        }
        else if (alive()) {
            EPROSIMA_LOG_WARNING(RTPS_MSG_IN, 
                "Received Message, but no receiver attached");
        }
    }
    
    // 线程退出时清理
    message_receiver(nullptr);
}

关键点

  • 每个接收通道(端口)有一个独立的接收线程
  • 阻塞式接收,效率高(无需轮询)
  • 通过 message_receiver() 回调将数据传递给上层

步骤 5 - MessageReceiver 处理消息
文件:src/cpp/rtps/messages/MessageReceiver.cpp

void MessageReceiver::processCDRMsg(
    const Locator_t& source_locator,
    const Locator_t& reception_locator,
    CDRMessage_t* msg)
{
    // 1. 重置状态(为处理新消息做准备)
    reset();
    
    // 2. 解析 RTPS 头部(16 字节)
    if (!checkRTPSHeader(msg)) {
        EPROSIMA_LOG_WARNING(RTPS_MSG_IN, "Invalid RTPS header");
        return;
    }
    
    // 3. 循环解析子消息
    while (msg->pos < msg->length) {
        SubmessageHeader_t smh;
        
        // 读取子消息头部(8 字节)
        if (!readSubmessageHeader(msg, &smh)) {
            break;
        }
        
        // 根据子消息类型分发处理
        switch(smh.submessageId) {
            case DATA:
                proc_Submsg_Data(msg, &smh, writerID, was_decoded);
                break;
                
            case DATA_FRAG:
                proc_Submsg_DataFrag(msg, &smh, was_decoded);
                break;
                
            case HEARTBEAT:
                proc_Submsg_Heartbeat(msg, &smh, was_decoded);
                break;
                
            case ACKNACK:
                proc_Submsg_Acknack(msg, &smh, was_decoded);
                break;
                
            case GAP:
                proc_Submsg_Gap(msg, &smh, was_decoded);
                break;
                
            case INFO_TIMESTAMP:
                proc_Submsg_InfoTS(msg, &smh);
                break;
                
            case INFO_SOURCE:
                proc_Submsg_InfoSRC(msg, &smh);
                break;
                
            case INFO_DESTINATION:
                proc_Submsg_InfoDST(msg, &smh);
                break;
                
            default:
                EPROSIMA_LOG_WARNING(RTPS_MSG_IN, 
                    "Unknown submessage type: " << smh.submessageId);
                break;
        }
    }
}

关键点

  • RTPS 消息由头部 + 多个子消息组成
  • 每个子消息有独立的类型和处理逻辑
  • 处理函数会路由到对应的 Reader/Writer

3.2 发送数据通路(从应用到网络)

用户调用 write()
    ↓
┌─────────────────────────────────┐
│ 1. DDS DataWriter               │
│    write() / dispose()          │
│    - 验证 QoS 策略              │
│    - 序列化用户数据为 CDR 格式   │
│    - 创建 SampleInfo             │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 2. RTPSWriter                   │
│    - 创建 CacheChange            │
│      • 分配序列号                │
│      • 填充 GUID                 │
│      • 存储序列化数据            │
│    - 添加到 History Cache        │
│    - 根据可靠性策略决定发送方式   │
│      • RELIABLE: 加入重传队列    │
│      • BEST_EFFORT: 立即发送     │
│    - 触发发送流程                │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 3. RTPSMessageGroup             │
│    - 组装 RTPS 消息             │
│    - 添加 RTPS 头部              │
│      • Protocol Version (2.5)   │
│      • Vendor ID (eProsima)     │
│      • GuidPrefix               │
│    - 添加子消息                  │
│      • DATA (或 DATA_FRAG)      │
│      • HEARTBEAT (RELIABLE)     │
│    - 计算消息总大小              │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 4. RTPSParticipantImpl          │
│    sendSync()                   │
│    - 锁定发送资源列表            │
│    - 遍历 send_resource_list_    │
│      (可能对应多个网络接口)     │
│    - 调用 SenderResource::send()│
│    - 解锁                        │
│    - 通知统计模块                │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 5. UDPSenderResource            │
│    send_lambda_()               │
│    - 调用 UDPTransportInterface │
│    - 传入 socket 和目标 locator  │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 6. UDPTransportInterface        │
│    send()                       │
│    - 验证数据大小                │
│    - 遍历目标 Locators          │
│    - 生成目标 endpoint           │
│    - 调用 socket.send_to()      │
│    - 处理错误和超时              │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 7. ASIO Library                 │
│    socket.send_to()             │
│    - 将数据拷贝到内核缓冲区      │
│    - 触发系统调用 sendto()       │
└──────────────┬──────────────────┘
               ↓
┌─────────────────────────────────┐
│ 8. OS Kernel                    │
│    - 协议栈处理(UDP/IP)        │
│    - 路由查找                    │
│    - 网卡驱动发送                │
│    - 发送到网络                  │
└─────────────────────────────────┘
关键代码位置

步骤 4 - RTPSParticipantImpl 发送
文件:src/cpp/rtps/participant/RTPSParticipantImpl.hpp

template<class LocatorIteratorT>
bool RTPSParticipantImpl::sendSync(
    const std::vector<NetworkBuffer>& buffers,
    const uint32_t& total_bytes,
    const GUID_t& sender_guid,
    const LocatorIteratorT& destination_locators_begin,
    const LocatorIteratorT& destination_locators_end,
    std::chrono::steady_clock::time_point& max_blocking_time_point,
    int32_t transport_priority)
{
    bool ret_code = false;
    
#if HAVE_STRICT_REALTIME
    // 实时模式下使用带超时的锁
    std::unique_lock<std::timed_mutex> lock(
        m_send_resources_mutex_, std::defer_lock);
    if (lock.try_lock_until(max_blocking_time_point)) {
#else
    // 普通模式直接加锁
    std::unique_lock<std::timed_mutex> lock(m_send_resources_mutex_);
#endif
    {
        ret_code = true;
        
        // 遍历所有发送资源(可能对应不同网络接口)
        for (auto& send_resource : send_resource_list_) {
            LocatorIteratorT locators_begin = destination_locators_begin;
            LocatorIteratorT locators_end = destination_locators_end;
            
            // 调用传输层的发送接口
            send_resource->send(
                buffers, 
                total_bytes, 
                &locators_begin, 
                &locators_end,
                max_blocking_time_point, 
                transport_priority);
        }
        
        lock.unlock();
        
        // 通知统计模块(如果启用)
        on_rtps_send(
            sender_guid,
            destination_locators_begin,
            destination_locators_end,
            total_bytes);
        
        // 检查是否是发现协议数据包
        on_discovery_packet(
            sender_guid,
            destination_locators_begin,
            destination_locators_end);
    }
    
    return ret_code;
}

关键点

  • 支持多个发送资源(多网卡场景)
  • 线程安全(使用 mutex 保护)
  • 集成统计和监控功能

步骤 6 - UDPTransportInterface 发送
文件:src/cpp/rtps/transport/UDPTransportInterface.cpp

bool UDPTransportInterface::send(
    const std::vector<NetworkBuffer>& buffers,
    uint32_t total_bytes,
    eProsimaUDPSocket& socket,
    const Locator& remote_locator,
    bool only_multicast_purpose,
    bool whitelisted,
    const std::chrono::microseconds& timeout)
{
    // 1. 验证数据大小
    if (total_bytes > configuration()->sendBufferSize) {
        EPROSIMA_LOG_ERROR(TRANSPORT_UDP, 
            "Message too large: " << total_bytes);
        return false;
    }
    
    bool success = false;
    bool is_multicast = IPLocator::isMulticast(remote_locator);
    
    // 2. 检查是否应该发送(组播/单播过滤)
    if (is_multicast == only_multicast_purpose || whitelisted) {
        
        // 3. 生成目标 endpoint
        auto destinationEndpoint = generate_endpoint(
            remote_locator, 
            IPLocator::getPhysicalPort(remote_locator));
        
        size_t bytesSent = 0;
        
        try {
            // 4. 设置超时(非 Windows 平台)
#ifndef _WIN32
            struct timeval timeStruct;
            timeStruct.tv_sec = 0;
            timeStruct.tv_usec = timeout.count();
            setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO,
                reinterpret_cast<const char*>(&timeStruct), 
                sizeof(timeStruct));
#endif
            
            // 5. 调用 ASIO 发送
            asio::error_code ec;
            bytesSent = getSocketPtr(socket)->send_to(
                buffers,              // 数据缓冲区(scatter-gather)
                destinationEndpoint,  // 目标地址
                0,                    // 标志位
                ec);                  // 错误码
            
            // 6. 处理错误
            if (!!ec) {
                if ((ec.value() == asio::error::would_block) ||
                    (ec.value() == asio::error::try_again)) {
                    // 非阻塞模式下缓冲区满
                    EPROSIMA_LOG_WARNING(TRANSPORT_UDP, 
                        "UDP send would have blocked. Packet dropped.");
                    return true;  // 不算错误
                }
                
                EPROSIMA_LOG_WARNING(TRANSPORT_UDP, ec.message());
                return false;
            }
            
            // 7. 验证发送字节数
            if (bytesSent != total_bytes) {
                EPROSIMA_LOG_WARNING(TRANSPORT_UDP, 
                    "Send incomplete: " << bytesSent << "/" << total_bytes);
            }
        }
        catch (const std::exception& error) {
            EPROSIMA_LOG_WARNING(TRANSPORT_UDP, error.what());
            return false;
        }
        
        // 8. 记录日志
        EPROSIMA_LOG_INFO(TRANSPORT_UDP,
            "UDPTransport: " << bytesSent << " bytes TO " 
            << destinationEndpoint << " FROM " 
            << getSocketPtr(socket)->local_endpoint());
        
        success = true;
    }
    
    return success;
}

关键点

  • 支持 scatter-gather I/O(多个 buffer 一次发送)
  • 组播/单播过滤逻辑
  • 完善的错误处理和日志记录

第四部分:实现细节

4.1 传输层初始化流程

4.1.1 RTPSParticipantImpl 构造函数

文件:src/cpp/rtps/participant/RTPSParticipantImpl.cpp

RTPSParticipantImpl::RTPSParticipantImpl(
    uint32_t domain_id,
    const RTPSParticipantAttributes& param,
    const GuidPrefix_t& guidP,
    RTPSParticipant* part,
    RTPSParticipantListener* plisten)
    : domain_id_(domain_id)
    , m_att(param)
    , mp_userParticipant(part)
    , mp_participantListener(plisten)
{
    // 1. 设置 GUID
    setup_guids(guidP);
    
    // 2. 注册传输层
    if (!setup_transports()) {
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, 
            "Failed to setup transports");
        return;
    }
    
    // 3. 创建接收资源(监听端口)
    createReceiverResources(...);
    
    // 4. 创建发送资源
    createSenderResources(...);
    
    // 5. 设置定时器事件
    setup_timed_events();
    
    // 6. 设置元流量(metatraffic)
    setup_meta_traffic();
    
    // 7. 设置用户流量
    setup_user_traffic();
    
    // 8. 启动内置协议(PDP, EDP, WLP)
    if (!setup_builtin_protocols()) {
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, 
            "Failed to setup builtin protocols");
        return;
    }
    
    initialized_ = true;
}

4.1.2 setup_transports() - 注册传输实例
bool RTPSParticipantImpl::setup_transports()
{
    // 1. 注册内置 UDP v4 传输
    if (m_att.useBuiltinTransports) {
        auto udp_descriptor = std::make_shared<UDPv4TransportDescriptor>();
        
        // 从配置中复制参数
        udp_descriptor->sendBufferSize = m_att.sendSocketBufferSize;
        udp_descriptor->receiveBufferSize = m_att.listenSocketBufferSize;
        udp_descriptor->maxMessageSize = m_att.sendBufferSize;
        
        // 注册到 NetworkFactory
        m_network_Factory.RegisterTransport<
            UDPv4Transport, 
            UDPv4TransportDescriptor>(*udp_descriptor);
    }
    
    // 2. 注册用户自定义传输
    for (auto& user_transport : m_att.userTransports) {
        m_network_Factory.RegisterTransport(user_transport.get());
    }
    
    // 3. 验证至少有一个传输
    if (m_network_Factory.numberOfRegisteredTransports() == 0) {
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, 
            "No transports registered");
        return false;
    }
    
    return true;
}

4.1.3 NetworkFactory::RegisterTransport

文件:src/cpp/rtps/network/NetworkFactory.cpp

bool NetworkFactory::RegisterTransport(
    const TransportDescriptorInterface* descriptor,
    const PropertyPolicy* properties,
    const uint32_t& max_msg_size_no_frag)
{
    // 1. 根据描述符类型创建传输实例
    std::unique_ptr<TransportInterface> transport = 
        create_transport_from_descriptor(descriptor);
    
    if (!transport) {
        EPROSIMA_LOG_ERROR(NETWORK_FACTORY, 
            "Failed to create transport from descriptor");
        return false;
    }
    
    // 2. 初始化传输
    if (!transport->init(properties, max_msg_size_no_frag)) {
        EPROSIMA_LOG_ERROR(NETWORK_FACTORY, 
            "Failed to initialize transport");
        return false;
    }
    
    // 3. 添加到注册列表
    mRegisteredTransports.emplace_back(std::move(transport));
    
    // 4. 更新最大消息大小
    maxMessageSizeBetweenTransports_ = std::min(
        maxMessageSizeBetweenTransports_,
        transport->max_recv_buffer_size());
    
    EPROSIMA_LOG_INFO(NETWORK_FACTORY, 
        "Transport registered: " << descriptor->get_type());
    
    return true;
}

4.2 UDP 传输实现示例

4.2.1 UDPv4Transport 类层次
TransportInterface (抽象基类)
    ↑ 继承
UDPTransportInterface (UDP 通用逻辑)
    - 管理输入 sockets 映射
    - 实现通用的 send/receive 逻辑
    - Locator 转换和优化
    ↑ 继承
UDPv4Transport (IPv4 特定实现)
    - IPv4 地址处理
    - IPv4 组播加入
    - IPv4 接口枚举

4.2.2 UDPTransportInterface 核心成员

文件:src/cpp/rtps/transport/UDPTransportInterface.h

class UDPTransportInterface : public TransportInterface {
protected:
    // ===== Socket 管理 =====
    
    // 输入 sockets 映射:端口 -> 多个 channel
    // 一个端口可能在多个网络接口上监听
    std::map<uint16_t, std::vector<UDPChannelResource*>> mInputSockets;
    
    // 保护 mInputSockets 的互斥锁
    mutable std::recursive_mutex mInputMapMutex;
    
    // ===== ASIO 上下文 =====
    
    // 所有 socket 共享的 io_context
    asio::io_context io_context_;
    
    // ===== 缓冲区配置 =====
    
    // 发送缓冲区大小
    uint32_t mSendBufferSize;
    
    // 接收缓冲区大小
    uint32_t mReceiveBufferSize;
    
    // ===== 网络接口管理 =====
    
    // 允许的网络接口白名单
    std::vector<AllowedNetworkInterface> allowed_interfaces_;
    
    // 网掩码过滤器配置
    NetmaskFilterKind netmask_filter_;
    
    // ===== 统计信息 =====
    
    // 传输统计信息
    TransportStatistics statistics_info_;
    
    // 是否需要重新扫描网络接口
    std::atomic<bool> rescan_interfaces_{false};
    
    // 第一次打开发送通道的标志
    bool first_time_open_output_channel_{true};
};

4.2.3 UDPChannelResource - 通道资源封装

文件:src/cpp/rtps/transport/UDPChannelResource.h

class UDPChannelResource : public ChannelResource {
private:
    // 接收回调接口(指向 MessageReceiver)
    TransportReceiverInterface* message_receiver_;
    
    // ASIO UDP socket
    eProsimaUDPSocket socket_;
    
    // 是否仅用于组播
    bool only_multicast_purpose_;
    
    // 绑定的网络接口名称
    std::string interface_;
    
    // 所属的传输实例
    UDPTransportInterface* transport_;
    
    // 接收线程
    std::thread thread_;

public:
    UDPChannelResource(
        UDPTransportInterface* transport,
        eProsimaUDPSocket& socket,
        uint32_t maxMsgSize,
        const Locator& locator,
        const std::string& sInterface,
        TransportReceiverInterface* receiver,
        const ThreadSettings& thread_config)
        : ChannelResource(maxMsgSize)
        , message_receiver_(receiver)
        , socket_(moveSocket(socket))
        , only_multicast_purpose_(false)
        , interface_(sInterface)
        , transport_(transport)
    {
        // 启动接收线程
        auto fn = [this, locator]() {
            perform_listen_operation(locator);
        };
        
        thread_ = create_thread(
            fn, 
            thread_config, 
            "dds.udp.%u",  // 线程命名:dds.udp.7412
            locator.port);
    }
    
    ~UDPChannelResource() {
        // 清理回调
        message_receiver_ = nullptr;
        
        // 关闭 socket
        asio::error_code ec;
        socket().close(ec);
    }
};

关键点

  • 每个 UDPChannelResource 对应一个监听端口
  • 构造时自动启动接收线程
  • 析构时清理资源

第五部分:关键交互点

5.1 传输层 ↔ 上层(RTPS)交互

方向 接口 作用 调用时机
上行 TransportReceiverInterface::OnDataReceived() 传输层收到数据后通知 RTPS 层 Socket 接收到数据时
下行 SenderResource::send() RTPS 层通过发送资源调用传输层发送 DataWriter 写入数据时
配置 TransportInterface::OpenInputChannel() RTPS 层注册接收回调 Participant 创建时
配置 TransportInterface::OpenOutputChannel() RTPS 层打开发送通道 发现远程端点时
查询 TransportInterface::IsLocatorSupported() 检查 locator 是否支持 Locator 选择时
优化 TransportInterface::RemoteToMainLocal() 优化远程 locator 本地通信检测时

5.2 传输层 ↔ 下层(OS/ASIO)交互

操作 ASIO 调用 位置 说明
接收 socket.receive_from(buffer, endpoint) UDPChannelResource::Receive() 阻塞接收数据
发送 socket.send_to(buffers, endpoint) UDPTransportInterface::send() 发送数据到目标
绑定 socket.bind(endpoint) UDPv4Transport::OpenAndBindInputSocket() 绑定本地端口
加入组播 socket.set_option(multicast::join_group()) UDPChannelResource 构造 加入组播组
关闭 socket.close() UDPChannelResource::~UDPChannelResource() 关闭 socket
取消异步操作 socket.cancel() UDPChannelResource::release() 中断阻塞接收

5.3 典型交互时序图

接收数据时序
Network      ASIO       UDPChannel    Message      RTPSReader    DataReader
  │            │         Resource     Receiver       (RTPS)       (DDS)
  │            │             │            │             │             │
  │──Packet──▶│             │            │             │             │
  │            │──recv_from─▶│            │             │             │
  │            │             │            │             │             │
  │            │             │──OnDataReceived─────────▶│             │
  │            │             │            │             │             │
  │            │             │            │──processCDRMsg──────────▶│
  │            │             │            │             │             │
  │            │             │            │             │──onNewCacheChange─▶│
  │            │             │            │             │             │
  │            │             │            │             │             │──take()──▶User
发送数据时序
User      DataWriter    RTPSWriter    RTPSParticipant   SenderResource   UDPTransport   ASIO      Network
  │            │             │                │                │               │           │          │
  │──write()──▶│             │                │                │               │           │          │
  │            │             │                │                │               │           │          │
  │            │──add_change─▶│                │                │               │           │          │
  │            │             │                │                │               │           │          │
  │            │             │──sendSync()────▶│                │               │           │          │
  │            │             │                │                │               │           │          │
  │            │             │                │──send()────────▶│               │           │          │
  │            │             │                │                │               │           │          │
  │            │             │                │                │──send_lambda()─▶│           │          │
  │            │             │                │                │               │           │          │
  │            │             │                │                │               │──send_to()─▶│          │
  │            │             │                │                │               │           │          │
  │            │             │                │                │               │           │──sendto()─▶

第六部分:设计亮点与最佳实践

6.1 设计亮点

✨ 1. 传输无关性

问题:不同场景需要不同的传输协议(UDP 低延迟、TCP 穿透防火墙、SHM 零拷贝)

解决方案

  • 统一的 TransportInterface 抽象
  • 上层代码完全不关心底层传输类型
  • 通过 Locator.kind 区分传输类型

收益

  • ✅ 易于扩展新传输类型
  • ✅ 支持混合传输(同时使用 UDP + SHM)
  • ✅ 代码复用率高

✨ 2. 多传输并发支持

场景:一个 Participant 可以同时注册多个传输

// 示例:同时使用 UDP 和共享内存
participant_attr.useBuiltinTransports = true;  // UDP

auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
participant_attr.userTransports.push_back(shm_transport);  // SHM

工作机制

  • NetworkFactory 维护传输列表
  • 发送时遍历所有传输,选择合适的发送
  • 接收时每个传输独立监听

收益

  • ✅ 本地通信自动使用 SHM(零拷贝)
  • ✅ 远程通信使用 UDP(跨主机)
  • ✅ 透明切换,无需应用层干预

✨ 3. 异步接收架构

设计

  • 每个接收通道(端口)独立线程
  • 阻塞式接收(高效,无轮询开销)
  • 线程名规范化(便于调试):dds.udp.7412

优势

  • ✅ 高并发:多个端口同时接收不阻塞
  • ✅ 低延迟:数据到达立即处理
  • ✅ 易调试:线程名清晰标识端口

注意事项

  • ⚠️ 线程数量 = 监听端口数 × 网络接口数
  • ⚠️ 大量端口时需考虑线程资源

✨ 4. RAII 资源管理

SenderResource

// 构造时打开通道
SenderResource resource = transport->OpenOutputChannel(locator);

// 使用时发送
resource.send(buffers, ...);

// 析构时自动关闭
// (离开作用域时自动释放)

ReceiverResource

// 构造时创建 socket 和接收线程
auto resource = std::make_shared<ReceiverResource>(...);

// 智能指针管理生命周期
// (引用计数为 0 时自动清理)

收益

  • ✅ 避免资源泄漏
  • ✅ 异常安全
  • ✅ 代码简洁

✨ 5. Locator 智能优化

场景:检测到通信双方在同一主机

优化过程

// 1. 远程 locator: 192.168.1.100:7412
Locator remote = {UDPv4, 7412, "192.168.1.100"};

// 2. 检测到是本地地址
if (transport->is_local_locator(remote)) {
    // 3. 转换为 localhost
    Locator optimized = {UDPv4, 7412, "127.0.0.1"};
}

// 4. 如果有 SHM 传输,优先使用 SHM
if (has_shm_transport) {
    Locator shm_locator = {SHM, 7412, "..."};
}

收益

  • ✅ 减少网络开销
  • ✅ 降低延迟
  • ✅ 提高吞吐量

6.2 最佳实践

📌 1. 传输选择建议
场景 推荐传输 理由
单机内通信 Shared Memory 零拷贝,微秒级延迟
局域网通信 UDP 低延迟,高吞吐
跨公网通信 TCP 穿透防火墙,可靠
实时性要求高 UDP + RELIABLE QoS 兼顾速度和可靠性
大数据传输 TCP 或 UDP + 分片 避免丢包

📌 2. 端口规划建议
// 推荐:使用连续端口范围
participant_attr.builtin.metatrafficUnicastLocatorList.clear();
Locator loc;
loc.kind = LOCATOR_KIND_UDPv4;
loc.port = 7400;  // 起始端口
IPLocator::setIPv4(loc, "0.0.0.0");
participant_attr.builtin.metatrafficUnicastLocatorList.push_back(loc);

// Fast-DDS 会自动分配后续端口:
// - Metatraffic Unicast: 7400
// - Default Unicast: 7410
// - User Data: 动态分配

注意

  • ✅ 避免端口冲突
  • ✅ 预留足够端口范围
  • ✅ 防火墙开放相应端口

📌 3. 缓冲区调优
// 根据消息大小调整缓冲区
UDPv4TransportDescriptor udp_desc;
udp_desc.sendBufferSize = 1024 * 1024;      // 1 MB 发送缓冲
udp_desc.receiveBufferSize = 1024 * 1024;   // 1 MB 接收缓冲
udp_desc.maxMessageSize = 65000;            // 最大消息大小

participant_attr.transportConfiguration.push_back(udp_desc);

经验值

  • 小消息(< 1KB):默认值即可
  • 中等消息(1KB-100KB):增加到 256KB
  • 大消息(> 100KB):考虑分片或使用 TCP

📌 4. 多网卡配置
// 指定使用特定网卡
UDPv4TransportDescriptor udp_desc;
udp_desc.interfaceWhiteList = {"eth0", "eth1"};  // 只使用 eth0 和 eth1

participant_attr.transportConfiguration.push_back(udp_desc);

适用场景

  • 多网卡服务器
  • 隔离内外网
  • 负载均衡

📌 5. 性能监控
// 启用统计模块
participant_attr.properties.push_back({
    "fastdds.statistics", "HISTORY_LATENCY_TOPIC"
});

// 运行时查询统计信息
auto stats = participant->get_statistics();
std::cout << "Latency: " << stats.latency << " ms" << std::endl;

关键指标

  • 端到端延迟
  • 吞吐量
  • 丢包率
  • 重传次数

附录:相关文件索引

A. 头文件

文件路径 说明 行数
include/fastdds/rtps/transport/TransportInterface.hpp 传输接口定义 347
include/fastdds/rtps/transport/TransportReceiverInterface.hpp 接收者接口 59
include/fastdds/rtps/transport/SenderResource.hpp 发送资源 160
include/fastdds/rtps/transport/UDPTransportDescriptor.hpp UDP 配置描述符 -
include/fastdds/rtps/transport/TCPTransportDescriptor.hpp TCP 配置描述符 -
src/cpp/rtps/transport/UDPTransportInterface.h UDP 传输接口 347
src/cpp/rtps/transport/UDPChannelResource.h UDP 通道资源 263
src/cpp/rtps/network/NetworkFactory.hpp 网络工厂 358
src/cpp/rtps/messages/MessageReceiver.h 消息接收器 305
src/cpp/rtps/participant/RTPSParticipantImpl.hpp RTPS 参与者 1439

B. 实现文件

文件路径 说明 行数
src/cpp/rtps/transport/UDPTransportInterface.cpp UDP 传输实现 839
src/cpp/rtps/transport/UDPChannelResource.cpp UDP 通道实现 141
src/cpp/rtps/transport/UDPv4Transport.cpp UDPv4 具体实现 -
src/cpp/rtps/transport/UDPv6Transport.cpp UDPv6 具体实现 -
src/cpp/rtps/transport/TCPTransportInterface.cpp TCP 传输实现 72K
src/cpp/rtps/messages/MessageReceiver.cpp 消息接收器 1476
src/cpp/rtps/participant/RTPSParticipantImpl.cpp RTPS 参与者实现 -
src/cpp/rtps/network/NetworkFactory.cpp 网络工厂实现 -

C. 示例代码

示例路径 说明
examples/cpp/hello_world/ 基础发布-订阅示例
examples/cpp/benchmark/ 性能测试示例
examples/cpp/discovery_server/ 发现服务器示例
examples/cpp/security/ 安全通信示例

D. 测试代码

测试路径 说明
test/blackbox/ 黑盒集成测试
test/unittest/rtps/transport/ 传输层单元测试
test/performance/ 性能基准测试

总结

Fast-DDS 的 Transport 层采用了清晰的分层架构优秀的抽象设计

传输无关性:通过 TransportInterface 抽象,上层无需关心底层传输
插件式扩展:新增传输类型只需实现接口,无需修改核心代码
高性能:异步接收、零拷贝、多传输并发
可靠性:RAII 资源管理、完善的错误处理
灵活性:支持 UDP/TCP/SHM 混合使用,自动优化路径

这个架构非常适合作为分布式系统传输层的参考设计

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐