从零搭建 ROS2 环境,掌握核心通信机制,完成 Gazebo 仿真、导航与实体机器人部署。
本文涵盖完整的学习路线、代码示例与实战项目。但要求学习者掌握Linux或Ubuntu操作系统,掌握Python和C++开发语言,会搭建部署虚拟机。
基于 Ubuntu 22.04 + ROS2 Humble

1.过一下概念:ROS2 是什么?

ROS2(Robot Operating System 2) 相当于机器人的操作系统。但它不是传统意义上Windows和Linux那样的操作系统,而是一个中间件,提供通信机制、工具库和硬件抽象,让不同开发者编写的复杂软件模块能够协同工作。
把机器人比作生物体,ROS2 就是神经系统和大脑——负责信息传递、决策与协调。

核心职责:

  • 通信骨干:节点、话题、服务、动作,让感知、决策、控制高效协作。
  • 工具生态:RVIZ2、rqt、命令行工具,调试可视化一站式。
  • 硬件抽象:统一驱动接口,屏蔽传感器/执行器差异。
    分布式计算:支持多机协同,模块可运行在不同计算机上。

2. 环境准备:Ubuntu 22.04 虚拟机

参考配置:

  • VMware 17
  • 内存 16GB,硬盘 100GB
    创建虚拟机
    按照标准 Ubuntu 22.04.5 安装流程完成虚拟机的创建即可。

系统优化:

  • 更换国内软件镜像源,提高更新和安装软件的速度:
sudo vim /etc/apt/sources.list
# 替换为以下内容(适用于 jammy)
deb https://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse
deb https://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse
deb https://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse
deb https://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse
deb-src https://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse
  • 更新并安装增强工具:
sudo apt update && sudo apt upgrade -y
sudo apt install open-vm-tools-desktop -y

3. 安装 ROS2 Humble

  • 设置语言环境(推荐英文)。确保系统语言环境支持UTF-8编码,这是ROS2正常工作的基础。打开终端,依次执行以下命令:
# 更新软件列表并安装locales包
sudo apt update && sudo apt install locales
# 生成英语区域设置
sudo locale-gen en_US en_US.UTF-8
# 永久设置系统语言环境(重启后依然有效)
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
# 为当前会话立即生效
export LANG=en_US.UTF-8	
# 检查当前时区设置
timedatectl status
# 如果时区不是上海,可以设置:
sudo timedatectl set-timezone Asia/Shanghai

测试方式:
在这里插入图片描述

  • 添加 ROS2 软件源
# 方案一:官方源(需 GPG 密钥,可能被墙)
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update && sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null

# 方案二:跳过 GPG(国内用户实用)
echo "deb [trusted=yes] http://packages.ros.org/ros2/ubuntu jammy main" | sudo tee /etc/apt/sources.list.d/ros2.list
sudo apt update
  • 初学者,推荐安装桌面完整版(Desktop),包含了ROS、RViz、示例程序和教程
# 更新软件包索引并升级系统
sudo apt update
sudo apt upgrade
# 桌面完整版ROS2
sudo apt install ros-humble-desktop
sudo apt install ros-dev-tools
  • 环境配置(自动加载)
# 为了方便使用,将ROS2的环境变量自动添加到你的bash会话中。
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc
  • 另外一种快捷安装方式鱼香ROS一键安装
# 下载并执行脚本
wget http://fishros.com/install -O fishros && . fishros

在这里插入图片描述

  • 测试demo:小乌龟控制
ros2 run turtlesim turtlesim_node          # 终端1小乌龟仿真机器人
ros2 run turtlesim turtle_teleop_key       # 终端2小乌龟机器人键盘控制器

(注意聚焦在终端2才能发出控制命令,在终端1查看机器人的移动)
完成环境的搭建即可,不强求理解什么,在实战中自然体会和领悟。

4. Colcon 构建工具

Colcon 是 ROS2 的项目构建管理器,类似于 make/catkin。colcon 是ROS2中非常重要的构建工具。
核心作用:

  • 编译代码:将源码编译成可执行文件。
  • 依赖管理:自动处理包间依赖。
  • 环境设置:生成工作空间环境配置。
  • 包发现:让 ROS2 能找到你的包。

安装 Colcon:

sudo apt install python3-colcon-common-extensions

测试在这里插入图片描述

创建如下工作空间结构:

ros2_ws/
├── src/                    # 源代码目录
│   ├── package_1/          # 功能包1
│   ├── package_2/          # 功能包2
│   └── ...                
├── build/                  # 编译中间文件(colcon创建)
├── install/                # 安装目录(colcon创建)
└── log/                    # 编译日志(colcon创建)

基本使用

# 1. 进入工作空间
cd ~/ros2_ws

# 2. 编译所有包
colcon build

# 3. 加载工作空间环境
source install/setup.bash

# 4. 现在ROS2就能找到你编译的包了
ros2 pkg list | grep my_package

常用命令,先扫一遍就行。了解下工具的功能的下设计使用思路

# 编译所有包
colcon build

# 编译特定包
colcon build --packages-select my_package

# 显示详细输出
colcon build --event-handlers console_direct+

# 并行编译(加快速度)
colcon build --parallel-workers 4

# 清理后重新编译
colcon build --cmake-clean-first

# 只编译依赖包
colcon build --packages-up-to my_package

5. 核心概念理解:节点、话题、服务、动作

首先我的理解是节点是ROS2系统的每个特定组成单位,而话题/服务/动作是三种不同的通讯方式。
(扫一下有个概念知道干嘛的即可,之后会一步步手搓)

节点(Node)

节点是 ROS2 中可执行程序的最小单位,每个节点负责一个特定功能(如摄像头驱动、导航计算)。

ros2 node list          # 查看运行中的节点
ros2 node info /node_name	# 查看节点的详细信息

演示,这是一个键盘控制节点
在这里插入图片描述

  • 发布者(Publishers)——核心输出,它发布了 3 个话题,其中最关键的是:
    • /cmd_vel (geometry_msgs/msg/Twist):这是核心输出话题。当你按下键盘方向键(如 i/j/k/l 等)时,该节点会计算线速度(前进/后退)和角速度(旋转),并发布到这个话题。下游的机器人底盘驱动节点订阅此话题即可移动。
    • /parameter_events 和 /rosout:这是 ROS 2 节点自带的标准话题,分别用于广播参数变更事件和输出日志信息,与业务逻辑无关。
  • 服务服务器(Service Servers)——仅限参数配置。它提供了 6 个服务,但全部都是 ROS 2 节点自动拥有的标准参数服务(如 get_parameters、set_parameters):
    这表明该节点没有自定义的业务服务。它只允许你在运行时通过 ros2 param set 修改其内部参数(例如调整速度缩放比例),但不提供“暂停”、“停止”等自定义功能接口。
  • 没有订阅者(Subscribers):该节点完全独立,不接收任何 ROS 话题数据来调整自身行为。
    没有动作服务器/客户端(Action Servers/Clients):它不处理需要反馈或取消的长时间任务(如导航),只做简单、持续的指令发送。

话题(Topic)——发布/订阅(异步)

单向、异步,适合传感器数据流。类比:广播系统。例如单一的摄像头节点发布一个图像话题,订阅该图像话题的所有节点都能同时收到图像。

  • 单向:数据只从发布者流向订阅者
  • 异步:发送后不管,不关心谁接收
  • 实时:适合传感器数据流
ros2 topic list	# 查看当前所有话题
ros2 topic echo /topic_name	# 查看话题里的数据
ros2 topic pub /topic std_msgs/String "data: 'hello'" # 手动模拟发布数据
# 查看话题关系
rqt_graph

演示:
1.启动一个键盘控制器节点,他会把键盘的输入发布到/cmd_vel话题
在这里插入图片描述
2.搞一个新终端查看这个/cmd_vel的数据。刚开始是阻塞的,只要在控制器节点按方向键,按键输入就会被监听到。
在这里插入图片描述
3.再启动一个节点,按数据格式,模拟手动发布一个数据

在这里插入图片描述
也成功被监听了
在这里插入图片描述
至于rqt_graph工具可以可视化查看订阅关系,例如这是一个宇树go2的四足控制器节点订阅了cmd_vel话题,所以键盘控制器就可以实现按键控制机器狗
在这里插入图片描述

服务(Service)——请求/响应(同步)

双向、同步,适合瞬时操作(如开关、查询)。类比:点餐。

ros2 service list # 查看当前所有服务
ros2 service call /service_name package/srv/Type "{args}" # 手动调用一个服务,发送请求并等待响应 /service_name:要调用的服务名称(需替换为实际名字) package/srv/Type:服务的数据类型(包名/服务文件名) "{args}":请求参数(YAML 格式)

动作(Action)——目标/反馈/结果(长时间任务)

异步、可取消、带反馈,适合导航、抓取等长时间任务。类比:宴席策划。

ros2 action list # 查看所有动作
ros2 action send_goal /action_name package/action/Type "{goal}" --feedback # 向动作服务器发送一个目标(goal),并开始执行该任务,同时显示执行过程中的实时反 (feedback)。/action_name:动作名称(如 /turtlebot4_navigate) package/action/Type:动作数据类型(包名/动作文件名) "{goal}":目标参数(YAML 格式) --feedback:实时打印反馈(进度、状态等)

6. 创建工作空间与功能包

  • 创建工作空间
# 创建一个名为ros2_ws的工作空间
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build   # 此时仅有 build/install/log 目录
source install/setup.bash
  • 创建功能包(C++ 与 Python两种不同的创建方式)
# 在src目录下
cd ~/ros2_ws/src
# Python : Python格式功能包创建命令: ros2 pkg create <包名> --build-type <功能包> --dependencies <依赖工具包> --node-name <节点名称>
ros2 pkg create helloworld_py --build-type ament_python --dependencies rclpy std_msgs --node-name helloworld_node
# C++ : ros2 pkg create <包名> --build-type <功能包> --dependencies <依赖工具包> --node-name <节点名称>
ros2 pkg create helloworld_cpp --build-type ament_cmake --dependencies rclcpp --node-name helloworld_node
  • 在python功能包下编写一个自定义节点代码,创建一个publisher_node发布节点,向/chatter话题每秒定时发布1个消息
    ~/ros2_ws/src/helloworld_py/helloworld_py/publisher_node.py:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class Publisher(Node):
    def __init__(self):
        super().__init__('publisher_node')
        self.pub = self.create_publisher(String, 'chatter', 10)
        self.timer = self.create_timer(1, self.timer_callback)
        self.counter = 0

    def timer_callback(self):
        msg = String()
        msg.data = f'Hello ROS2 {self.counter}'
        self.pub.publish(msg)
        self.get_logger().info(f'Publishing: {msg.data}')
        self.counter += 1

def main(args=None):
    rclpy.init(args=args)
    node = Publisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
  • 功能包的编译和运行
cd ~/ros2_ws
colcon build --packages-select helloworld_py	# 编译特定的Python包
source install/setup.bash							# 加载最新的编译数据
ros2 run helloworld_py publisher_node	# 运行该包下的自定义节点

7. 三种通信机制代码实现(Python/C++)

  • (1)话题(Topic)——发布者与订阅者的演示

    • 创建工作空间

      # 创建ROS2工作空间目录
      mkdir -p ~/ros2_learning_ws/src
      cd ~/ros2_learning_ws/src
      
    • 创建功能包

      # 创建一个Python包,依赖rclpy和std_msgs
      ros2 pkg create topic_demo --build-type ament_python --dependencies rclpy std_msgs
      # 此时项目结构
      ros2_learning_ws/
      └── src/
          └── topic_demo/
              ├── package.xml           # 包配置文件
              ├── setup.py              # Python包安装配置
              ├── setup.cfg             # 安装脚本配置
              └── topic_demo/
                  ├── __init__.py       # Python包初始化文件
                  ├── publisher_node.py # 发布者节点
                  └── subscriber_node.py # 订阅者节点
      
    • 配置文件

      • package.xml - 包清单文件
        <?xml version="1.0"?>
        <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="content="http://www.w3.org/2001/XMLSchema"?>
        <package format="3">
          <!-- 包的基本信息 -->
          <name>topic_demo</name>
          <version>0.0.0</version>
          <description>ROS2 topic communication learning demo</description>
          <maintainer email="you@example.com">Your Name</maintainer>
          <license>Apache License 2.0</license>
        
          <!-- 编译依赖:构建时需要这些包 -->
          <buildtool_depend>ament_python</buildtool_depend>
        
          <!-- 运行时依赖:运行节点时需要这些包 -->
          <depend>rclpy</depend>         <!-- ROS2 Python客户端库 -->
          <depend>std_msgs</depend>      <!-- 标准消息类型 -->
        
          <!-- 测试依赖 -->
          <test_depend>ament_copyright</test_depend>
          <test_depend>ament_flake8</test_depend>
          <test_depend>ament_pep257</test_depend>
          <test_depend>python3-pytest</test_depend>
        
          <!-- 导出配置:告诉ROS2这是Python包 -->
          <export>
            <build_type>ament_python</build_type>
          </export>
        </package>
        
        
      • setup.py - Python包安装配置,这个文件告诉ROS2如何安装你的Python包和节点。
        from setuptools import setup, find_packages
        
        package_name = 'topic_demo'
        
        setup(
            # 包的基本信息
            name=package_name,
            version='0.0.0',
            packages=find_packages(exclude=['test']),  # 自动查找所有Python包
            
            # 数据文件:包含package.xml等
            data_files=[
                # 安装package.xml到share目录
                ('share/ament_index/resource_index/packages',
                    ['resource/' + package_name]),
                # 安装package.xml到包目录
                ('share/' + package_name, ['package.xml']),
            ],
            
            # 安装要求
            install_requires=['setuptools'],
            zip_safe=True,
            
            # 维护者信息
            maintainer='Your Name',
            maintainer_email='you@example.com',
            description='ROS2 topic communication learning demo',
            license='Apache License 2.0',
            
            # 测试要求
            tests_require=['pytest'],
            
            # 最重要的部分:定义可执行节点
            entry_points={
                'console_scripts': [
                    # 格式:'节点名 = 包名.文件名:主函数名'
                    'simple_publisher = topic_demo.publisher_node:main',
                    'simple_subscriber = topic_demo.subscriber_node:main',
                ],
            },
        )
        
        
      • setup.cfg - 安装脚本配置
        [develop]
        # 开发模式安装时,脚本安装目录
        script_dir=$base/lib/topic_demo
        
        [install]
        # 正常安装时,脚本安装目录  
        install_scripts=$base/lib/topic_demo
        
    • 发布者节点:创建 Publisher,定时发布 std_msgs/String 到 chatroom 话题。
      topic_demo/publisher_node.py

      #!/usr/bin/env python3
      """
      ROS2 发布者节点示例
      这个节点创建一个发布者,定期向话题发送消息
      """
      
      import rclpy  # 导入ROS2 Python客户端库
      from rclpy.node import Node  # 导入Node基类
      from std_msgs.msg import String  # 导入标准字符串消息类型
      import time
      
      class SimplePublisher(Node):
          """
          简单发布者节点类
          继承自Node类,创建一个ROS2节点
          """
          
          def __init__(self):
              """
              节点初始化函数
              在这里设置节点名称、发布者、定时器等
              """
              # 调用父类构造函数,设置节点名称为'simple_publisher'
              super().__init__('simple_publisher')
              
              # 创建一个发布者 (Publisher)
              # 参数说明:
              #   String - 消息类型(这里是标准字符串消息)
              #   'chatroom' - 话题名称(其他节点通过这个名字订阅)
              #   10 - 队列大小(缓存的消息数量)
              self.publisher = self.create_publisher(String, 'chatroom', 10)
              
              # 创建一个定时器 (Timer)
              # 参数说明:
              #   1.0 - 定时器周期(秒),这里每1秒触发一次
              #   self.timer_callback - 回调函数,定时器触发时调用
              self.timer = self.create_timer(1.0, self.timer_callback)
              
              # 计数器,用于跟踪发送的消息数量
              self.counter = 0
              
              # 节点启动日志信息
              self.get_logger().info('=== 发布者节点已启动 ===')
              self.get_logger().info('节点名称: simple_publisher')
              self.get_logger().info('发布话题: chatroom')
              self.get_logger().info('消息类型: std_msgs/String')
              self.get_logger().info('发布频率: 1 Hz (每秒1条消息)')
          
          def timer_callback(self):
              """
              定时器回调函数
              当定时器到期时自动调用此函数
              """
              # 创建一个String类型的消息对象
              msg = String()
              
              # 设置消息内容
              msg.data = f'你好,这是第 {self.counter} 条消息! 时间: {time.strftime("%H:%M:%S")}'
              
              # 发布消息到话题
              self.publisher.publish(msg)
              
              # 在控制台输出日志信息
              self.get_logger().info(f'发布消息: "{msg.data}"')
              
              # 计数器加1
              self.counter += 1
              
              # 每10条消息输出一个分隔符,方便观察
              if self.counter % 10 == 0:
                  self.get_logger().info('--- 已发送10条消息 ---')
      
      def main(args=None):
          """
          主函数 - 节点的入口点
          """
          # 初始化ROS2 Python客户端库
          # 必须在创建任何节点之前调用
          rclpy.init(args=args)
          
          try:
              # 创建发布者节点实例
              simple_publisher = SimplePublisher()
              
              # 保持节点运行,等待回调函数执行
              # spin()会让节点保持活跃状态,直到被明确关闭
              rclpy.spin(simple_publisher)
              
          except KeyboardInterrupt:
              # 处理Ctrl+C键盘中断
              print('\n节点被用户中断')
          except Exception as e:
              # 处理其他异常
              print(f'节点运行出错: {e}')
          finally:
              # 清理资源
              if 'simple_publisher' in locals():
                  simple_publisher.destroy_node()  # 销毁节点
              rclpy.shutdown()  # 关闭ROS2客户端库
              print('=== 发布者节点已关闭 ===')
      
      # Python标准的主程序入口
      if __name__ == '__main__':
          main()
      
      
    • 订阅者节点:创建 Subscriber,回调函数处理接收到的消息。
      topic_demo/subscriber_node.py

      #!/usr/bin/env python3
      """
      ROS2 订阅者节点示例
      这个节点创建一个订阅者,监听话题并接收消息
      """
      
      import rclpy  # 导入ROS2 Python客户端库
      from rclpy.node import Node  # 导入Node基类
      from std_msgs.msg import String  # 导入标准字符串消息类型
      
      class SimpleSubscriber(Node):
          """
          简单订阅者节点类
          继承自Node类,创建一个ROS2节点来监听话题
          """
          
          def __init__(self):
              """
              节点初始化函数
              在这里设置节点名称、订阅者等
              """
              # 调用父类构造函数,设置节点名称为'simple_subscriber'
              super().__init__('simple_subscriber')
              
              # 创建一个订阅者 (Subscriber)
              # 参数说明:
              #   String - 消息类型(必须与发布者的消息类型匹配)
              #   'chatroom' - 话题名称(必须与发布者的话题名称匹配)
              #   self.listener_callback - 回调函数,收到消息时调用
              #   10 - 队列大小(缓存的消息数量)
              self.subscription = self.create_subscription(
                  String,
                  'chatroom',
                  self.listener_callback,
                  10
              )
              
              # 防止订阅对象被垃圾回收
              self.subscription
              
              # 消息计数器
              self.message_count = 0
              
              # 节点启动日志信息
              self.get_logger().info('=== 订阅者节点已启动 ===')
              self.get_logger().info('节点名称: simple_subscriber')
              self.get_logger().info('订阅话题: chatroom')
              self.get_logger().info('消息类型: std_msgs/String')
              self.get_logger().info('等待接收消息...')
          
          def listener_callback(self, msg):
              """
              消息接收回调函数
              当收到新消息时自动调用此函数
              
              参数:
                  msg - 接收到的消息对象
              """
              # 消息计数器加1
              self.message_count += 1
              
              # 处理接收到的消息
              # msg.data 包含消息的实际内容
              received_message = msg.data
              
              # 在控制台输出接收到的消息
              self.get_logger().info(f'收到消息 [{self.message_count}]: "{received_message}"')
              
              # 根据消息内容做出不同的响应
              if '你好' in received_message:
                  self.get_logger().info('  → 检测到中文问候!')
              elif 'Hello' in received_message:
                  self.get_logger().info('  → 检测到英文问候!')
              
              # 每5条消息输出统计信息
              if self.message_count % 5 == 0:
                  self.get_logger().info(f'--- 已成功接收 {self.message_count} 条消息 ---')
      
      def main(args=None):
          """
          主函数 - 节点的入口点
          """
          # 初始化ROS2 Python客户端库
          rclpy.init(args=args)
          
          try:
              # 创建订阅者节点实例
              simple_subscriber = SimpleSubscriber()
              
              # 保持节点运行,等待消息到来
              # 当消息到达时,listener_callback会被自动调用
              rclpy.spin(simple_subscriber)
              
          except KeyboardInterrupt:
              # 处理Ctrl+C键盘中断
              print('\n节点被用户中断')
          except Exception as e:
              # 处理其他异常
              print(f'节点运行出错: {e}')
          finally:
              # 清理资源
              if 'simple_subscriber' in locals():
                  # 输出最终统计信息
                  simple_subscriber.get_logger().info(f'总共接收消息: {simple_subscriber.message_count} 条')
                  simple_subscriber.destroy_node()  # 销毁节点
              rclpy.shutdown()  # 关闭ROS2客户端库
              print('=== 订阅者节点已关闭 ===')
      
      # Python标准的主程序入口
      if __name__ == '__main__':
          main()
      
      
    • 构建和运行

      • 构建包
        # 切换到工作空间根目录
        cd ~/ros2_learning_ws
        
        # 构建特定的包
        colcon build --packages-select topic_demo
        
        # 或者构建所有包
        # colcon build
        
        # 加载环境变量(使新构建的包可用)
        source install/setup.bash
        
        
      • 运行节点
        终端1 - 发布者节点
        # 加载ROS2环境(如果还没加载)
        source /opt/ros/humble/setup.bash  # 根据你的ROS2版本调整
        source ~/ros2_learning_ws/install/setup.bash
        
        # 运行发布者节点
        ros2 run topic_demo simple_publisher
        
        终端2 - 订阅者节点
        # 加载环境
        source /opt/ros/humble/setup.bash
        source ~/ros2_learning_ws/install/setup.bash
        
        # 运行订阅者节点
        ros2 run topic_demo simple_subscriber
        
        
  • (2)服务(Service)——服务端与客户端

    • 创建新的功能包
      # 进入工作空间src目录
      cd ~/ros2_learning_ws/src
      
      # 创建服务示例包,添加必要的依赖
      ros2 pkg create service_demo --build-type ament_python --dependencies rclpy example_interfaces
      
      项目文件结构
      ros2_learning_ws/
      └── src/
          └── service_demo/
              ├── package.xml
              ├── setup.py
              ├── setup.cfg
              └── service_demo/
                  ├── __init__.py
                  ├── service_server.py    # 服务端节点
                  ├── service_client.py    # 客户端节点
                  └── custom_service_demo/ # 自定义服务类型(可选)
      
      
    • package.xml
      <?xml version="1.0"?>
      <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="content="http://www.w3.org/2001/XMLSchema"?>
      <package format="3">
        <name>service_demo</name>
        <version>0.0.0</version>
        <description>ROS2 service communication learning demo</description>
        <maintainer email="you@example.com">Your Name</maintainer>
        <license>Apache License 2.0</license>
      
        <buildtool_depend>ament_python</buildtool_depend>
      
        <!-- 依赖项说明:
          rclpy: ROS2 Python客户端库
          example_interfaces: 包含示例服务定义,如AddTwoInts
        -->
        <depend>rclpy</depend>
        <depend>example_interfaces</depend>
      
        <test_depend>ament_copyright</test_depend>
        <test_depend>ament_flake8</test_depend>
        <test_depend>ament_pep257</test_depend>
        <test_depend>python3-pytest</test_depend>
      
        <export>
          <build_type>ament_python</build_type>
        </export>
      </package>
      
      
    • setup.py
      from setuptools import setup, find_packages
      
      package_name = 'service_demo'
      
      setup(
          name=package_name,
          version='0.0.0',
          packages=find_packages(exclude=['test']),
          data_files=[
              ('share/ament_index/resource_index/packages',
                  ['resource/' + package_name]),
              ('share/' + package_name, ['package.xml']),
          ],
          install_requires=['setuptools'],
          zip_safe=True,
          maintainer='Your Name',
          maintainer_email='you@example.com',
          description='ROS2 service communication learning demo',
          license='Apache License 2.0',
          tests_require=['pytest'],
          entry_points={
              'console_scripts': [
                  # 服务端节点
                  'add_two_ints_server = service_demo.service_server:main',
                  # 客户端节点
                  'add_two_ints_client = service_demo.service_client:main',
              ],
          },
      )
      
      
    • 服务端:实现加法回调,返回响应。
      #!/usr/bin/env python3
      """
      ROS2 服务端节点示例
      这个节点创建一个服务端,提供两个整数相加的服务
      """
      
      import rclpy
      from rclpy.node import Node
      from example_interfaces.srv import AddTwoInts
      
      class AddTwoIntsServer(Node):
          """
          加法服务端节点
          提供两个整数相加的服务
          """
      
          def __init__(self):
              """
              节点初始化
              """
              # 调用父类构造函数,设置节点名称
              super().__init__('add_two_ints_server')
              
              # 创建一个服务 (Service)
              # 参数说明:
              #   AddTwoInts - 服务类型(来自example_interfaces。我们这里将使用自定义服务类型,因此需要先创建服务定义。
      但是,为了简化,我们首先使用ROS2中已有的标准服务类型example_interfaces/srv/AddTwoInts,这个服务类型包含两个整数请求和一个整数响应。
      )
              #   'add_two_ints' - 服务名称(客户端通过这个名字调用服务)
              #   self.add_two_ints_callback - 回调函数,处理客户端请求
              self.service = self.create_service(
                  AddTwoInts, 
                  'add_two_ints', 
                  self.add_two_ints_callback
              )
              
              # 服务计数器
              self.request_count = 0
              
              # 节点启动日志
              self.get_logger().info('=== 加法服务端节点已启动 ===')
              self.get_logger().info('节点名称: add_two_ints_server')
              self.get_logger().info('服务名称: add_two_ints')
              self.get_logger().info('服务描述: 计算两个整数的和')
              self.get_logger().info('等待客户端请求...')
      
          def add_two_ints_callback(self, request, response):
              """
              服务请求回调函数
              当客户端调用服务时自动执行
              
              参数:
                  request - 服务请求对象,包含客户端发送的数据
                  response - 服务响应对象,用于返回结果给客户端
              
              返回:
                  response - 填充了计算结果的响应对象
              """
              # 请求计数器加1
              self.request_count += 1
              
              # 从请求对象中获取两个整数
              a = request.a
              b = request.b
              
              # 执行加法计算
              sum_result = a + b
              
              # 将计算结果设置到响应对象中
              response.sum = sum_result
              
              # 记录服务调用信息
              self.get_logger().info(f'收到服务请求 [{self.request_count}]:')
              self.get_logger().info(f'  请求数据: a = {a}, b = {b}')
              self.get_logger().info(f'  计算结果: {a} + {b} = {sum_result}')
              self.get_logger().info(f'  返回响应: sum = {sum_result}')
              
              # 每5次请求输出统计信息
              if self.request_count % 5 == 0:
                  self.get_logger().info(f'--- 已处理 {self.request_count} 个服务请求 ---')
              
              # 返回响应对象
              return response
      
      def main(args=None):
          """
          主函数
          """
          # 初始化ROS2
          rclpy.init(args=args)
          
          try:
              # 创建服务端节点实例
              add_two_ints_server = AddTwoIntsServer()
              
              # 保持节点运行,等待服务请求
              # 当客户端调用服务时,回调函数会自动执行
              rclpy.spin(add_two_ints_server)
              
          except KeyboardInterrupt:
              # 处理Ctrl+C中断
              print('\n节点被用户中断')
          except Exception as e:
              # 处理其他异常
              print(f'节点运行出错: {e}')
          finally:
              # 清理资源
              if 'add_two_ints_server' in locals():
                  # 输出最终统计
                  add_two_ints_server.get_logger().info(
                      f'服务运行结束,总共处理 {add_two_ints_server.request_count} 个请求'
                  )
                  add_two_ints_server.destroy_node()
              rclpy.shutdown()
              print('=== 服务端节点已关闭 ===')
      
      if __name__ == '__main__':
          main()
      
      
    • 客户端:异步调用服务,等待结果。
      #!/usr/bin/env python3
      """
      ROS2 客户端节点示例
      这个节点创建一个客户端,调用加法服务
      """
      
      import rclpy
      from rclpy.node import Node
      from example_interfaces.srv import AddTwoInts
      import sys
      import random
      import time
      
      class AddTwoIntsClient(Node):
          """
          加法客户端节点
          调用加法服务来计算两个整数的和
          """
      
          def __init__(self):
              """
              节点初始化
              """
              super().__init__('add_two_ints_client')
              
              # 创建一个服务客户端 (Service Client)
              # 参数说明:
              #   AddTwoInts - 服务类型(必须与服务端一致)
              #   'add_two_ints' - 服务名称(必须与服务端一致)
              self.client = self.create_client(AddTwoInts, 'add_two_ints')
              
              # 等待服务端可用
              self.get_logger().info('等待服务端上线...')
              while not self.client.wait_for_service(timeout_sec=1.0):
                  self.get_logger().info('服务不可用,继续等待...')
              
              # 请求计数器
              self.request_count = 0
              
              self.get_logger().info('=== 加法客户端节点已启动 ===')
              self.get_logger().info('节点名称: add_two_ints_client')
              self.get_logger().info('服务名称: add_two_ints')
              self.get_logger().info('服务端已连接,准备发送请求')
      
          def send_request(self, a, b):
              """
              发送服务请求
              
              参数:
                  a - 第一个整数
                  b - 第二个整数
              
              返回:
                  future - 异步调用的Future对象
              """
              # 检查服务是否可用
              if not self.client.service_is_ready():
                  self.get_logger().error('服务不可用')
                  return None
              
              # 创建请求对象
              request = AddTwoInts.Request()
              request.a = a
              request.b = b
              
              # 记录发送的请求
              self.request_count += 1
              self.get_logger().info(f'发送请求 [{self.request_count}]: {a} + {b}')
              
              # 异步调用服务,返回Future对象
              future = self.client.call_async(request)
              return future
      
          def handle_response(self, future):
              """
              处理服务响应
              
              参数:
                  future - 异步调用的Future对象
              """
              try:
                  # 获取响应结果
                  response = future.result()
                  self.get_logger().info(f'收到响应: 计算结果 = {response.sum}')
              except Exception as e:
                  self.get_logger().error(f'服务调用失败: {e}')
      
      def main(args=None):
          """
          主函数
          """
          rclpy.init(args=args)
          
          try:
              # 创建客户端节点
              client_node = AddTwoIntsClient()
              
              # 处理命令行参数
              if len(sys.argv) == 3:
                  # 如果提供了命令行参数,使用指定的数字
                  try:
                      a = int(sys.argv[1])
                      b = int(sys.argv[2])
                      numbers_to_send = [(a, b)]
                  except ValueError:
                      client_node.get_logger().error('参数必须是整数')
                      return
              else:
                  # 如果没有提供参数,生成随机测试数据
                  client_node.get_logger().info('未提供命令行参数,使用随机测试数据')
                  numbers_to_send = [
                      (random.randint(1, 100), random.randint(1, 100))
                      for _ in range(5)  # 发送5个随机请求
                  ]
              
              # 存储所有的Future对象
              futures = []
              
              # 发送所有请求
              for a, b in numbers_to_send:
                  future = client_node.send_request(a, b)
                  if future:
                      futures.append(future)
                  time.sleep(1)  # 每次请求间隔1秒
              
              client_node.get_logger().info('所有请求已发送,等待响应...')
              
              # 等待所有响应完成
              start_time = time.time()
              while futures and time.time() - start_time < 10:  # 最多等待10秒
                  rclpy.spin_once(client_node, timeout_sec=0.1)
                  
                  # 检查哪些请求已完成
                  completed_futures = []
                  for future in futures:
                      if future.done():
                          client_node.handle_response(future)
                          completed_futures.append(future)
                  
                  # 移除已完成的future
                  for future in completed_futures:
                      futures.remove(future)
              
              if futures:
                  client_node.get_logger().warning('有些请求超时未响应')
              
              client_node.get_logger().info('=== 客户端运行完成 ===')
              
          except KeyboardInterrupt:
              print('\n节点被用户中断')
          except Exception as e:
              print(f'节点运行出错: {e}')
          finally:
              if 'client_node' in locals():
                  client_node.destroy_node()
              rclpy.shutdown()
              print('=== 客户端节点已关闭 ===')
      
      if __name__ == '__main__':
          main()
      
      
    • 构建和运行
      # 进入工作空间根目录
      cd ~/ros2_learning_ws
      
      # 构建服务示例包
      colcon build --packages-select service_demo
      
      # 加载环境
      source install/setup.bash
      
      #终端1 - 启动服务端
      source ~/ros2_learning_ws/install/setup.bash
      ros2 run service_demo add_two_ints_server
      
      #终端2 - 启动客户端(使用随机数)
      source ~/ros2_learning_ws/install/setup.bash
      ros2 run service_demo add_two_ints_client
      
      #终端3 - 启动客户端(指定数字)
      source ~/ros2_learning_ws/install/setup.bash
      ros2 run service_demo add_two_ints_client 15 27
      
      
  • 自定义服务类型,需要时再查吧,别在这里卡太久

  • (3)动作(Action),不太用到,这里先过掉,边用边学即可。

8. Launch 文件

Launch文件是ROS2中用于自动启动多个节点的脚本。相当于一个"一键启动"脚本,可以同时启动整个机器人系统。ROS2 推荐使用 Python Launch 文件。

  • 创建 专门的Launch 包
cd ~/ros2_file/row2_learning_ws/src
ros2 pkg create my _launch --build-type ament_python --dependencies launch_ros
  • 目录结构
my _launch/
├── launch/                    # 存放Launch文件
│   ├── simple_launch.py
│   └── multi_nodes_launch.py
├── my _launch/
│   ├── __init__.py
├── setup.py
├── setup.cfg
└── package.xml

  • 示例 simple_launch.py
#!/usr/bin/env python3
"""
最简单的Launch文件示例
这个文件演示如何启动一个节点
"""

from launch import LaunchDescription
from launch_ros.actions import Node  # 修正导入语句

def generate_launch_description():
    """
    这个函数是Launch文件的入口点
    必须返回一个LaunchDescription对象
    """
    # 创建一个节点动作
    talker_node = Node(
        package='turtlesim',          # 节点所在的包名
        executable='turtlesim_node',  # 节点的可执行文件名
        output='screen'               # 输出到屏幕
    )

    # 创建Launch描述
    ld = LaunchDescription()
    
    # 将节点添加到Launch描述中
    ld.add_action(talker_node)
    
    return ld

  • Setup.py
from setuptools import setup
import os
from glob import glob

package_name = 'my_launch'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        # 安装package.xml
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        
        # 安装Launch文件 - 这是关键部分!
        (os.path.join('share', package_name, 'launch'), 
         glob(os.path.join('launch', '*.py'))),
        
        # 如果将来有配置文件,也可以这样安装
        # (os.path.join('share', package_name, 'config'), 
        #  glob(os.path.join('config', '*.yaml'))),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='your_name',
    maintainer_email='your_email@example.com',
    description='First launch file examples',
    license='Apache License 2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            # 如果没有可执行节点,这里可以为空
        ],
    },
)

  • package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypelocation="http://www.ros.org/schema/package_format3.xsd"?>
<package format="3">
  <name>my_launch</name>
  <version>0.0.0</version>
  <description>First launch file examples</description>
  <maintainer email="you@example.com">Your Name</maintainer>
  <license>Apache License 2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <!-- 添加这些依赖 -->
  <exec_depend>demo_nodes_cpp</exec_depend>
  <exec_depend>launch_ros</exec_depend>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

  • 运行
# 构建包
colcon build --packages-select my _launch

# 运行Launch文件
ros2 launch my_launch simple_launch.py

检查是否编译成功
在这里插入图片描述

9. 可视化工具:RQT 与 RViz2

  • RQT——图形化调试面板
sudo apt install ros-humble-rqt*
rqt
  • 常用插件:

    • rqt_graph:显示节点/话题关系图。
    • rqt_plot:实时绘制数据曲线。
    • rqt_console:日志查看。
    • rqt_reconfigure:动态参数调节。
  • RViz2——3D 可视化

sudo apt install ros-humble-rviz2
rviz2

核心 Display 类型:

RobotModel:显示 URDF 机器人模型。
TF:显示坐标变换树。
LaserScan:显示激光雷达数据。
Map:显示栅格地图。
Path:显示规划路径。
PointCloud2:显示点云。
Image: 显示图像

了解下大概即可,实战中用吧

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐