ROS2 机器人软件系统
从零搭建 ROS2 环境,掌握核心通信机制,完成 Gazebo 仿真、导航与实体机器人部署。
本文涵盖完整的学习路线、代码示例与实战项目。但要求学习者掌握Linux或Ubuntu操作系统,掌握Python和C++开发语言,会搭建部署虚拟机。
基于 Ubuntu 22.04 + ROS2 Humble
1.过一下概念:ROS2 是什么?
ROS2(Robot Operating System 2) 相当于机器人的操作系统。但它不是传统意义上Windows和Linux那样的操作系统,而是一个中间件,提供通信机制、工具库和硬件抽象,让不同开发者编写的复杂软件模块能够协同工作。
把机器人比作生物体,ROS2 就是神经系统和大脑——负责信息传递、决策与协调。
核心职责:
- 通信骨干:节点、话题、服务、动作,让感知、决策、控制高效协作。
- 工具生态:RVIZ2、rqt、命令行工具,调试可视化一站式。
- 硬件抽象:统一驱动接口,屏蔽传感器/执行器差异。
分布式计算:支持多机协同,模块可运行在不同计算机上。
2. 环境准备:Ubuntu 22.04 虚拟机
参考配置:
- VMware 17
- 内存 16GB,硬盘 100GB
创建虚拟机
按照标准 Ubuntu 22.04.5 安装流程完成虚拟机的创建即可。
系统优化:
- 更换国内软件镜像源,提高更新和安装软件的速度:
sudo vim /etc/apt/sources.list
# 替换为以下内容(适用于 jammy)
deb https://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse
deb https://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse
deb https://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse
deb https://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse
- 更新并安装增强工具:
sudo apt update && sudo apt upgrade -y
sudo apt install open-vm-tools-desktop -y
3. 安装 ROS2 Humble
- 设置语言环境(推荐英文)。确保系统语言环境支持UTF-8编码,这是ROS2正常工作的基础。打开终端,依次执行以下命令:
# 更新软件列表并安装locales包
sudo apt update && sudo apt install locales
# 生成英语区域设置
sudo locale-gen en_US en_US.UTF-8
# 永久设置系统语言环境(重启后依然有效)
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
# 为当前会话立即生效
export LANG=en_US.UTF-8
# 检查当前时区设置
timedatectl status
# 如果时区不是上海,可以设置:
sudo timedatectl set-timezone Asia/Shanghai
测试方式:
- 添加 ROS2 软件源
# 方案一:官方源(需 GPG 密钥,可能被墙)
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update && sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
# 方案二:跳过 GPG(国内用户实用)
echo "deb [trusted=yes] http://packages.ros.org/ros2/ubuntu jammy main" | sudo tee /etc/apt/sources.list.d/ros2.list
sudo apt update
- 初学者,推荐安装桌面完整版(Desktop),包含了ROS、RViz、示例程序和教程
# 更新软件包索引并升级系统
sudo apt update
sudo apt upgrade
# 桌面完整版ROS2
sudo apt install ros-humble-desktop
sudo apt install ros-dev-tools
- 环境配置(自动加载)
# 为了方便使用,将ROS2的环境变量自动添加到你的bash会话中。
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc
- 另外一种快捷安装方式鱼香ROS一键安装
# 下载并执行脚本
wget http://fishros.com/install -O fishros && . fishros

- 测试demo:小乌龟控制
ros2 run turtlesim turtlesim_node # 终端1小乌龟仿真机器人
ros2 run turtlesim turtle_teleop_key # 终端2小乌龟机器人键盘控制器
(注意聚焦在终端2才能发出控制命令,在终端1查看机器人的移动)
完成环境的搭建即可,不强求理解什么,在实战中自然体会和领悟。
4. Colcon 构建工具
Colcon 是 ROS2 的项目构建管理器,类似于 make/catkin。colcon 是ROS2中非常重要的构建工具。
核心作用:
- 编译代码:将源码编译成可执行文件。
- 依赖管理:自动处理包间依赖。
- 环境设置:生成工作空间环境配置。
- 包发现:让 ROS2 能找到你的包。
安装 Colcon:
sudo apt install python3-colcon-common-extensions
测试
创建如下工作空间结构:
ros2_ws/
├── src/ # 源代码目录
│ ├── package_1/ # 功能包1
│ ├── package_2/ # 功能包2
│ └── ...
├── build/ # 编译中间文件(colcon创建)
├── install/ # 安装目录(colcon创建)
└── log/ # 编译日志(colcon创建)
基本使用
# 1. 进入工作空间
cd ~/ros2_ws
# 2. 编译所有包
colcon build
# 3. 加载工作空间环境
source install/setup.bash
# 4. 现在ROS2就能找到你编译的包了
ros2 pkg list | grep my_package
常用命令,先扫一遍就行。了解下工具的功能的下设计使用思路
# 编译所有包
colcon build
# 编译特定包
colcon build --packages-select my_package
# 显示详细输出
colcon build --event-handlers console_direct+
# 并行编译(加快速度)
colcon build --parallel-workers 4
# 清理后重新编译
colcon build --cmake-clean-first
# 只编译依赖包
colcon build --packages-up-to my_package
5. 核心概念理解:节点、话题、服务、动作
首先我的理解是节点是ROS2系统的每个特定组成单位,而话题/服务/动作是三种不同的通讯方式。
(扫一下有个概念知道干嘛的即可,之后会一步步手搓)
节点(Node)
节点是 ROS2 中可执行程序的最小单位,每个节点负责一个特定功能(如摄像头驱动、导航计算)。
ros2 node list # 查看运行中的节点
ros2 node info /node_name # 查看节点的详细信息
演示,这是一个键盘控制节点
- 发布者(Publishers)——核心输出,它发布了 3 个话题,其中最关键的是:
- /cmd_vel (geometry_msgs/msg/Twist):这是核心输出话题。当你按下键盘方向键(如 i/j/k/l 等)时,该节点会计算线速度(前进/后退)和角速度(旋转),并发布到这个话题。下游的机器人底盘驱动节点订阅此话题即可移动。
- /parameter_events 和 /rosout:这是 ROS 2 节点自带的标准话题,分别用于广播参数变更事件和输出日志信息,与业务逻辑无关。
- 服务服务器(Service Servers)——仅限参数配置。它提供了 6 个服务,但全部都是 ROS 2 节点自动拥有的标准参数服务(如 get_parameters、set_parameters):
这表明该节点没有自定义的业务服务。它只允许你在运行时通过 ros2 param set 修改其内部参数(例如调整速度缩放比例),但不提供“暂停”、“停止”等自定义功能接口。 - 没有订阅者(Subscribers):该节点完全独立,不接收任何 ROS 话题数据来调整自身行为。
没有动作服务器/客户端(Action Servers/Clients):它不处理需要反馈或取消的长时间任务(如导航),只做简单、持续的指令发送。
话题(Topic)——发布/订阅(异步)
单向、异步,适合传感器数据流。类比:广播系统。例如单一的摄像头节点发布一个图像话题,订阅该图像话题的所有节点都能同时收到图像。
- 单向:数据只从发布者流向订阅者
- 异步:发送后不管,不关心谁接收
- 实时:适合传感器数据流
ros2 topic list # 查看当前所有话题
ros2 topic echo /topic_name # 查看话题里的数据
ros2 topic pub /topic std_msgs/String "data: 'hello'" # 手动模拟发布数据
# 查看话题关系
rqt_graph
演示:
1.启动一个键盘控制器节点,他会把键盘的输入发布到/cmd_vel话题
2.搞一个新终端查看这个/cmd_vel的数据。刚开始是阻塞的,只要在控制器节点按方向键,按键输入就会被监听到。
3.再启动一个节点,按数据格式,模拟手动发布一个数据

也成功被监听了
至于rqt_graph工具可以可视化查看订阅关系,例如这是一个宇树go2的四足控制器节点订阅了cmd_vel话题,所以键盘控制器就可以实现按键控制机器狗
服务(Service)——请求/响应(同步)
双向、同步,适合瞬时操作(如开关、查询)。类比:点餐。
ros2 service list # 查看当前所有服务
ros2 service call /service_name package/srv/Type "{args}" # 手动调用一个服务,发送请求并等待响应 /service_name:要调用的服务名称(需替换为实际名字) package/srv/Type:服务的数据类型(包名/服务文件名) "{args}":请求参数(YAML 格式)
动作(Action)——目标/反馈/结果(长时间任务)
异步、可取消、带反馈,适合导航、抓取等长时间任务。类比:宴席策划。
ros2 action list # 查看所有动作
ros2 action send_goal /action_name package/action/Type "{goal}" --feedback # 向动作服务器发送一个目标(goal),并开始执行该任务,同时显示执行过程中的实时反 (feedback)。/action_name:动作名称(如 /turtlebot4_navigate) package/action/Type:动作数据类型(包名/动作文件名) "{goal}":目标参数(YAML 格式) --feedback:实时打印反馈(进度、状态等)
6. 创建工作空间与功能包
- 创建工作空间
# 创建一个名为ros2_ws的工作空间
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build # 此时仅有 build/install/log 目录
source install/setup.bash
- 创建功能包(C++ 与 Python两种不同的创建方式)
# 在src目录下
cd ~/ros2_ws/src
# Python : Python格式功能包创建命令: ros2 pkg create <包名> --build-type <功能包> --dependencies <依赖工具包> --node-name <节点名称>
ros2 pkg create helloworld_py --build-type ament_python --dependencies rclpy std_msgs --node-name helloworld_node
# C++ : ros2 pkg create <包名> --build-type <功能包> --dependencies <依赖工具包> --node-name <节点名称>
ros2 pkg create helloworld_cpp --build-type ament_cmake --dependencies rclcpp --node-name helloworld_node
- 在python功能包下编写一个自定义节点代码,创建一个publisher_node发布节点,向/chatter话题每秒定时发布1个消息
~/ros2_ws/src/helloworld_py/helloworld_py/publisher_node.py:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Publisher(Node):
def __init__(self):
super().__init__('publisher_node')
self.pub = self.create_publisher(String, 'chatter', 10)
self.timer = self.create_timer(1, self.timer_callback)
self.counter = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello ROS2 {self.counter}'
self.pub.publish(msg)
self.get_logger().info(f'Publishing: {msg.data}')
self.counter += 1
def main(args=None):
rclpy.init(args=args)
node = Publisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
- 功能包的编译和运行
cd ~/ros2_ws
colcon build --packages-select helloworld_py # 编译特定的Python包
source install/setup.bash # 加载最新的编译数据
ros2 run helloworld_py publisher_node # 运行该包下的自定义节点
7. 三种通信机制代码实现(Python/C++)
-
(1)话题(Topic)——发布者与订阅者的演示
-
创建工作空间
# 创建ROS2工作空间目录 mkdir -p ~/ros2_learning_ws/src cd ~/ros2_learning_ws/src -
创建功能包
# 创建一个Python包,依赖rclpy和std_msgs ros2 pkg create topic_demo --build-type ament_python --dependencies rclpy std_msgs # 此时项目结构 ros2_learning_ws/ └── src/ └── topic_demo/ ├── package.xml # 包配置文件 ├── setup.py # Python包安装配置 ├── setup.cfg # 安装脚本配置 └── topic_demo/ ├── __init__.py # Python包初始化文件 ├── publisher_node.py # 发布者节点 └── subscriber_node.py # 订阅者节点 -
配置文件
- package.xml - 包清单文件
<?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="content="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <!-- 包的基本信息 --> <name>topic_demo</name> <version>0.0.0</version> <description>ROS2 topic communication learning demo</description> <maintainer email="you@example.com">Your Name</maintainer> <license>Apache License 2.0</license> <!-- 编译依赖:构建时需要这些包 --> <buildtool_depend>ament_python</buildtool_depend> <!-- 运行时依赖:运行节点时需要这些包 --> <depend>rclpy</depend> <!-- ROS2 Python客户端库 --> <depend>std_msgs</depend> <!-- 标准消息类型 --> <!-- 测试依赖 --> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <!-- 导出配置:告诉ROS2这是Python包 --> <export> <build_type>ament_python</build_type> </export> </package> - setup.py - Python包安装配置,这个文件告诉ROS2如何安装你的Python包和节点。
from setuptools import setup, find_packages package_name = 'topic_demo' setup( # 包的基本信息 name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), # 自动查找所有Python包 # 数据文件:包含package.xml等 data_files=[ # 安装package.xml到share目录 ('share/ament_index/resource_index/packages', ['resource/' + package_name]), # 安装package.xml到包目录 ('share/' + package_name, ['package.xml']), ], # 安装要求 install_requires=['setuptools'], zip_safe=True, # 维护者信息 maintainer='Your Name', maintainer_email='you@example.com', description='ROS2 topic communication learning demo', license='Apache License 2.0', # 测试要求 tests_require=['pytest'], # 最重要的部分:定义可执行节点 entry_points={ 'console_scripts': [ # 格式:'节点名 = 包名.文件名:主函数名' 'simple_publisher = topic_demo.publisher_node:main', 'simple_subscriber = topic_demo.subscriber_node:main', ], }, ) - setup.cfg - 安装脚本配置
[develop] # 开发模式安装时,脚本安装目录 script_dir=$base/lib/topic_demo [install] # 正常安装时,脚本安装目录 install_scripts=$base/lib/topic_demo
- package.xml - 包清单文件
-
发布者节点:创建 Publisher,定时发布 std_msgs/String 到 chatroom 话题。
topic_demo/publisher_node.py#!/usr/bin/env python3 """ ROS2 发布者节点示例 这个节点创建一个发布者,定期向话题发送消息 """ import rclpy # 导入ROS2 Python客户端库 from rclpy.node import Node # 导入Node基类 from std_msgs.msg import String # 导入标准字符串消息类型 import time class SimplePublisher(Node): """ 简单发布者节点类 继承自Node类,创建一个ROS2节点 """ def __init__(self): """ 节点初始化函数 在这里设置节点名称、发布者、定时器等 """ # 调用父类构造函数,设置节点名称为'simple_publisher' super().__init__('simple_publisher') # 创建一个发布者 (Publisher) # 参数说明: # String - 消息类型(这里是标准字符串消息) # 'chatroom' - 话题名称(其他节点通过这个名字订阅) # 10 - 队列大小(缓存的消息数量) self.publisher = self.create_publisher(String, 'chatroom', 10) # 创建一个定时器 (Timer) # 参数说明: # 1.0 - 定时器周期(秒),这里每1秒触发一次 # self.timer_callback - 回调函数,定时器触发时调用 self.timer = self.create_timer(1.0, self.timer_callback) # 计数器,用于跟踪发送的消息数量 self.counter = 0 # 节点启动日志信息 self.get_logger().info('=== 发布者节点已启动 ===') self.get_logger().info('节点名称: simple_publisher') self.get_logger().info('发布话题: chatroom') self.get_logger().info('消息类型: std_msgs/String') self.get_logger().info('发布频率: 1 Hz (每秒1条消息)') def timer_callback(self): """ 定时器回调函数 当定时器到期时自动调用此函数 """ # 创建一个String类型的消息对象 msg = String() # 设置消息内容 msg.data = f'你好,这是第 {self.counter} 条消息! 时间: {time.strftime("%H:%M:%S")}' # 发布消息到话题 self.publisher.publish(msg) # 在控制台输出日志信息 self.get_logger().info(f'发布消息: "{msg.data}"') # 计数器加1 self.counter += 1 # 每10条消息输出一个分隔符,方便观察 if self.counter % 10 == 0: self.get_logger().info('--- 已发送10条消息 ---') def main(args=None): """ 主函数 - 节点的入口点 """ # 初始化ROS2 Python客户端库 # 必须在创建任何节点之前调用 rclpy.init(args=args) try: # 创建发布者节点实例 simple_publisher = SimplePublisher() # 保持节点运行,等待回调函数执行 # spin()会让节点保持活跃状态,直到被明确关闭 rclpy.spin(simple_publisher) except KeyboardInterrupt: # 处理Ctrl+C键盘中断 print('\n节点被用户中断') except Exception as e: # 处理其他异常 print(f'节点运行出错: {e}') finally: # 清理资源 if 'simple_publisher' in locals(): simple_publisher.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭ROS2客户端库 print('=== 发布者节点已关闭 ===') # Python标准的主程序入口 if __name__ == '__main__': main() -
订阅者节点:创建 Subscriber,回调函数处理接收到的消息。
topic_demo/subscriber_node.py#!/usr/bin/env python3 """ ROS2 订阅者节点示例 这个节点创建一个订阅者,监听话题并接收消息 """ import rclpy # 导入ROS2 Python客户端库 from rclpy.node import Node # 导入Node基类 from std_msgs.msg import String # 导入标准字符串消息类型 class SimpleSubscriber(Node): """ 简单订阅者节点类 继承自Node类,创建一个ROS2节点来监听话题 """ def __init__(self): """ 节点初始化函数 在这里设置节点名称、订阅者等 """ # 调用父类构造函数,设置节点名称为'simple_subscriber' super().__init__('simple_subscriber') # 创建一个订阅者 (Subscriber) # 参数说明: # String - 消息类型(必须与发布者的消息类型匹配) # 'chatroom' - 话题名称(必须与发布者的话题名称匹配) # self.listener_callback - 回调函数,收到消息时调用 # 10 - 队列大小(缓存的消息数量) self.subscription = self.create_subscription( String, 'chatroom', self.listener_callback, 10 ) # 防止订阅对象被垃圾回收 self.subscription # 消息计数器 self.message_count = 0 # 节点启动日志信息 self.get_logger().info('=== 订阅者节点已启动 ===') self.get_logger().info('节点名称: simple_subscriber') self.get_logger().info('订阅话题: chatroom') self.get_logger().info('消息类型: std_msgs/String') self.get_logger().info('等待接收消息...') def listener_callback(self, msg): """ 消息接收回调函数 当收到新消息时自动调用此函数 参数: msg - 接收到的消息对象 """ # 消息计数器加1 self.message_count += 1 # 处理接收到的消息 # msg.data 包含消息的实际内容 received_message = msg.data # 在控制台输出接收到的消息 self.get_logger().info(f'收到消息 [{self.message_count}]: "{received_message}"') # 根据消息内容做出不同的响应 if '你好' in received_message: self.get_logger().info(' → 检测到中文问候!') elif 'Hello' in received_message: self.get_logger().info(' → 检测到英文问候!') # 每5条消息输出统计信息 if self.message_count % 5 == 0: self.get_logger().info(f'--- 已成功接收 {self.message_count} 条消息 ---') def main(args=None): """ 主函数 - 节点的入口点 """ # 初始化ROS2 Python客户端库 rclpy.init(args=args) try: # 创建订阅者节点实例 simple_subscriber = SimpleSubscriber() # 保持节点运行,等待消息到来 # 当消息到达时,listener_callback会被自动调用 rclpy.spin(simple_subscriber) except KeyboardInterrupt: # 处理Ctrl+C键盘中断 print('\n节点被用户中断') except Exception as e: # 处理其他异常 print(f'节点运行出错: {e}') finally: # 清理资源 if 'simple_subscriber' in locals(): # 输出最终统计信息 simple_subscriber.get_logger().info(f'总共接收消息: {simple_subscriber.message_count} 条') simple_subscriber.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭ROS2客户端库 print('=== 订阅者节点已关闭 ===') # Python标准的主程序入口 if __name__ == '__main__': main() -
构建和运行
- 构建包
# 切换到工作空间根目录 cd ~/ros2_learning_ws # 构建特定的包 colcon build --packages-select topic_demo # 或者构建所有包 # colcon build # 加载环境变量(使新构建的包可用) source install/setup.bash - 运行节点
终端1 - 发布者节点
终端2 - 订阅者节点# 加载ROS2环境(如果还没加载) source /opt/ros/humble/setup.bash # 根据你的ROS2版本调整 source ~/ros2_learning_ws/install/setup.bash # 运行发布者节点 ros2 run topic_demo simple_publisher# 加载环境 source /opt/ros/humble/setup.bash source ~/ros2_learning_ws/install/setup.bash # 运行订阅者节点 ros2 run topic_demo simple_subscriber
- 构建包
-
-
(2)服务(Service)——服务端与客户端
- 创建新的功能包
# 进入工作空间src目录 cd ~/ros2_learning_ws/src # 创建服务示例包,添加必要的依赖 ros2 pkg create service_demo --build-type ament_python --dependencies rclpy example_interfaces 项目文件结构 ros2_learning_ws/ └── src/ └── service_demo/ ├── package.xml ├── setup.py ├── setup.cfg └── service_demo/ ├── __init__.py ├── service_server.py # 服务端节点 ├── service_client.py # 客户端节点 └── custom_service_demo/ # 自定义服务类型(可选) - package.xml
<?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="content="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>service_demo</name> <version>0.0.0</version> <description>ROS2 service communication learning demo</description> <maintainer email="you@example.com">Your Name</maintainer> <license>Apache License 2.0</license> <buildtool_depend>ament_python</buildtool_depend> <!-- 依赖项说明: rclpy: ROS2 Python客户端库 example_interfaces: 包含示例服务定义,如AddTwoInts --> <depend>rclpy</depend> <depend>example_interfaces</depend> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <export> <build_type>ament_python</build_type> </export> </package> - setup.py
from setuptools import setup, find_packages package_name = 'service_demo' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='Your Name', maintainer_email='you@example.com', description='ROS2 service communication learning demo', license='Apache License 2.0', tests_require=['pytest'], entry_points={ 'console_scripts': [ # 服务端节点 'add_two_ints_server = service_demo.service_server:main', # 客户端节点 'add_two_ints_client = service_demo.service_client:main', ], }, ) - 服务端:实现加法回调,返回响应。
#!/usr/bin/env python3 """ ROS2 服务端节点示例 这个节点创建一个服务端,提供两个整数相加的服务 """ import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class AddTwoIntsServer(Node): """ 加法服务端节点 提供两个整数相加的服务 """ def __init__(self): """ 节点初始化 """ # 调用父类构造函数,设置节点名称 super().__init__('add_two_ints_server') # 创建一个服务 (Service) # 参数说明: # AddTwoInts - 服务类型(来自example_interfaces。我们这里将使用自定义服务类型,因此需要先创建服务定义。 但是,为了简化,我们首先使用ROS2中已有的标准服务类型example_interfaces/srv/AddTwoInts,这个服务类型包含两个整数请求和一个整数响应。 ) # 'add_two_ints' - 服务名称(客户端通过这个名字调用服务) # self.add_two_ints_callback - 回调函数,处理客户端请求 self.service = self.create_service( AddTwoInts, 'add_two_ints', self.add_two_ints_callback ) # 服务计数器 self.request_count = 0 # 节点启动日志 self.get_logger().info('=== 加法服务端节点已启动 ===') self.get_logger().info('节点名称: add_two_ints_server') self.get_logger().info('服务名称: add_two_ints') self.get_logger().info('服务描述: 计算两个整数的和') self.get_logger().info('等待客户端请求...') def add_two_ints_callback(self, request, response): """ 服务请求回调函数 当客户端调用服务时自动执行 参数: request - 服务请求对象,包含客户端发送的数据 response - 服务响应对象,用于返回结果给客户端 返回: response - 填充了计算结果的响应对象 """ # 请求计数器加1 self.request_count += 1 # 从请求对象中获取两个整数 a = request.a b = request.b # 执行加法计算 sum_result = a + b # 将计算结果设置到响应对象中 response.sum = sum_result # 记录服务调用信息 self.get_logger().info(f'收到服务请求 [{self.request_count}]:') self.get_logger().info(f' 请求数据: a = {a}, b = {b}') self.get_logger().info(f' 计算结果: {a} + {b} = {sum_result}') self.get_logger().info(f' 返回响应: sum = {sum_result}') # 每5次请求输出统计信息 if self.request_count % 5 == 0: self.get_logger().info(f'--- 已处理 {self.request_count} 个服务请求 ---') # 返回响应对象 return response def main(args=None): """ 主函数 """ # 初始化ROS2 rclpy.init(args=args) try: # 创建服务端节点实例 add_two_ints_server = AddTwoIntsServer() # 保持节点运行,等待服务请求 # 当客户端调用服务时,回调函数会自动执行 rclpy.spin(add_two_ints_server) except KeyboardInterrupt: # 处理Ctrl+C中断 print('\n节点被用户中断') except Exception as e: # 处理其他异常 print(f'节点运行出错: {e}') finally: # 清理资源 if 'add_two_ints_server' in locals(): # 输出最终统计 add_two_ints_server.get_logger().info( f'服务运行结束,总共处理 {add_two_ints_server.request_count} 个请求' ) add_two_ints_server.destroy_node() rclpy.shutdown() print('=== 服务端节点已关闭 ===') if __name__ == '__main__': main() - 客户端:异步调用服务,等待结果。
#!/usr/bin/env python3 """ ROS2 客户端节点示例 这个节点创建一个客户端,调用加法服务 """ import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts import sys import random import time class AddTwoIntsClient(Node): """ 加法客户端节点 调用加法服务来计算两个整数的和 """ def __init__(self): """ 节点初始化 """ super().__init__('add_two_ints_client') # 创建一个服务客户端 (Service Client) # 参数说明: # AddTwoInts - 服务类型(必须与服务端一致) # 'add_two_ints' - 服务名称(必须与服务端一致) self.client = self.create_client(AddTwoInts, 'add_two_ints') # 等待服务端可用 self.get_logger().info('等待服务端上线...') while not self.client.wait_for_service(timeout_sec=1.0): self.get_logger().info('服务不可用,继续等待...') # 请求计数器 self.request_count = 0 self.get_logger().info('=== 加法客户端节点已启动 ===') self.get_logger().info('节点名称: add_two_ints_client') self.get_logger().info('服务名称: add_two_ints') self.get_logger().info('服务端已连接,准备发送请求') def send_request(self, a, b): """ 发送服务请求 参数: a - 第一个整数 b - 第二个整数 返回: future - 异步调用的Future对象 """ # 检查服务是否可用 if not self.client.service_is_ready(): self.get_logger().error('服务不可用') return None # 创建请求对象 request = AddTwoInts.Request() request.a = a request.b = b # 记录发送的请求 self.request_count += 1 self.get_logger().info(f'发送请求 [{self.request_count}]: {a} + {b}') # 异步调用服务,返回Future对象 future = self.client.call_async(request) return future def handle_response(self, future): """ 处理服务响应 参数: future - 异步调用的Future对象 """ try: # 获取响应结果 response = future.result() self.get_logger().info(f'收到响应: 计算结果 = {response.sum}') except Exception as e: self.get_logger().error(f'服务调用失败: {e}') def main(args=None): """ 主函数 """ rclpy.init(args=args) try: # 创建客户端节点 client_node = AddTwoIntsClient() # 处理命令行参数 if len(sys.argv) == 3: # 如果提供了命令行参数,使用指定的数字 try: a = int(sys.argv[1]) b = int(sys.argv[2]) numbers_to_send = [(a, b)] except ValueError: client_node.get_logger().error('参数必须是整数') return else: # 如果没有提供参数,生成随机测试数据 client_node.get_logger().info('未提供命令行参数,使用随机测试数据') numbers_to_send = [ (random.randint(1, 100), random.randint(1, 100)) for _ in range(5) # 发送5个随机请求 ] # 存储所有的Future对象 futures = [] # 发送所有请求 for a, b in numbers_to_send: future = client_node.send_request(a, b) if future: futures.append(future) time.sleep(1) # 每次请求间隔1秒 client_node.get_logger().info('所有请求已发送,等待响应...') # 等待所有响应完成 start_time = time.time() while futures and time.time() - start_time < 10: # 最多等待10秒 rclpy.spin_once(client_node, timeout_sec=0.1) # 检查哪些请求已完成 completed_futures = [] for future in futures: if future.done(): client_node.handle_response(future) completed_futures.append(future) # 移除已完成的future for future in completed_futures: futures.remove(future) if futures: client_node.get_logger().warning('有些请求超时未响应') client_node.get_logger().info('=== 客户端运行完成 ===') except KeyboardInterrupt: print('\n节点被用户中断') except Exception as e: print(f'节点运行出错: {e}') finally: if 'client_node' in locals(): client_node.destroy_node() rclpy.shutdown() print('=== 客户端节点已关闭 ===') if __name__ == '__main__': main() - 构建和运行
# 进入工作空间根目录 cd ~/ros2_learning_ws # 构建服务示例包 colcon build --packages-select service_demo # 加载环境 source install/setup.bash #终端1 - 启动服务端 source ~/ros2_learning_ws/install/setup.bash ros2 run service_demo add_two_ints_server #终端2 - 启动客户端(使用随机数) source ~/ros2_learning_ws/install/setup.bash ros2 run service_demo add_two_ints_client #终端3 - 启动客户端(指定数字) source ~/ros2_learning_ws/install/setup.bash ros2 run service_demo add_two_ints_client 15 27
- 创建新的功能包
-
自定义服务类型,需要时再查吧,别在这里卡太久
-
(3)动作(Action),不太用到,这里先过掉,边用边学即可。
8. Launch 文件
Launch文件是ROS2中用于自动启动多个节点的脚本。相当于一个"一键启动"脚本,可以同时启动整个机器人系统。ROS2 推荐使用 Python Launch 文件。
- 创建 专门的Launch 包
cd ~/ros2_file/row2_learning_ws/src
ros2 pkg create my _launch --build-type ament_python --dependencies launch_ros
- 目录结构
my _launch/
├── launch/ # 存放Launch文件
│ ├── simple_launch.py
│ └── multi_nodes_launch.py
├── my _launch/
│ ├── __init__.py
├── setup.py
├── setup.cfg
└── package.xml
- 示例 simple_launch.py
#!/usr/bin/env python3
"""
最简单的Launch文件示例
这个文件演示如何启动一个节点
"""
from launch import LaunchDescription
from launch_ros.actions import Node # 修正导入语句
def generate_launch_description():
"""
这个函数是Launch文件的入口点
必须返回一个LaunchDescription对象
"""
# 创建一个节点动作
talker_node = Node(
package='turtlesim', # 节点所在的包名
executable='turtlesim_node', # 节点的可执行文件名
output='screen' # 输出到屏幕
)
# 创建Launch描述
ld = LaunchDescription()
# 将节点添加到Launch描述中
ld.add_action(talker_node)
return ld
- Setup.py
from setuptools import setup
import os
from glob import glob
package_name = 'my_launch'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
# 安装package.xml
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# 安装Launch文件 - 这是关键部分!
(os.path.join('share', package_name, 'launch'),
glob(os.path.join('launch', '*.py'))),
# 如果将来有配置文件,也可以这样安装
# (os.path.join('share', package_name, 'config'),
# glob(os.path.join('config', '*.yaml'))),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='your_name',
maintainer_email='your_email@example.com',
description='First launch file examples',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
# 如果没有可执行节点,这里可以为空
],
},
)
- package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypelocation="http://www.ros.org/schema/package_format3.xsd"?>
<package format="3">
<name>my_launch</name>
<version>0.0.0</version>
<description>First launch file examples</description>
<maintainer email="you@example.com">Your Name</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<!-- 添加这些依赖 -->
<exec_depend>demo_nodes_cpp</exec_depend>
<exec_depend>launch_ros</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
- 运行
# 构建包
colcon build --packages-select my _launch
# 运行Launch文件
ros2 launch my_launch simple_launch.py
检查是否编译成功
9. 可视化工具:RQT 与 RViz2
- RQT——图形化调试面板
sudo apt install ros-humble-rqt*
rqt
-
常用插件:
- rqt_graph:显示节点/话题关系图。
- rqt_plot:实时绘制数据曲线。
- rqt_console:日志查看。
- rqt_reconfigure:动态参数调节。
-
RViz2——3D 可视化
sudo apt install ros-humble-rviz2
rviz2
核心 Display 类型:
RobotModel:显示 URDF 机器人模型。
TF:显示坐标变换树。
LaserScan:显示激光雷达数据。
Map:显示栅格地图。
Path:显示规划路径。
PointCloud2:显示点云。
Image: 显示图像
了解下大概即可,实战中用吧
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)