Fast-DDS Transport 层架构详解
Fast-DDS Transport 层架构详解
源码分析文档 | 版本 v1.0 | 最后更新:2026-05-16
📋 目录
第一部分:基础概念
1.1 什么是 DDS?
DDS (Data Distribution Service) 是由 OMG (Object Management Group) 制定的数据分发服务标准,专为实时系统设计的发布-订阅通信中间件。
核心特点
✅ 发布-订阅模式:解耦生产者和消费者
✅ 去中心化架构:无单点故障
✅ QoS 策略丰富:支持可靠性、持久性、时序等 23+ 种服务质量策略
✅ 语言无关:支持 C++、Java、Python、C# 等多种语言
✅ 跨平台:Windows、Linux、macOS、RTOS 等
典型应用场景
- 🤖 机器人系统:ROS 2 的默认中间件
- 🚗 自动驾驶:车辆传感器数据分发
- 🏭 工业自动化:PLC 与控制系统的实时通信
- 🛰️ 航空航天:卫星地面站数据链路
- 🏥 医疗设备:监护仪数据实时传输
DDS 核心概念模型
┌─────────────┐ Topic ┌─────────────┐
│ Publisher │ ─────────────────────► │ Subscriber │
│ │ (Named Data Stream) │ │
│ DataWriter │ │ DataReader │
└──────┬──────┘ └──────┬──────┘
│ │
│ DomainParticipant │
└──────────────────────────────────────┘
(Communication Context)
- DomainParticipant:通信上下文,类似"聊天室"
- Topic:命名数据流,类似"话题频道"
- Publisher/DataWriter:数据生产者
- Subscriber/DataReader:数据消费者
1.2 什么是 RTPS?
RTPS (Real-Time Publish-Subscribe) 是 DDS 标准的有线协议,定义了数据如何在网络上传输。
RTPS vs DDS 的关系
┌─────────────────────────────────────┐
│ DDS (API 标准) │ ← 应用程序使用
│ - 定义编程接口 │
│ - 定义 QoS 策略 │
└──────────────┬──────────────────────┘
│ 实现
┌──────────────▼──────────────────────┐
│ RTPS (传输协议) │ ← 网络传输层
│ - 定义消息格式 │
│ - 定义发现机制 │
│ - 定义可靠性机制 │
└──────────────┬──────────────────────┘
│ 承载
┌──────────────▼──────────────────────┐
│ UDP / TCP / SHM (物理传输) │ ← 操作系统网络栈
└─────────────────────────────────────┘
RTPS 核心特性
🔹 基于 UDP:默认使用 UDP 进行高效传输
🔹 发现机制:自动发现网络中的参与者(PDP)和端点(EDP)
🔹 可靠性:通过 HEARTBEAT/ACKNACK 机制实现可靠传输
🔹 分片支持:大数据自动分片传输(DATA_FRAG)
🔹 历史缓存:支持 late-joiner 获取历史数据
RTPS 消息类型
| 消息类型 | 用途 | 可靠性 |
|---|---|---|
| DATA | 传输实际数据 | 可选可靠 |
| DATA_FRAG | 分片数据传输 | 可选可靠 |
| HEARTBEAT | 心跳(_writer 通知 _reader) | 不可靠 |
| ACKNACK | 确认/否定确认(_reader 响应) | 不可靠 |
| GAP | 通知数据已删除 | 不可靠 |
| INFO_SRC/DST | 源/目的信息 | 不可靠 |
1.3 Fast-DDS 简介
Fast-DDS(原名 Fast-RTPS)是由 eProsima 公司开发的高性能 C++ DDS 实现。
项目背景
- 🏢 开发商:eProsima(西班牙中间件公司)
- 📅 起始时间:2010 年
- 📜 许可证:Apache 2.0(开源)
- 🌟 GitHub Stars:4,000+
- 🎯 质量等级:ROS 2 Quality Level 1(最高级)
为什么选择 Fast-DDS?
✅ 高性能:优化的内存管理和零拷贝技术
✅ ROS 2 默认中间件:所有 ROS 2 LTS 版本的官方推荐
✅ 完整标准支持:DDS 1.4、RTPS 2.5、DDS-Security 1.1
✅ 多传输支持:UDP、TCP、共享内存、自定义传输
✅ 活跃社区:持续更新,商业支持可用
版本演进
| 版本 | 发布时间 | 重要特性 |
|---|---|---|
| v2.x | 2020 | 稳定版,ROS 2 Foxy/Galactic |
| v3.0 | 2023 | 重命名为 Fast-DDS,API 改进 |
| v3.4 | 2024 | 以太网传输、IP 移动性支持 |
| v3.5 | 2025 | 当前最新版本 |
1.4 核心术语表
在深入 Transport 层之前,需要理解以下核心概念:
网络相关术语
| 术语 | 英文 | 说明 |
|---|---|---|
| Locator | Locator | 网络地址抽象,包含 {kind, port, address} |
| 通道 | Channel | 逻辑通信路径(如 UDP 的一个端口) |
| 端点 | Endpoint | Writer 或 Reader 的统称 |
| GUID | Globally Unique Identifier | 全局唯一标识符,标识 Participant 或 Endpoint |
| GuidPrefix | GuidPrefix | GUID 的前 12 字节,标识 Participant |
传输相关术语
| 术语 | 英文 | 说明 |
|---|---|---|
| Transport | Transport | 传输层实现(UDP/TCP/SHM) |
| SenderResource | SenderResource | 发送资源的 RAII 封装 |
| ReceiverResource | ReceiverResource | 接收资源的 RAII 封装 |
| NetworkFactory | NetworkFactory | 管理多个 Transport 实例的工厂 |
| MessageReceiver | MessageReceiver | RTPS 消息接收器,解析 RTPS 协议 |
Locator 结构详解
struct Locator_t {
int32_t kind; // 传输类型:UDPv4=0, UDPv6=1, TCPv4=2, ...
uint32_t port; // 端口号
octet address[16]; // IP 地址(IPv4 用前 4 字节,IPv6 用全部)
};
示例:
Locator(kind=UDPv4, port=7412, address=192.168.1.100)
第二部分:Transport 层架构
2.1 整体架构层次
Fast-DDS 采用分层架构设计,从上到下共 6 层:
┌─────────────────────────────────────────────────┐
│ DDS API Layer (用户层) │
│ DomainParticipant / Publisher / Subscriber │
│ DataWriter / DataReader │
│ ✓ 应用程序直接使用的 API │
└──────────────┬──────────────────────────────────┘
│ 调用
┌──────────────▼──────────────────────────────────┐
│ RTPS Layer (协议层) │
│ RTPSParticipantImpl │
│ RTPSWriter / RTPSReader │
│ MessageReceiver (消息接收器) │
│ ✓ 实现 RTPS 协议逻辑 │
│ ✓ 处理发现、可靠性、历史缓存 │
└──────────────┬──────────────────────────────────┘
│ 使用
┌──────────────▼──────────────────────────────────┐
│ NetworkFactory (网络工厂层) │
│ - 管理多个 TransportInterface 实例 │
│ - 创建 SenderResource / ReceiverResource │
│ - Locator 转换与选择 │
│ ✓ 传输层的统一入口 │
└──────────────┬──────────────────────────────────┘
│ 抽象
┌──────────────▼──────────────────────────────────┐
│ Transport Interface Layer (传输抽象层) │
│ TransportInterface (抽象基类) │
│ ├─ UDPTransportInterface │
│ │ ├─ UDPv4Transport │
│ │ └─ UDPv6Transport │
│ ├─ TCPTransportInterface │
│ │ ├─ TCPv4Transport │
│ │ └─ TCPv6Transport │
│ └─ SharedMemoryTransport │
│ ✓ 定义传输层的统一接口 │
│ ✓ 支持插件式扩展新传输类型 │
└──────────────┬──────────────────────────────────┘
│ 封装
┌──────────────▼──────────────────────────────────┐
│ Channel Resource Layer (通道资源层) │
│ UDPChannelResource / TCPChannelResource │
│ - 封装底层 Socket │
│ - 管理接收线程 │
│ ✓ 每个监听端口一个独立线程 │
└──────────────┬──────────────────────────────────┘
│ 调用
┌──────────────▼──────────────────────────────────┐
│ OS Network Layer (操作系统网络层) │
│ ASIO Library (Boost.Asio) │
│ UDP Socket / TCP Socket / Shared Memory │
│ ✓ 跨平台网络 I/O 抽象 │
└─────────────────────────────────────────────────┘
各层职责说明
| 层级 | 主要职责 | 关键类 |
|---|---|---|
| DDS API | 提供用户友好的发布-订阅接口 | DomainParticipant, DataWriter |
| RTPS | 实现 RTPS 协议逻辑 | RTPSWriter, MessageReceiver |
| NetworkFactory | 管理传输实例,创建资源 | NetworkFactory |
| Transport Interface | 定义传输抽象接口 | TransportInterface |
| Channel Resource | 封装 Socket 和线程 | UDPChannelResource |
| OS Network | 操作系统网络调用 | Boost.Asio |
2.2 核心抽象接口
2.2.1 TransportInterface(传输接口)
TransportInterface 是传输层的核心抽象基类,所有具体传输(UDP/TCP/SHM)都必须继承并实现此接口。
文件位置:include/fastdds/rtps/transport/TransportInterface.hpp
class TransportInterface {
public:
virtual ~TransportInterface() = default;
// ===== 生命周期管理 =====
/**
* 初始化传输层
* @param properties 可选的属性策略
* @param max_msg_size_no_frag 可选的最大消息大小(避免分片)
* @return 初始化是否成功
*/
virtual bool init(
const PropertyPolicy* properties = nullptr,
const uint32_t& max_msg_size_no_frag = 0) = 0;
// ===== 发送相关接口 =====
/**
* 打开发送通道
* @param sender_resource_list 输出:创建的发送资源列表
* @param locator 目标定位器
* @return 是否成功打开
*/
virtual bool OpenOutputChannel(
SendResourceList& sender_resource_list,
const Locator&) = 0;
/**
* 批量打开发送通道(用于多个 locator)
*/
virtual bool OpenOutputChannels(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry);
// ===== 接收相关接口 =====
/**
* 打开接收通道
* @param locator 本地监听定位器
* @param receiver 接收回调接口(上层传入)
* @param max_message_size 最大消息大小
* @return 是否成功打开
*/
virtual bool OpenInputChannel(
const Locator&,
TransportReceiverInterface*,
uint32_t) = 0;
/**
* 关闭接收通道
*/
virtual bool CloseInputChannel(const Locator&) = 0;
// ===== Locator 管理接口 =====
/**
* 检查 locator 是否被此传输支持
*/
virtual bool IsLocatorSupported(const Locator&) const = 0;
/**
* 检查 locator 是否允许使用
*/
virtual bool is_locator_allowed(const Locator&) const = 0;
/**
* 检查 locator 是否可达
*/
virtual bool is_locator_reachable(const Locator_t& locator) = 0;
/**
* 将远程 locator 转换为本地优化的 locator
* (例如:检测到是本地通信时,转换为 localhost)
*/
virtual Locator RemoteToMainLocal(const Locator& remote) const = 0;
/**
* Locator 选择算法
* (在多 locator 场景下选择最优路径)
*/
virtual void select_locators(LocatorSelector& selector) const = 0;
// ===== 配置接口 =====
/**
* 获取传输配置描述符
*/
virtual TransportDescriptorInterface* get_configuration() = 0;
/**
* 添加默认输出 locator
*/
virtual void AddDefaultOutputLocator(LocatorList& defaultList) = 0;
};
关键设计思想:
- Locator 抽象:用
Locator统一表示通信端点,屏蔽底层 IP/端口细节 - 通道概念:一个"通道"对应一个逻辑通信路径(如 UDP 的一个端口)
- 资源管理:通过
SenderResource和ReceiverResource实现 RAII 管理 - 插件式扩展:新增传输类型只需继承
TransportInterface并实现虚函数
2.2.2 TransportReceiverInterface(接收者接口)
TransportReceiverInterface 定义了数据传输到上层的回调接口。
文件位置:include/fastdds/rtps/transport/TransportReceiverInterface.hpp
class TransportReceiverInterface {
public:
virtual ~TransportReceiverInterface() = default;
/**
* 当接收到数据时,传输层调用此方法
*
* @param data 原始数据指针
* @param size 数据大小(字节)
* @param local_locator 本地接收地址(哪个 socket 收到的)
* @param remote_locator 远程发送地址(谁发的)
*/
virtual void OnDataReceived(
const octet* data,
const uint32_t size,
const Locator& local_locator,
const Locator& remote_locator) = 0;
};
这是传输层向上层传递数据的关键接口!
实际实现:MessageReceiver 类实现了此接口,负责解析 RTPS 消息。
2.2.3 SenderResource(发送资源)
SenderResource 封装了发送操作的函数对象,采用 RAII 模式管理发送通道。
文件位置:include/fastdds/rtps/transport/SenderResource.hpp
class SenderResource {
public:
/**
* 发送数据到目标 locator
*
* @param buffers 数据缓冲区列表(支持 scatter-gather I/O)
* @param total_bytes 总字节数
* @param destination_locators_begin 目标 locator 迭代器起始
* @param destination_locators_end 目标 locator 迭代器结束
* @param max_blocking_time_point 最大阻塞时间点
* @param transport_priority 传输优先级(用于 QoS)
* @return 是否发送成功
*/
bool send(
const std::vector<NetworkBuffer>& buffers,
const uint32_t& total_bytes,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& max_blocking_time_point,
int32_t transport_priority);
/**
* 获取传输类型
*/
int32_t kind() const;
protected:
// 传输类型(UDPv4/UDPv6/TCP 等)
int32_t transport_kind_;
// 发送数据的 lambda 函数(由具体传输层注入)
std::function<bool(
const std::vector<NetworkBuffer>&,
uint32_t,
LocatorsIterator*,
LocatorsIterator*,
const std::chrono::steady_clock::time_point&,
int32_t)> send_lambda_;
};
设计亮点:
- 使用
std::function实现策略模式,不同传输注入不同的发送逻辑 - 支持 scatter-gather I/O(多个 buffer 一次性发送)
- 支持超时控制(
max_blocking_time_point)
第三部分:数据通路详解
3.1 接收数据通路(从网络到应用)
网络数据包到达
↓
┌─────────────────────────────────┐
│ 1. OS Kernel (UDP/TCP Socket) │
│ - 网卡驱动接收数据包 │
│ - 内核协议栈处理 │
│ - 放入 Socket 接收缓冲区 │
└──────────────┬──────────────────┘
↓ wake up
┌─────────────────────────────────┐
│ 2. ASIO Library │
│ socket.receive_from() │
│ - 从内核缓冲区拷贝数据 │
│ - 返回发送方 endpoint │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 3. UDPChannelResource │
│ perform_listen_operation() │
│ - 独立接收线程(每端口一个) │
│ - 阻塞式接收循环 │
│ - 调用 Receive() 获取数据 │
│ - 填充 message_buffer() │
└──────────────┬──────────────────┘
↓ 回调
┌─────────────────────────────────┐
│ 4. TransportReceiverInterface │
│ OnDataReceived() │
│ - 实际实现:MessageReceiver │
│ - 传入原始字节流 │
│ - 传入 local/remote locator │
└──────────────┬──────────────────┘
↓ 解析
┌─────────────────────────────────┐
│ 5. MessageReceiver │
│ processCDRMsg() │
│ - 解析 RTPS 头部 │
│ • Protocol Version │
│ • Vendor ID │
│ • GuidPrefix │
│ - 循环解析子消息 (Submessage) │
│ • DATA / DATA_FRAG │
│ • HEARTBEAT / ACKNACK │
│ • GAP / INFO_SRC/DST │
│ - 路由到对应的 Reader/Writer │
└──────────────┬──────────────────┘
↓ 处理
┌─────────────────────────────────┐
│ 6. RTPSReader/RTPSWriter │
│ - 处理 DATA 消息 │
│ • 验证序列号 │
│ • 添加到 History Cache │
│ • 触发 ACKNACK 响应 │
│ - 处理 HEARTBEAT 消息 │
│ • 检测缺失数据 │
│ • 发送 ACKNACK │
│ - 处理 ACKNACK 消息 │
│ • 标记已确认的数据 │
│ • 触发重传(如有必要) │
└──────────────┬──────────────────┘
↓ 交付
┌─────────────────────────────────┐
│ 7. DDS DataReader │
│ take()/read() │
│ - 从 History Cache 读取 │
│ - 反序列化 CDR 数据 │
│ - 转换为用户定义类型 │
│ - 交付给用户应用 │
│ - 调用用户注册的 Listener │
└─────────────────────────────────┘
关键代码位置
步骤 3 - UDPChannelResource 接收线程
文件:src/cpp/rtps/transport/UDPChannelResource.cpp
void UDPChannelResource::perform_listen_operation(Locator input_locator)
{
Locator remote_locator;
// 接收线程主循环
while (alive()) {
// 获取消息缓冲区
auto& msg = message_buffer();
// 阻塞接收(直到有数据或 socket 关闭)
if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) {
continue; // 接收失败,继续下一次循环
}
// 通过回调传递给上层(MessageReceiver)
if (message_receiver() != nullptr) {
message_receiver()->OnDataReceived(
msg.buffer, // 原始数据
msg.length, // 数据长度
input_locator, // 本地接收地址
remote_locator // 远程发送地址
);
}
else if (alive()) {
EPROSIMA_LOG_WARNING(RTPS_MSG_IN,
"Received Message, but no receiver attached");
}
}
// 线程退出时清理
message_receiver(nullptr);
}
关键点:
- 每个接收通道(端口)有一个独立的接收线程
- 阻塞式接收,效率高(无需轮询)
- 通过
message_receiver()回调将数据传递给上层
步骤 5 - MessageReceiver 处理消息
文件:src/cpp/rtps/messages/MessageReceiver.cpp
void MessageReceiver::processCDRMsg(
const Locator_t& source_locator,
const Locator_t& reception_locator,
CDRMessage_t* msg)
{
// 1. 重置状态(为处理新消息做准备)
reset();
// 2. 解析 RTPS 头部(16 字节)
if (!checkRTPSHeader(msg)) {
EPROSIMA_LOG_WARNING(RTPS_MSG_IN, "Invalid RTPS header");
return;
}
// 3. 循环解析子消息
while (msg->pos < msg->length) {
SubmessageHeader_t smh;
// 读取子消息头部(8 字节)
if (!readSubmessageHeader(msg, &smh)) {
break;
}
// 根据子消息类型分发处理
switch(smh.submessageId) {
case DATA:
proc_Submsg_Data(msg, &smh, writerID, was_decoded);
break;
case DATA_FRAG:
proc_Submsg_DataFrag(msg, &smh, was_decoded);
break;
case HEARTBEAT:
proc_Submsg_Heartbeat(msg, &smh, was_decoded);
break;
case ACKNACK:
proc_Submsg_Acknack(msg, &smh, was_decoded);
break;
case GAP:
proc_Submsg_Gap(msg, &smh, was_decoded);
break;
case INFO_TIMESTAMP:
proc_Submsg_InfoTS(msg, &smh);
break;
case INFO_SOURCE:
proc_Submsg_InfoSRC(msg, &smh);
break;
case INFO_DESTINATION:
proc_Submsg_InfoDST(msg, &smh);
break;
default:
EPROSIMA_LOG_WARNING(RTPS_MSG_IN,
"Unknown submessage type: " << smh.submessageId);
break;
}
}
}
关键点:
- RTPS 消息由头部 + 多个子消息组成
- 每个子消息有独立的类型和处理逻辑
- 处理函数会路由到对应的 Reader/Writer
3.2 发送数据通路(从应用到网络)
用户调用 write()
↓
┌─────────────────────────────────┐
│ 1. DDS DataWriter │
│ write() / dispose() │
│ - 验证 QoS 策略 │
│ - 序列化用户数据为 CDR 格式 │
│ - 创建 SampleInfo │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 2. RTPSWriter │
│ - 创建 CacheChange │
│ • 分配序列号 │
│ • 填充 GUID │
│ • 存储序列化数据 │
│ - 添加到 History Cache │
│ - 根据可靠性策略决定发送方式 │
│ • RELIABLE: 加入重传队列 │
│ • BEST_EFFORT: 立即发送 │
│ - 触发发送流程 │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 3. RTPSMessageGroup │
│ - 组装 RTPS 消息 │
│ - 添加 RTPS 头部 │
│ • Protocol Version (2.5) │
│ • Vendor ID (eProsima) │
│ • GuidPrefix │
│ - 添加子消息 │
│ • DATA (或 DATA_FRAG) │
│ • HEARTBEAT (RELIABLE) │
│ - 计算消息总大小 │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 4. RTPSParticipantImpl │
│ sendSync() │
│ - 锁定发送资源列表 │
│ - 遍历 send_resource_list_ │
│ (可能对应多个网络接口) │
│ - 调用 SenderResource::send()│
│ - 解锁 │
│ - 通知统计模块 │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 5. UDPSenderResource │
│ send_lambda_() │
│ - 调用 UDPTransportInterface │
│ - 传入 socket 和目标 locator │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 6. UDPTransportInterface │
│ send() │
│ - 验证数据大小 │
│ - 遍历目标 Locators │
│ - 生成目标 endpoint │
│ - 调用 socket.send_to() │
│ - 处理错误和超时 │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 7. ASIO Library │
│ socket.send_to() │
│ - 将数据拷贝到内核缓冲区 │
│ - 触发系统调用 sendto() │
└──────────────┬──────────────────┘
↓
┌─────────────────────────────────┐
│ 8. OS Kernel │
│ - 协议栈处理(UDP/IP) │
│ - 路由查找 │
│ - 网卡驱动发送 │
│ - 发送到网络 │
└─────────────────────────────────┘
关键代码位置
步骤 4 - RTPSParticipantImpl 发送
文件:src/cpp/rtps/participant/RTPSParticipantImpl.hpp
template<class LocatorIteratorT>
bool RTPSParticipantImpl::sendSync(
const std::vector<NetworkBuffer>& buffers,
const uint32_t& total_bytes,
const GUID_t& sender_guid,
const LocatorIteratorT& destination_locators_begin,
const LocatorIteratorT& destination_locators_end,
std::chrono::steady_clock::time_point& max_blocking_time_point,
int32_t transport_priority)
{
bool ret_code = false;
#if HAVE_STRICT_REALTIME
// 实时模式下使用带超时的锁
std::unique_lock<std::timed_mutex> lock(
m_send_resources_mutex_, std::defer_lock);
if (lock.try_lock_until(max_blocking_time_point)) {
#else
// 普通模式直接加锁
std::unique_lock<std::timed_mutex> lock(m_send_resources_mutex_);
#endif
{
ret_code = true;
// 遍历所有发送资源(可能对应不同网络接口)
for (auto& send_resource : send_resource_list_) {
LocatorIteratorT locators_begin = destination_locators_begin;
LocatorIteratorT locators_end = destination_locators_end;
// 调用传输层的发送接口
send_resource->send(
buffers,
total_bytes,
&locators_begin,
&locators_end,
max_blocking_time_point,
transport_priority);
}
lock.unlock();
// 通知统计模块(如果启用)
on_rtps_send(
sender_guid,
destination_locators_begin,
destination_locators_end,
total_bytes);
// 检查是否是发现协议数据包
on_discovery_packet(
sender_guid,
destination_locators_begin,
destination_locators_end);
}
return ret_code;
}
关键点:
- 支持多个发送资源(多网卡场景)
- 线程安全(使用 mutex 保护)
- 集成统计和监控功能
步骤 6 - UDPTransportInterface 发送
文件:src/cpp/rtps/transport/UDPTransportInterface.cpp
bool UDPTransportInterface::send(
const std::vector<NetworkBuffer>& buffers,
uint32_t total_bytes,
eProsimaUDPSocket& socket,
const Locator& remote_locator,
bool only_multicast_purpose,
bool whitelisted,
const std::chrono::microseconds& timeout)
{
// 1. 验证数据大小
if (total_bytes > configuration()->sendBufferSize) {
EPROSIMA_LOG_ERROR(TRANSPORT_UDP,
"Message too large: " << total_bytes);
return false;
}
bool success = false;
bool is_multicast = IPLocator::isMulticast(remote_locator);
// 2. 检查是否应该发送(组播/单播过滤)
if (is_multicast == only_multicast_purpose || whitelisted) {
// 3. 生成目标 endpoint
auto destinationEndpoint = generate_endpoint(
remote_locator,
IPLocator::getPhysicalPort(remote_locator));
size_t bytesSent = 0;
try {
// 4. 设置超时(非 Windows 平台)
#ifndef _WIN32
struct timeval timeStruct;
timeStruct.tv_sec = 0;
timeStruct.tv_usec = timeout.count();
setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO,
reinterpret_cast<const char*>(&timeStruct),
sizeof(timeStruct));
#endif
// 5. 调用 ASIO 发送
asio::error_code ec;
bytesSent = getSocketPtr(socket)->send_to(
buffers, // 数据缓冲区(scatter-gather)
destinationEndpoint, // 目标地址
0, // 标志位
ec); // 错误码
// 6. 处理错误
if (!!ec) {
if ((ec.value() == asio::error::would_block) ||
(ec.value() == asio::error::try_again)) {
// 非阻塞模式下缓冲区满
EPROSIMA_LOG_WARNING(TRANSPORT_UDP,
"UDP send would have blocked. Packet dropped.");
return true; // 不算错误
}
EPROSIMA_LOG_WARNING(TRANSPORT_UDP, ec.message());
return false;
}
// 7. 验证发送字节数
if (bytesSent != total_bytes) {
EPROSIMA_LOG_WARNING(TRANSPORT_UDP,
"Send incomplete: " << bytesSent << "/" << total_bytes);
}
}
catch (const std::exception& error) {
EPROSIMA_LOG_WARNING(TRANSPORT_UDP, error.what());
return false;
}
// 8. 记录日志
EPROSIMA_LOG_INFO(TRANSPORT_UDP,
"UDPTransport: " << bytesSent << " bytes TO "
<< destinationEndpoint << " FROM "
<< getSocketPtr(socket)->local_endpoint());
success = true;
}
return success;
}
关键点:
- 支持 scatter-gather I/O(多个 buffer 一次发送)
- 组播/单播过滤逻辑
- 完善的错误处理和日志记录
第四部分:实现细节
4.1 传输层初始化流程
4.1.1 RTPSParticipantImpl 构造函数
文件:src/cpp/rtps/participant/RTPSParticipantImpl.cpp
RTPSParticipantImpl::RTPSParticipantImpl(
uint32_t domain_id,
const RTPSParticipantAttributes& param,
const GuidPrefix_t& guidP,
RTPSParticipant* part,
RTPSParticipantListener* plisten)
: domain_id_(domain_id)
, m_att(param)
, mp_userParticipant(part)
, mp_participantListener(plisten)
{
// 1. 设置 GUID
setup_guids(guidP);
// 2. 注册传输层
if (!setup_transports()) {
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
"Failed to setup transports");
return;
}
// 3. 创建接收资源(监听端口)
createReceiverResources(...);
// 4. 创建发送资源
createSenderResources(...);
// 5. 设置定时器事件
setup_timed_events();
// 6. 设置元流量(metatraffic)
setup_meta_traffic();
// 7. 设置用户流量
setup_user_traffic();
// 8. 启动内置协议(PDP, EDP, WLP)
if (!setup_builtin_protocols()) {
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
"Failed to setup builtin protocols");
return;
}
initialized_ = true;
}
4.1.2 setup_transports() - 注册传输实例
bool RTPSParticipantImpl::setup_transports()
{
// 1. 注册内置 UDP v4 传输
if (m_att.useBuiltinTransports) {
auto udp_descriptor = std::make_shared<UDPv4TransportDescriptor>();
// 从配置中复制参数
udp_descriptor->sendBufferSize = m_att.sendSocketBufferSize;
udp_descriptor->receiveBufferSize = m_att.listenSocketBufferSize;
udp_descriptor->maxMessageSize = m_att.sendBufferSize;
// 注册到 NetworkFactory
m_network_Factory.RegisterTransport<
UDPv4Transport,
UDPv4TransportDescriptor>(*udp_descriptor);
}
// 2. 注册用户自定义传输
for (auto& user_transport : m_att.userTransports) {
m_network_Factory.RegisterTransport(user_transport.get());
}
// 3. 验证至少有一个传输
if (m_network_Factory.numberOfRegisteredTransports() == 0) {
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
"No transports registered");
return false;
}
return true;
}
4.1.3 NetworkFactory::RegisterTransport
文件:src/cpp/rtps/network/NetworkFactory.cpp
bool NetworkFactory::RegisterTransport(
const TransportDescriptorInterface* descriptor,
const PropertyPolicy* properties,
const uint32_t& max_msg_size_no_frag)
{
// 1. 根据描述符类型创建传输实例
std::unique_ptr<TransportInterface> transport =
create_transport_from_descriptor(descriptor);
if (!transport) {
EPROSIMA_LOG_ERROR(NETWORK_FACTORY,
"Failed to create transport from descriptor");
return false;
}
// 2. 初始化传输
if (!transport->init(properties, max_msg_size_no_frag)) {
EPROSIMA_LOG_ERROR(NETWORK_FACTORY,
"Failed to initialize transport");
return false;
}
// 3. 添加到注册列表
mRegisteredTransports.emplace_back(std::move(transport));
// 4. 更新最大消息大小
maxMessageSizeBetweenTransports_ = std::min(
maxMessageSizeBetweenTransports_,
transport->max_recv_buffer_size());
EPROSIMA_LOG_INFO(NETWORK_FACTORY,
"Transport registered: " << descriptor->get_type());
return true;
}
4.2 UDP 传输实现示例
4.2.1 UDPv4Transport 类层次
TransportInterface (抽象基类)
↑ 继承
UDPTransportInterface (UDP 通用逻辑)
- 管理输入 sockets 映射
- 实现通用的 send/receive 逻辑
- Locator 转换和优化
↑ 继承
UDPv4Transport (IPv4 特定实现)
- IPv4 地址处理
- IPv4 组播加入
- IPv4 接口枚举
4.2.2 UDPTransportInterface 核心成员
文件:src/cpp/rtps/transport/UDPTransportInterface.h
class UDPTransportInterface : public TransportInterface {
protected:
// ===== Socket 管理 =====
// 输入 sockets 映射:端口 -> 多个 channel
// 一个端口可能在多个网络接口上监听
std::map<uint16_t, std::vector<UDPChannelResource*>> mInputSockets;
// 保护 mInputSockets 的互斥锁
mutable std::recursive_mutex mInputMapMutex;
// ===== ASIO 上下文 =====
// 所有 socket 共享的 io_context
asio::io_context io_context_;
// ===== 缓冲区配置 =====
// 发送缓冲区大小
uint32_t mSendBufferSize;
// 接收缓冲区大小
uint32_t mReceiveBufferSize;
// ===== 网络接口管理 =====
// 允许的网络接口白名单
std::vector<AllowedNetworkInterface> allowed_interfaces_;
// 网掩码过滤器配置
NetmaskFilterKind netmask_filter_;
// ===== 统计信息 =====
// 传输统计信息
TransportStatistics statistics_info_;
// 是否需要重新扫描网络接口
std::atomic<bool> rescan_interfaces_{false};
// 第一次打开发送通道的标志
bool first_time_open_output_channel_{true};
};
4.2.3 UDPChannelResource - 通道资源封装
文件:src/cpp/rtps/transport/UDPChannelResource.h
class UDPChannelResource : public ChannelResource {
private:
// 接收回调接口(指向 MessageReceiver)
TransportReceiverInterface* message_receiver_;
// ASIO UDP socket
eProsimaUDPSocket socket_;
// 是否仅用于组播
bool only_multicast_purpose_;
// 绑定的网络接口名称
std::string interface_;
// 所属的传输实例
UDPTransportInterface* transport_;
// 接收线程
std::thread thread_;
public:
UDPChannelResource(
UDPTransportInterface* transport,
eProsimaUDPSocket& socket,
uint32_t maxMsgSize,
const Locator& locator,
const std::string& sInterface,
TransportReceiverInterface* receiver,
const ThreadSettings& thread_config)
: ChannelResource(maxMsgSize)
, message_receiver_(receiver)
, socket_(moveSocket(socket))
, only_multicast_purpose_(false)
, interface_(sInterface)
, transport_(transport)
{
// 启动接收线程
auto fn = [this, locator]() {
perform_listen_operation(locator);
};
thread_ = create_thread(
fn,
thread_config,
"dds.udp.%u", // 线程命名:dds.udp.7412
locator.port);
}
~UDPChannelResource() {
// 清理回调
message_receiver_ = nullptr;
// 关闭 socket
asio::error_code ec;
socket().close(ec);
}
};
关键点:
- 每个 UDPChannelResource 对应一个监听端口
- 构造时自动启动接收线程
- 析构时清理资源
第五部分:关键交互点
5.1 传输层 ↔ 上层(RTPS)交互
| 方向 | 接口 | 作用 | 调用时机 |
|---|---|---|---|
| 上行 | TransportReceiverInterface::OnDataReceived() |
传输层收到数据后通知 RTPS 层 | Socket 接收到数据时 |
| 下行 | SenderResource::send() |
RTPS 层通过发送资源调用传输层发送 | DataWriter 写入数据时 |
| 配置 | TransportInterface::OpenInputChannel() |
RTPS 层注册接收回调 | Participant 创建时 |
| 配置 | TransportInterface::OpenOutputChannel() |
RTPS 层打开发送通道 | 发现远程端点时 |
| 查询 | TransportInterface::IsLocatorSupported() |
检查 locator 是否支持 | Locator 选择时 |
| 优化 | TransportInterface::RemoteToMainLocal() |
优化远程 locator | 本地通信检测时 |
5.2 传输层 ↔ 下层(OS/ASIO)交互
| 操作 | ASIO 调用 | 位置 | 说明 |
|---|---|---|---|
| 接收 | socket.receive_from(buffer, endpoint) |
UDPChannelResource::Receive() |
阻塞接收数据 |
| 发送 | socket.send_to(buffers, endpoint) |
UDPTransportInterface::send() |
发送数据到目标 |
| 绑定 | socket.bind(endpoint) |
UDPv4Transport::OpenAndBindInputSocket() |
绑定本地端口 |
| 加入组播 | socket.set_option(multicast::join_group()) |
UDPChannelResource 构造 |
加入组播组 |
| 关闭 | socket.close() |
UDPChannelResource::~UDPChannelResource() |
关闭 socket |
| 取消异步操作 | socket.cancel() |
UDPChannelResource::release() |
中断阻塞接收 |
5.3 典型交互时序图
接收数据时序
Network ASIO UDPChannel Message RTPSReader DataReader
│ │ Resource Receiver (RTPS) (DDS)
│ │ │ │ │ │
│──Packet──▶│ │ │ │ │
│ │──recv_from─▶│ │ │ │
│ │ │ │ │ │
│ │ │──OnDataReceived─────────▶│ │
│ │ │ │ │ │
│ │ │ │──processCDRMsg──────────▶│
│ │ │ │ │ │
│ │ │ │ │──onNewCacheChange─▶│
│ │ │ │ │ │
│ │ │ │ │ │──take()──▶User
发送数据时序
User DataWriter RTPSWriter RTPSParticipant SenderResource UDPTransport ASIO Network
│ │ │ │ │ │ │ │
│──write()──▶│ │ │ │ │ │ │
│ │ │ │ │ │ │ │
│ │──add_change─▶│ │ │ │ │ │
│ │ │ │ │ │ │ │
│ │ │──sendSync()────▶│ │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │──send()────────▶│ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ │──send_lambda()─▶│ │ │
│ │ │ │ │ │ │ │
│ │ │ │ │ │──send_to()─▶│ │
│ │ │ │ │ │ │ │
│ │ │ │ │ │ │──sendto()─▶
第六部分:设计亮点与最佳实践
6.1 设计亮点
✨ 1. 传输无关性
问题:不同场景需要不同的传输协议(UDP 低延迟、TCP 穿透防火墙、SHM 零拷贝)
解决方案:
- 统一的
TransportInterface抽象 - 上层代码完全不关心底层传输类型
- 通过
Locator.kind区分传输类型
收益:
- ✅ 易于扩展新传输类型
- ✅ 支持混合传输(同时使用 UDP + SHM)
- ✅ 代码复用率高
✨ 2. 多传输并发支持
场景:一个 Participant 可以同时注册多个传输
// 示例:同时使用 UDP 和共享内存
participant_attr.useBuiltinTransports = true; // UDP
auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
participant_attr.userTransports.push_back(shm_transport); // SHM
工作机制:
NetworkFactory维护传输列表- 发送时遍历所有传输,选择合适的发送
- 接收时每个传输独立监听
收益:
- ✅ 本地通信自动使用 SHM(零拷贝)
- ✅ 远程通信使用 UDP(跨主机)
- ✅ 透明切换,无需应用层干预
✨ 3. 异步接收架构
设计:
- 每个接收通道(端口)独立线程
- 阻塞式接收(高效,无轮询开销)
- 线程名规范化(便于调试):
dds.udp.7412
优势:
- ✅ 高并发:多个端口同时接收不阻塞
- ✅ 低延迟:数据到达立即处理
- ✅ 易调试:线程名清晰标识端口
注意事项:
- ⚠️ 线程数量 = 监听端口数 × 网络接口数
- ⚠️ 大量端口时需考虑线程资源
✨ 4. RAII 资源管理
SenderResource:
// 构造时打开通道
SenderResource resource = transport->OpenOutputChannel(locator);
// 使用时发送
resource.send(buffers, ...);
// 析构时自动关闭
// (离开作用域时自动释放)
ReceiverResource:
// 构造时创建 socket 和接收线程
auto resource = std::make_shared<ReceiverResource>(...);
// 智能指针管理生命周期
// (引用计数为 0 时自动清理)
收益:
- ✅ 避免资源泄漏
- ✅ 异常安全
- ✅ 代码简洁
✨ 5. Locator 智能优化
场景:检测到通信双方在同一主机
优化过程:
// 1. 远程 locator: 192.168.1.100:7412
Locator remote = {UDPv4, 7412, "192.168.1.100"};
// 2. 检测到是本地地址
if (transport->is_local_locator(remote)) {
// 3. 转换为 localhost
Locator optimized = {UDPv4, 7412, "127.0.0.1"};
}
// 4. 如果有 SHM 传输,优先使用 SHM
if (has_shm_transport) {
Locator shm_locator = {SHM, 7412, "..."};
}
收益:
- ✅ 减少网络开销
- ✅ 降低延迟
- ✅ 提高吞吐量
6.2 最佳实践
📌 1. 传输选择建议
| 场景 | 推荐传输 | 理由 |
|---|---|---|
| 单机内通信 | Shared Memory | 零拷贝,微秒级延迟 |
| 局域网通信 | UDP | 低延迟,高吞吐 |
| 跨公网通信 | TCP | 穿透防火墙,可靠 |
| 实时性要求高 | UDP + RELIABLE QoS | 兼顾速度和可靠性 |
| 大数据传输 | TCP 或 UDP + 分片 | 避免丢包 |
📌 2. 端口规划建议
// 推荐:使用连续端口范围
participant_attr.builtin.metatrafficUnicastLocatorList.clear();
Locator loc;
loc.kind = LOCATOR_KIND_UDPv4;
loc.port = 7400; // 起始端口
IPLocator::setIPv4(loc, "0.0.0.0");
participant_attr.builtin.metatrafficUnicastLocatorList.push_back(loc);
// Fast-DDS 会自动分配后续端口:
// - Metatraffic Unicast: 7400
// - Default Unicast: 7410
// - User Data: 动态分配
注意:
- ✅ 避免端口冲突
- ✅ 预留足够端口范围
- ✅ 防火墙开放相应端口
📌 3. 缓冲区调优
// 根据消息大小调整缓冲区
UDPv4TransportDescriptor udp_desc;
udp_desc.sendBufferSize = 1024 * 1024; // 1 MB 发送缓冲
udp_desc.receiveBufferSize = 1024 * 1024; // 1 MB 接收缓冲
udp_desc.maxMessageSize = 65000; // 最大消息大小
participant_attr.transportConfiguration.push_back(udp_desc);
经验值:
- 小消息(< 1KB):默认值即可
- 中等消息(1KB-100KB):增加到 256KB
- 大消息(> 100KB):考虑分片或使用 TCP
📌 4. 多网卡配置
// 指定使用特定网卡
UDPv4TransportDescriptor udp_desc;
udp_desc.interfaceWhiteList = {"eth0", "eth1"}; // 只使用 eth0 和 eth1
participant_attr.transportConfiguration.push_back(udp_desc);
适用场景:
- 多网卡服务器
- 隔离内外网
- 负载均衡
📌 5. 性能监控
// 启用统计模块
participant_attr.properties.push_back({
"fastdds.statistics", "HISTORY_LATENCY_TOPIC"
});
// 运行时查询统计信息
auto stats = participant->get_statistics();
std::cout << "Latency: " << stats.latency << " ms" << std::endl;
关键指标:
- 端到端延迟
- 吞吐量
- 丢包率
- 重传次数
附录:相关文件索引
A. 头文件
| 文件路径 | 说明 | 行数 |
|---|---|---|
include/fastdds/rtps/transport/TransportInterface.hpp |
传输接口定义 | 347 |
include/fastdds/rtps/transport/TransportReceiverInterface.hpp |
接收者接口 | 59 |
include/fastdds/rtps/transport/SenderResource.hpp |
发送资源 | 160 |
include/fastdds/rtps/transport/UDPTransportDescriptor.hpp |
UDP 配置描述符 | - |
include/fastdds/rtps/transport/TCPTransportDescriptor.hpp |
TCP 配置描述符 | - |
src/cpp/rtps/transport/UDPTransportInterface.h |
UDP 传输接口 | 347 |
src/cpp/rtps/transport/UDPChannelResource.h |
UDP 通道资源 | 263 |
src/cpp/rtps/network/NetworkFactory.hpp |
网络工厂 | 358 |
src/cpp/rtps/messages/MessageReceiver.h |
消息接收器 | 305 |
src/cpp/rtps/participant/RTPSParticipantImpl.hpp |
RTPS 参与者 | 1439 |
B. 实现文件
| 文件路径 | 说明 | 行数 |
|---|---|---|
src/cpp/rtps/transport/UDPTransportInterface.cpp |
UDP 传输实现 | 839 |
src/cpp/rtps/transport/UDPChannelResource.cpp |
UDP 通道实现 | 141 |
src/cpp/rtps/transport/UDPv4Transport.cpp |
UDPv4 具体实现 | - |
src/cpp/rtps/transport/UDPv6Transport.cpp |
UDPv6 具体实现 | - |
src/cpp/rtps/transport/TCPTransportInterface.cpp |
TCP 传输实现 | 72K |
src/cpp/rtps/messages/MessageReceiver.cpp |
消息接收器 | 1476 |
src/cpp/rtps/participant/RTPSParticipantImpl.cpp |
RTPS 参与者实现 | - |
src/cpp/rtps/network/NetworkFactory.cpp |
网络工厂实现 | - |
C. 示例代码
| 示例路径 | 说明 |
|---|---|
examples/cpp/hello_world/ |
基础发布-订阅示例 |
examples/cpp/benchmark/ |
性能测试示例 |
examples/cpp/discovery_server/ |
发现服务器示例 |
examples/cpp/security/ |
安全通信示例 |
D. 测试代码
| 测试路径 | 说明 |
|---|---|
test/blackbox/ |
黑盒集成测试 |
test/unittest/rtps/transport/ |
传输层单元测试 |
test/performance/ |
性能基准测试 |
总结
Fast-DDS 的 Transport 层采用了清晰的分层架构和优秀的抽象设计:
✅ 传输无关性:通过 TransportInterface 抽象,上层无需关心底层传输
✅ 插件式扩展:新增传输类型只需实现接口,无需修改核心代码
✅ 高性能:异步接收、零拷贝、多传输并发
✅ 可靠性:RAII 资源管理、完善的错误处理
✅ 灵活性:支持 UDP/TCP/SHM 混合使用,自动优化路径
这个架构非常适合作为分布式系统传输层的参考设计!
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)