1. 系统整体架构

1.1 硬件架构

┌─────────────────────────────────────────────────────────────┐ │                     服务机器人硬件架构                          │ ├─────────────────────────────────────────────────────────────┤ │                                                             │ │  ┌─────────────────────────────────────────────────────┐   │ │  │                    计算单元                            │   │ │  │  ┌─────────┐  ┌─────────┐  ┌─────────┐              │   │ │  │  │ 主控制器 │  │  GPU   │  │  NPU   │              │   │ │  │  │ (ARM/x86)│  │ (AI推理)│  │ (加速) │              │   │ │  │  └─────────┘  └─────────┘  └─────────┘              │   │ │  └─────────────────────────────────────────────────────┘   │ │                                                             │ │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │ │  │   感知系统   │  │   运动系统   │  │   作业系统   │       │ │  │  ┌───────┐ │  │  ┌───────┐ │  │  ┌───────┐ │       │ │  │  │激光雷达│ │  │  │ 电机驱动│ │  │  │清洁模块│ │       │ │  │  │深度相机│ │  │  │ 里程计 │ │  │  │机械臂  │ │       │ │  │  │超声波  │ │  │  │ IMU   │ │  │  │舱体   │ │       │ │  │  │麦克风  │ │  │  │ 轮子  │ │  │  │传感器 │ │       │ │  │  └───────┘ │  │  └───────┘ │  │  └───────┘ │       │ │  └─────────────┘  └─────────────┘  └─────────────┘       │ │                                                             │ └─────────────────────────────────────────────────────────────┘

1.2 软件架构

# 机器人软件系统架构 class RobotSoftwareArchitecture:    """    基于ROS2的机器人软件架构    """        # 实时控制层    real_time = {        "motion_control": "运动控制 (50Hz+)",  # 电机控制、速度PID        "safety_monitor": "安全监控",  # 碰撞检测、紧急停止        "sensor_fusion": "传感器融合"  # IMU融合、里程计    }        # 应用层    application = {        "navigation": "导航系统",  # SLAM、路径规划        "perception": "感知系统",  # 目标检测、语义理解        "interaction": "交互系统",  # 语音、屏幕、灯光        "task": "任务系统"  # 业务逻辑、状态机    }        # 云服务层    cloud = {        "remote_monitoring": "远程监控",        "ota_update": "OTA升级",        "data_analytics": "数据分析",        "fleet_management": "多机调度"    }

2. 操作系统与中间件

2.1 ROS2架构

# ROS2 Galactic / Humble # 机器人软件栈 # 核心通信 ros2 run rclcpp_components component_container # 传感器驱动 ros2 run velodyne_driver velodyne_driver_node ros2 run realsense2_camera realsense2_camera_node # 导航栈 ros2 launch nav2_bringup navigation_launch.py # 感知 ros2 run perception_pkg yolo_node # 交互 ros2 run interaction_pkg asr_node ros2 run interaction_pkg tts_node

2.2 实时性配置

// robot_config.json {  "system": {    "os": "Ubuntu 22.04 + PREEMPT_RT",    "kernel_version": "5.15.0-rt",    "real_time_config": {      "cpu_affinity": [2, 3, 4, 5],      "nice_priority": -10,      "rt_runtime": 950000    }  },  "network": {    "ethernet": "1000Mbps",    "wifi": "WiFi6",    "5g": "optional"  } }

3. 感知系统

3.1 激光雷达驱动

class LaserScanner:    """    激光雷达驱动    """        def __init__(self, model='velodyne_vlp16'):        import velodyne_driver.pointcloudII as pc2                self.model = model        self.cloud_pub = rospy.Publisher(            '/velodyne_points',            PointCloud2,            queue_size=10        )                # 雷达参数        self.params = {            'vp16': {                'rpm': 600,                'angle_min': -pi,                'angle_max': pi,                'range_min': 0.5,                'range_max': 100.0            }        }        def process_scan(self, raw_data):        """        处理原始扫描数据        Returns:            scan: LaserScan消息        """        # 解析数据包        points = self.parse_packet(raw_data)                # 过滤无效点        valid_points = self.filter_points(points)                # 转换为激光扫描        scan = LaserScan()        scan.ranges = self.points_to_ranges(valid_points)        scan.angles = self.compute_angles(valid_points)        scan.header.stamp = rospy.Time.now()                return scan        def points_to_ranges(self, points):        """        将点云转换为距离数组        """        ranges = []                for angle in self.angle_increment:            # 找到对应角度的点            matching_points = [p for p in points                            if abs(p.angle - angle) < self.angle_increment/2]                        if matching_points:                min_range = min(p.range for p in matching_points)                ranges.append(min_range)            else:                ranges.append(float('inf'))                return ranges

3.2 深度相机驱动

class DepthCamera:    """    Intel RealSense深度相机驱动    """        def __init__(self, model='d455'):        import pyrealsense2 as rs                self.pipeline = rs.pipeline()        self.config = rs.config()                # 配置流        self.config.enable_stream(            rs.stream.depth,            640, 480,            rs.format.z16, 30        )        self.config.enable_stream(            rs.stream.color,            1280, 720,            rs.format.bgr8, 30        )        self.config.enable_stream(            rs.stream.infrared,            640, 480,            rs.format.y8, 30        )                # 启动pipeline        self.profile = self.pipeline.start(self.config)        def get_frames(self):        """        获取深度和RGB图像        Returns:            depth: 深度图像            color: RGB图像        """        frames = self.pipeline.wait_for_frames()                depth_frame = frames.get_depth_frame()        color_frame = frames.get_color_frame()                depth = np.asanyarray(depth_frame.get_data())        color = np.asanyarray(color_frame.get_data())                return depth, color        def depth_to_pointcloud(self, depth, intrinsics):        """        深度图转点云        """        points = []                for v in range(0, depth.shape[0], 2):            for u in range(0, depth.shape[1], 2):                d = depth[v, u]                if d == 0:                    continue                                # 反投影                x = (u - intrinsics.ppx) * d / intrinsics.fx                y = (v - intrinsics.ppy) * d / intrinsics.fy                z = d                                points.append([x, y, z])                return np.array(points, dtype=np.float32)

3.3 多传感器时间同步

class SensorSynchronizer:    """    多传感器时间同步    """        def __init__(self):        self.sensors = {}        self.buffers = {}  # 时间戳缓冲区        self.sync_tolerance = 0.05  # 50ms同步容差        def register_sensor(self, name, callback):        """        注册传感器        """        self.sensors[name] = callback        self.buffers[name] = []        def receive(self, sensor_name, timestamp, data):        """        接收传感器数据        """        self.buffers[sensor_name].append((timestamp, data))        self.try_sync()        def try_sync(self):        """        尝试同步多传感器数据        """        if not all(self.buffers[s].has_data()                   for s in self.sensors):            return                # 找到时间最接近的数据组        timestamps = [self.buffers[s][0][0]                     for s in self.sensors]                # 检查是否在容差范围内        min_ts = min(timestamps)        max_ts = max(timestamps)                if max_ts - min_ts < self.sync_tolerance:            # 同步成功            synced_data = {}            for s in self.sensors:                ts, data = self.buffers[s].pop(0)                synced_data[s] = data                        # 发布同步数据            self.publish(synced_data)

4. 导航系统

4.1 Cartographer SLAM配置

-- cartographer_for_service_robot.lua include "map_builder.lua" include "trajectory_builder.lua" options = {  map_builder = MAP_BUILDER,  trajectory_builder = TRAJECTORY_BUILDER,  map_frame = "map",  tracking_frame = "base_link",  published_frame = "base_link",  odom_frame = "odom",  provide_odom_frame = true,  use_odometry = true,  publish_frequency_hz = 10.0,  resolution = 0.05,  -- 5cm栅格分辨率 } MAP_BUILDER.use_trajectory_builder_2d = true TRAJECTORY_BUILDER_2D = {  min_range = 0.1,  max_range = 30.0,  missing_data_ray_length = 5.0,    -- 传感器配置  num_laser_filters = 3,    -- 子图参数  submaps = {    num_laser_filters = 3,    grid_type = "PROBABILITYGRID",        -- 占据栅格参数    range_data_inserter = {      hit_probability = 0.557,      miss_probability = 0.496,      insert_free_space = true,    },  },    -- 自适应蒙特卡洛定位  pure_localization = false,    -- 运动滤波器  motion_filter = {    max_distance_meters = 0.2,    max_angle_radians = math.rad(1.0),  }, } -- 扫描匹配参数 TRAJECTORY_BUILDER_2D.ceres_scan_matcher = {  occupied_space_weight = 2.0,  translation_weight = 10.0,  rotation_weight = 15.0,  esdf_server = {    voxel_size = 0.05,  }, } -- 全局闭环检测 POSE_GRAPH.optimization_problem.huber_scale = 1e2 POSE_GRAPH.constraint_builder.sampler = {  min_score = 0.55, } POSE_GRAPH.optimization_problem.local_slam_pose_translation_weight = 1e5 POSE_GRAPH.optimization_problem.local_slam_pose_rotation_weight = 1e5

4.2 MoveBase导航配置

# move_base_params.yaml move_base:  # 全局规划器  base_global_planner: "navfn/NavfnROS"    # 局部规划器  base_local_planner: "dwa_local_planner/DWAPlannerROS"    # 全局规划器参数  NavfnROS:    allow_unknown: true    planner_window_x: 0.0    planner_window_y: 0.0    default_tolerance: 0.0    use_dijkstra: true    # 局部规划器参数  DWAPlannerROS:    # 速度限制    max_vel_x: 0.8    max_vel_y: 0.0    max_vel_theta: 1.5    min_vel_x: 0.0    min_vel_theta: -1.5        # 加速度限制    acc_lim_x: 1.5    acc_lim_y: 0.0    acc_lim_theta: 2.5        # 预测时间参数    sim_time: 2.0    vx_samples: 20    vy_samples: 0    vtheta_samples: 40        # 评价函数权重    path_distance_bias: 32.0      # 路径跟踪权重    goal_distance_bias: 20.0     # 目标接近权重    occdist_bias: 0.01           # 障碍物躲避权重    forward_point_distance: 0.325        # 振荡检测    oscillation_reset_dist: 0.05        # 目标偏差容忍    xy_goal_tolerance: 0.15    yaw_goal_tolerance: 0.15        # 缩放速度    scale_vel_x: 0.625    scale_vel_theta: 0.625

4.3 多机调度系统

class FleetManager:    """    多机器人调度系统    """        def __init__(self):        self.robots = {}  # robot_id -> RobotInfo        self.tasks = {}   # task_id -> TaskInfo                # 调度器        self.scheduler = TaskScheduler()                # 通信        self.comms = FleetComms()                # 监控        self.monitor = FleetMonitor()        def add_robot(self, robot_id, robot_info):        """        添加机器人到车队        """        self.robots[robot_id] = {            'id': robot_id,            'pose': robot_info.pose,            'state': 'idle',            'battery': robot_info.battery,            'capabilities': robot_info.capabilities,            'last_update': time.time()        }                # 订阅机器人状态        self.comms.subscribe_robot_state(robot_id, self.on_robot_state)        def submit_task(self, task):        """        提交任务        """        task_id = self._generate_task_id()        self.tasks[task_id] = {            'id': task_id,            'type': task.type,            'start': task.start,            'end': task.end,            'priority': task.priority,            'requirements': task.requirements,            'status': 'pending',            'assigned_robot': None,            'created_at': time.time()        }                # 触发调度        self.schedule()                return task_id        def schedule(self):        """        任务调度        """        # 获取待调度任务        pending_tasks = [t for t in self.tasks.values()                        if t['status'] == 'pending']                # 按优先级排序        pending_tasks.sort(key=lambda x: (-x['priority'], x['created_at']))                for task in pending_tasks:            # 找到最优机器人            robot = self.scheduler.find_best_robot(task, self.robots)                        if robot:                # 分配任务                self.assign_task(task['id'], robot['id'])        def assign_task(self, task_id, robot_id):        """        分配任务给机器人        """        self.tasks[task_id]['status'] = 'assigned'        self.tasks[task_id]['assigned_robot'] = robot_id                # 发送任务给机器人        task = self.tasks[task_id]        self.comms.send_task(robot_id, task)                # 更新机器人状态        self.robots[robot_id]['state'] = 'busy'        def on_robot_state(self, robot_id, state):        """        机器人状态回调        """        if robot_id in self.robots:            self.robots[robot_id]['pose'] = state.pose            self.robots[robot_id]['battery'] = state.battery            self.robots[robot_id]['last_update'] = time.time()                        # 检查任务完成            if state.task_status == 'completed':                self.on_task_complete(state.completed_task_id)                        # 检查任务失败            elif state.task_status == 'failed':                self.on_task_failed(state.failed_task_id)        def on_task_complete(self, task_id):        """        任务完成处理        """        if task_id in self.tasks:            self.tasks[task_id]['status'] = 'completed'            self.tasks[task_id]['completed_at'] = time.time()                        robot_id = self.tasks[task_id]['assigned_robot']            self.robots[robot_id]['state'] = 'idle'                        # 触发下一轮调度            self.schedule()        def on_task_failed(self, task_id):        """        任务失败处理        """        if task_id in self.tasks:            self.tasks[task_id]['status'] = 'failed'            self.tasks[task_id]['failed_reason'] = 'robot_error'                        robot_id = self.tasks[task_id]['assigned_robot']            self.robots[robot_id]['state'] = 'error'                        # 重新调度            self.tasks[task_id]['status'] = 'pending'            self.schedule()

5. 交互系统

5.1 语音识别服务

class SpeechRecognition:    """    流式语音识别服务    """        def __init__(self):        # 麦克风配置        self.sample_rate = 16000        self.chunk_size = 1600  # 100ms                # ASR模型        self.asr_model = self._load_asr_model()                # VAD模型        self.vad = VADModel()        def recognize_stream(self, audio_stream):        """        流式识别        Yields:            text: 识别结果        """        buffer = []        vad_state = 'idle'                for chunk in audio_stream:            buffer.append(chunk)                        # VAD检测            speech_probs = self.vad.process(chunk)                        if vad_state == 'idle' and speech_probs > 0.5:                vad_state = 'speaking'                buffer_start = len(buffer) - self.vad_window                        elif vad_state == 'speaking' and speech_probs < 0.3:                vad_state = 'idle'                                # 获取语音段                speech_audio = buffer[buffer_start:]                                # 识别                text = self._recognize(''.join(speech_audio))                if text:                    yield text        def _recognize(self, audio_data):        """        语音识别        """        # 特征提取        feat = self.extract_features(audio_data)                # ASR推理        result = self.asr_model.decode(feat)                return result['text']

5.2 对话管理

class DialogueManager:    """    对话管理器    """        def __init__(self):        # 知识图谱        self.knowledge_graph = KnowledgeGraph()                # 意图识别        self.intent_classifier = IntentClassifier()                # 槽位提取        self.slot_extractor = SlotExtractor()                # 对话状态        self.dialogue_state = DialogueState()                # 自然语言生成        self.nlg = NaturalLanguageGenerator()        def process(self, user_input):        """        处理用户输入        Returns:            response: 系统回复        """        # 语义理解        nlu_result = self._understand(user_input)                # 更新对话状态        self.dialogue_state.update(nlu_result)                # 决策回复策略        response = self._decide_response(nlu_result)                return response        def _understand(self, text):        """        语义理解        """        # 意图识别        intent, confidence = self.intent_classifier.predict(text)                # 槽位提取        slots = self.slot_extractor.extract(text)                return {            'intent': intent,            'slots': slots,            'confidence': confidence,            'text': text        }        def _decide_response(self, nlu_result):        """        决策回复        """        intent = nlu_result['intent']        slots = nlu_result['slots']                # 检查是否满足意图要求        if self._intent_satisfied(intent, slots):            # 可以完成任务            result = self._execute_task(intent, slots)            response = self.nlg.generate_success(intent, result)        elif self._missing_required(intent, slots):            # 缺少必填信息            missing_slot = self._get_missing(intent, slots)            response = self.nlg.generate_ask(missing_slot)        else:            # 无法理解            response = self.nlg.generate_fallback()                return response        def _execute_task(self, intent, slots):        """        执行任务        """        if intent == 'guide':            location = slots.get('destination')            path = self._plan_path(location)            return {'path': path, 'location': location}                elif intent == 'query':            entity = slots.get('entity')            info = self.knowledge_graph.query(entity)            return {'info': info}                # ... 其他意图处理

6. 安全系统

6.1 安全监控模块

class SafetyMonitor:    """    安全监控系统    """        def __init__(self):        self.safety_state = 'normal'        self.emergency_stop_flag = False                # 安全参数        self.min_safe_distance = 0.5  # m        self.max_speed_normal = 1.0     # m/s        self.max_speed_caution = 0.5    # m/s        def check(self, sensor_data):        """        安全检查        """        # 检查障碍物        if self._obstacle_detected(sensor_data):            return self._handle_obstacle(sensor_data)                # 检查悬崖        if self._cliff_detected(sensor_data):            return self._handle_cliff(sensor_data)                # 检查碰撞        if self._collision_detected(sensor_data):            return self._handle_collision(sensor_data)                # 检查速度        if self._speed_exceeded(sensor_data):            return self._handle_speed_exceed(sensor_data)                return 'normal'        def _obstacle_detected(self, sensor_data):        """        检测障碍物        """        laser_ranges = sensor_data['laser_ranges']        min_range = min(laser_ranges)                return min_range < self.min_safe_distance        def _handle_obstacle(self, sensor_data):        """        障碍物处理        """        laser_ranges = sensor_data['laser_ranges']        min_idx = np.argmin(laser_ranges)                # 判断障碍物位置        angle = (min_idx - len(laser_ranges)/2) * 0.5 * pi / 180                if abs(angle) < 30:  # 正前方            return 'stop'        else:            return 'turn_away'  # 转向避开        def emergency_stop(self):        """        紧急停止        """        self.emergency_stop_flag = True        self.safety_state = 'emergency'                # 停止运动        self._stop_motion()                # 发出警报        self._sound_alarm()                # 上报后台        self._report_incident('emergency_stop')

7. 技术指标

| 模块 | 指标 | 要求值 | 测试方法 |

导航

定位精度

≤±2cm

100次测试

导航

路径成功率

≥99%

500次导航

感知

障碍检测率

≥99%

障碍物测试

交互

语音识别率

≥95%

1000句测试

交互

意图准确率

≥92%

1000句测试

安全

碰撞率

0次

10000小时运行

系统

可用性

≥99.5%

月度统计

文档版本: v1.0

文档版本: v1.0

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐