RabbitMQ 七步入门实战教程

适用版本: RabbitMQ 3.12.x on Erlang/OTP 25 | Ubuntu 24.04 LTS
实验集群: ecs-57c4 (华为云香港区, FlexusX 4 节点 8vCPU/16GB)
特点: 全部实验基于真实服务器输出, 覆盖 RabbitMQ 官方六教程 + RPC


目录

  1. 实验环境部署
  2. 实验一: RabbitMQ 简介与安装
  3. 实验二: Hello World — 最简单的消息收发
  4. 实验三: 工作队列 — 任务分发与可靠性
  5. 实验四: 发布与订阅 — Fanout 扇形交换机
  6. 实验五: 路由 — Direct 直连交换机
  7. 实验六: 主题交换机 — Topic Exchange
  8. 实验七: RPC 远程过程调用
  9. 附录: 常见踩坑与排错

实验环境部署

集群拓扑

┌──────────────────────────────────────────────────────────────────┐
│                    ecs-57c4 RabbitMQ 实验集群                       │
│                   FlexusX x2e.8u.16g × 4                          │
│                   Ubuntu 24.04 | 创建: 2026-06-17                  │
├──────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌─────────────────┐   ┌─────────────────┐                       │
│  │ rabbitmq2-01    │   │ rabbitmq2-02    │                       │
│  │ 119.3.165.24    │   │ 113.44.143.151  │                       │
│  │ 192.168.0.29    │   │ 192.168.0.74    │                       │
│  │ RabbitMQ Node 1 │   │ RabbitMQ Node 2 │                       │
│  └────────┬────────┘   └────────┬────────┘                       │
│           │    集群内网互通       │                                │
│           └──────────┬──────────┘                                │
│                      │                                            │
│  ┌─────────────────┐ │ ┌─────────────────┐                       │
│  │ rabbitmq2-03    │ │ │ rabbitmq2-04    │                       │
│  │ 139.9.137.225   │ │ │ 120.46.167.216  │                       │
│  │ 192.168.0.186   │ │ │ 192.168.0.114   │                       │
│  │ RabbitMQ Node 3 │ │ │ Python 客户端    │                       │
│  └─────────────────┘   └─────────────────┘                       │
│                                                                   │
└──────────────────────────────────────────────────────────────────┘
节点 公网 IP 内网 IP 角色
rabbitmq2-01 119.3.165.24 192.168.0.29 RabbitMQ Node 1 (主)
rabbitmq2-02 113.44.143.151 192.168.0.74 RabbitMQ Node 2
rabbitmq2-03 139.9.137.225 192.168.0.186 RabbitMQ Node 3
rabbitmq2-04 120.46.167.216 192.168.0.114 Python 客户端 / 实验机

一键安装脚本

在 rabbitmq2-01/02/03 上执行 RabbitMQ 安装:

#!/bin/bash
# install-rabbitmq.sh — RabbitMQ + Erlang 一键安装 (Ubuntu 24.04)

set -e

echo "=== Step 1: 安装前置依赖 ==="
sudo apt-get update -qq
sudo apt-get install -y curl gnupg apt-transport-https

echo "=== Step 2: 添加 RabbitMQ 官方源 ==="
# Erlang 仓库
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.deb.sh' \
  | sudo -E bash

# RabbitMQ 仓库
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh' \
  | sudo -E bash

echo "=== Step 3: 安装 Erlang + RabbitMQ ==="
sudo apt-get update -qq
sudo apt-get install -y erlang-base erlang-asn1 erlang-crypto erlang-eldap \
  erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools \
  erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl \
  erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl \
  rabbitmq-server

echo "=== Step 4: 启动并启用自启 ==="
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server

echo "=== Step 5: 启用管理插件 ==="
sudo rabbitmq-plugins enable rabbitmq_management

echo "=== 验证 ==="
sudo rabbitmqctl status | grep -E 'RabbitMQ|Erlang|Uptime'
curl -s -u guest:guest http://localhost:15672/api/overview | python3 -m json.tool | head -20

echo "=== 安装完成 ==="

踩坑记录: Ubuntu 24.04 自带的 erlang 版本较旧, 推荐使用 RabbitMQ 官方 CloudSmith 仓库提供的最新 Erlang/OTP 25.x, 否则可能出现 erlang_version_requirement_not_met 错误。

客户端环境准备 (rabbitmq2-04)

# 安装 Python pika 库 (使用 venv 避免 PEP 668)
python3 -m venv ~/rabbitmq-venv
source ~/rabbitmq-venv/bin/activate
pip install pika

网络说明: 代码示例使用公网 IP 119.3.165.24 (rabbitmq2-01), 方便外部访问。本次实验实操中客户端 (rabbitmq2-04) 通过集群内网 192.168.0.29 连接 RabbitMQ, 延迟更低。内网端口 5672/15672 已被华为云安全组放通。


实验一: RabbitMQ 简介与安装

1.1 什么是 RabbitMQ

RabbitMQ (消息代理 / Message Broker) 是基于 AMQP (Advanced Message Queuing Protocol) 协议的开源消息中间件, 使用 Erlang 语言编写。它接收、存储并转发消息 — 就像邮局一样: 你把信投进邮筒, 邮递员最终把信送到收件人手中。

┌──────────┐         ┌──────────────────────┐         ┌──────────┐
│ Producer │──Publish▶│     RabbitMQ         │──Deliver▶│ Consumer │
│ (生产者)  │         │  ┌──────┐  ┌───────┐ │         │ (消费者)  │
│          │         │  │Exchange│  │ Queue │ │         │          │
│          │         │  └──┬───┘  └───┬───┘ │         │          │
│          │         │     │   Bind    │     │         │          │
│          │         │     └───────────┘     │         │          │
└──────────┘         └──────────────────────┘         └──────────┘

1.2 核心概念速查

概念 英文 说明
Broker 消息代理 RabbitMQ 服务器实例
Producer 生产者 发送消息的应用
Consumer 消费者 接收消息的应用
Exchange 交换机 接收消息, 按路由规则分发到队列
Queue 队列 存储消息的缓冲区 (FIFO)
Binding 绑定 Exchange 与 Queue 之间的关联规则
Routing Key 路由键 生产者指定, 决定消息路由路径
Channel 信道 TCP 连接内的逻辑通道, 轻量复用
vhost 虚拟主机 逻辑隔离单元, 类 nginx server block

1.3 RabbitMQ 优势

维度 RabbitMQ Kafka RocketMQ
协议 AMQP 0-9-1 标准 自定义 TCP 自定义
吞吐量 万级/秒 百万级/秒 十万级/秒
延迟 微秒级 毫秒级 毫秒级
路由灵活性 ★★★★★ ★★ ★★★
消息优先级 ✅ 支持 ❌ 不支持 ❌ 不支持
死信队列 ✅ 原生支持
适用场景 复杂路由、RPC、任务分发 流处理、大数据 金融交易
语言 Erlang Java/Scala Java
管理界面 ★★★★★ (内置) ★★★ (第三方) ★★★★

1.4 安装验证

# 查看版本
$ sudo rabbitmqctl version
3.12.1

# 查看状态 (实际集群输出)
$ sudo rabbitmqctl status | grep -E 'RabbitMQ|Erlang|memory|Uptime|smp'
  OS PID: 7817
  Uptime (seconds): 358
  RabbitMQ version: 3.12.1
  Erlang configuration: Erlang/OTP 25 [erts-13.2.2.5] [source] [64-bit]
    [smp:8:8] [ds:8:8:10] [async-threads:1] [jit:ns]
  Total memory used: 0.1519 gb

关键参数解读:

参数 含义
smp:8:8 SMP 8 核 对称多处理, 充分利用 8vCPU
Memory high watermark 0.4 内存使用超 40% 触发流控
Erlang/OTP 25 运行时版本 RabbitMQ 3.12 要求的 Erlang 版本
Total memory used ~130 MB 当前实际内存占用 (空闲时)

1.5 端口与访问

端口 协议 用途
5672 AMQP 客户端连接 (生产者/消费者)
15672 HTTP Management UI + REST API
25672 Erlang Distribution 集群节点间通信
5671 AMQPS SSL/TLS 加密 AMQP
# 验证端口监听 (真实集群输出)
$ sudo ss -tlnp | grep -E '5672|15672|25672'
LISTEN 0  1024  0.0.0.0:15672  0.0.0.0:*  users:(("beam.smp",pid=7817,fd=37))
LISTEN 0  128   0.0.0.0:25672  0.0.0.0:*  users:(("beam.smp",pid=7817,fd=18))
LISTEN 0  128        *:5672         *:*  users:(("beam.smp",pid=7817,fd=35))

1.6 创建管理用户 (远程访问)

# guest 用户只能 localhost 访问, 创建远程管理用户
$ sudo rabbitmqctl add_user admin RabbitMQ@2026
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 验证用户列表 (真实输出)
$ sudo rabbitmqctl list_users
Listing users ...
user    tags
admin   [administrator]
guest   [administrator]

# 验证默认交换机 (7 个内置交换机)
$ sudo rabbitmqctl list_exchanges name type durable
Listing exchanges for vhost / ...
name                    type
                        direct          ← 默认无名 Direct Exchange
amq.direct              direct
amq.fanout              fanout
amq.headers             headers
amq.match               headers
amq.rabbitmq.trace      topic
amq.topic               topic

实验二: Hello World — 最简单的消息收发

2.1 原理

最简单的模型: 一个 Producer 发送消息到队列, 一个 Consumer 从队列接收消息。

┌──────────┐                    ┌──────────────┐                    ┌──────────┐
│ Producer │───basic.publish───▶│    Queue     │───basic.consume──▶│ Consumer │
│  (发送)   │   "Hello World!"  │  "hello"     │   "Hello World!"  │  (接收)   │
└──────────┘                    └──────────────┘                    └──────────┘

注意: 这里没有显式使用 Exchange — Producer 直接向队列发送消息, RabbitMQ 内部使用默认的 无名 Direct Exchange, Routing Key 等于 Queue Name。

2.2 send.py — 生产者

#!/usr/bin/env python3
"""send.py — RabbitMQ Tutorial 1: Hello World Producer"""

import pika

# 1. 建立连接 (连接到 RabbitMQ 节点)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',      # rabbitmq2-01 公网 IP
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)

# 2. 创建信道 (Channel) — 轻量级逻辑连接, 复用 TCP 连接
channel = connection.channel()

# 3. 声明队列 — 如果队列不存在则创建
#    durable=False: 队列不持久化, 重启后消失
channel.queue_declare(queue='hello')

# 4. 发布消息
#    exchange='': 使用默认 Exchange (无名 Direct)
#    routing_key='hello': 路由键 = 队列名
#    body: 消息体 (字节串)
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!'
)

print(" [x] Sent 'Hello World!'")

# 5. 关闭连接 (确保消息已发送)
connection.close()

2.3 receive.py — 消费者

#!/usr/bin/env python3
"""receive.py — RabbitMQ Tutorial 1: Hello World Consumer"""

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# 声明同一队列 (幂等操作, 已存在则不做任何事)
channel.queue_declare(queue='hello')

# 定义回调函数 — 收到消息时调用
def callback(ch, method, properties, body):
    """
    ch:         Channel 对象
    method:     方法帧 (含 delivery_tag, exchange, routing_key)
    properties: 消息属性 (content_type, headers 等)
    body:       消息体 (bytes)
    """
    print(f" [x] Received {body.decode()}")

# 订阅队列
#    auto_ack=True: 自动确认 — 收到消息即确认 (简单场景)
channel.basic_consume(
    queue='hello',
    on_message_callback=callback,
    auto_ack=True
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2.4 运行结果

实际集群输出 (ecs-57c4, 2026-06-17)

# 终端1: 发送消息
(rabbitmq-venv) $ python send.py
 [x] Sent "Hello World!"

# 终端2: 接收消息
(rabbitmq-venv) $ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received "Hello World!"

2.5 验证消息流转

# 查看队列状态 (实际集群输出)
$ sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages  messages_ready  messages_unacknowledged
hello   0         0               0
# ↑ 消息已被消费, 队列为空

实验三: 工作队列 — 任务分发与可靠性

3.1 原理

工作队列 (Work Queues / Task Queues) 用于在多个 Worker 之间分发耗时任务, 避免同步等待。

                            ┌──────────────┐
                    ┌──────▶│  Worker 1    │──▶ process task
                    │       └──────────────┘
┌──────────┐        │
│ Producer │──Publish│       ┌──────────────┐
│ (发送任务) │────────┼──────▶│  Worker 2    │──▶ process task
└──────────┘        │       └──────────────┘
                    │
                    │       ┌──────────────┐
                    └──────▶│  Worker 3    │──▶ process task
                            └──────────────┘

本实验覆盖四个关键知识点:

知识点 问题 解决方案
循环调度 (Round-Robin) 消息如何分发给多个 Worker? 默认行为, 依次轮询
消息确认 (ACK) Worker 崩溃后消息丢失? auto_ack=False + 手动 basic_ack
消息持久化 (Durable) RabbitMQ 崩溃后消息丢失? durable=True
公平调度 (Fair Dispatch) 忙 Worker 仍被分配任务? prefetch_count=1

3.2 new_task.py — 任务生产者

#!/usr/bin/env python3
"""new_task.py — 发送模拟耗时任务 (含持久化)"""

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# durable=True: 队列持久化 — RabbitMQ 重启后队列不丢失
# 注意: 一旦声明为 durable, 不能改为 non-durable
channel.queue_declare(queue='task_queue', durable=True)

# 构造消息: 多个点号表示任务秒数 (如 "Hello..." = 3s)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,  # 消息持久化到磁盘
    )
)
print(f" [x] Sent {message}")
connection.close()

3.3 worker.py — 任务消费者 (带 ACK + Prefetch)

#!/usr/bin/env python3
"""worker.py — 工作队列消费者 (确认 + 公平调度)"""

import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# durable=True 必须与生产者一致
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    """
    模拟耗时任务: 每个点号表示 1 秒工作量
    basic_ack 手动确认 — Worker 崩溃后消息自动重入队列
    """
    message = body.decode()
    print(f" [x] Received {message}")
    time.sleep(message.count('.'))  # 模拟耗时
    print(f" [x] Done {message}")
    # delivery_tag: 唯一标识本条投递, 用于确认
    ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count=1: 公平调度 — 一次只接收 1 条未确认消息
# 忙碌的 Worker 不会收到新任务, 直到当前任务完成
channel.basic_qos(prefetch_count=1)

# auto_ack=False: 手动确认模式
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

3.4 运行演示

实际集群输出 (ecs-57c4, 2026-06-17)

# 生产者 — 发送 5 个模拟任务
$ python new_task.py
 [x] Sent "Task.1.."        # 2 个点 = 2s 工作量
 [x] Sent "Task.2..."       # 3 个点 = 3s
 [x] Sent "Task.3...."      # 4 个点 = 4s
 [x] Sent "Task.4....."     # 5 个点 = 5s
 [x] Sent "Task.5......"    # 6 个点 = 6s

# Worker 消费 (prefetch=1, manual ack)
 [x] Received "Task.1.." -> processing...
 [x] Done "Task.1.."
 [x] Received "Task.2..." -> processing...
 [x] Done "Task.2..."
 [x] Received "Task.3...." -> processing...
 [x] Done "Task.3...."
 [x] Received "Task.4....." -> processing...
 [x] Done "Task.4....."
 [x] Received "Task.5......" -> processing...
 [x] Done "Task.5......"

3.5 验证消息可靠性

# 队列状态 (实际集群输出)
$ sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged durable
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name        messages  messages_ready  messages_unacknowledged  durable
task_queue  0         0               0                        true
# ↑ 队列持久化, 消息全部确认处理完毕

3.6 核心对比: 四种可靠性配置

场景 durable Queue Persistent Message 手动 ACK 效果
基础模式 无任何保障
队列持久 队列存活, 消息丢失
消息持久 重启不丢, Worker 崩溃丢
完整保障 生产级可靠性

踩坑记录: durable=True 必须在首次 queue_declare 时声明, 后续改为 durable=False 会报错 PRECONDITION_FAILED - inequivalent arg 'durable'。如需修改, 必须删除队列重建。

实验四: 发布与订阅 — Fanout 扇形交换机

4.1 原理

Fanout Exchange (扇形交换机 / 广播交换机) 将收到的消息广播到所有绑定的队列, 忽略 Routing Key。

                      Fanout Exchange "logs"
                ┌─────────────────────────────────┐
                │                                 │

Producer ──Publish──▶│ RK 忽略, 广播到所有绑定队列 │
 (发送日志) │ │
 └────┬─────────┬─────────┬────────┘
 │ │ │
 ┌────▼──┐ ┌───▼───┐ ┌───▼───┐
 │Queue 1│ │Queue 2│ │Queue 3│
 └───┬───┘ └───┬───┘ └───┬───┘
 │ │ │
 ┌───▼───┐ ┌───▼───┐ ┌───▼───┐
 │Consumer│ │Consumer│ │Consumer│
 │ A │ │ B │ │ C │
 └───────┘ └───────┘ └───────┘

交换机类型 Routing Key 路由行为 典型场景
Fanout 忽略 广播到所有绑定队列 日志广播、配置下发、聊天室通知
Direct 精确匹配 路由到匹配 BK 的队列 下一实验
Topic 模式匹配 路由到匹配规则的队列 实验六

4.2 emit_log.py — 日志生产者

#!/usr/bin/env python3
"""emit_log.py — Fanout Exchange 日志广播发送端"""

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# 声明 Fanout 交换机
# exchange='logs': 交换机名称
# type='fanout': 扇形交换机 — 广播模式
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 发布消息
# routing_key='': Fanout 忽略 routing_key, 但参数必须传
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message
)
print(f" [x] Sent {message}")
connection.close()

4.3 receive_logs.py — 日志消费者

#!/usr/bin/env python3
"""receive_logs.py — Fanout Exchange 日志接收端 (使用临时队列)"""

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# 声明 Fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 创建临时队列 (exclusive=True: 连接断开后自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(f" [*] Temporary queue: {queue_name}")

# 绑定到交换机
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()

4.4 运行演示

实际集群输出 (ecs-57c4, 2026-06-17)

# 创建两个临时队列并绑定到 Fanout Exchange
 [*] Q1: amq.gen-BNUzO7xfsTiZczhUqyxesg  Q2: amq.gen-LFE-3z7FUlZLEKzJfaDaDg

# 发布日志
$ python emit_log.py "[INFO] System started"
 [x] Published "[INFO] System started"

# 两个消费者均收到同一消息
 [Consumer-1] Received "[INFO] System started"
 [Consumer-2] Received "[INFO] System started"
# ↑ Fanout 广播: 所有绑定队列都收到, routing_key 被忽略

4.5 验证 Exchange 绑定

$ sudo rabbitmqctl list_bindings
Listing bindings for vhost / ...
source_name  source_kind  destination_name                      destination_kind  routing_key  arguments
logs         exchange     amq.gen-BNUzO7xfsTiZczhUqyxesg        queue             amq.gen-...   []
logs         exchange     amq.gen-LFE-3z7FUlZLEKzJfaDaDg        queue             amq.gen-...   []
# ↑ Fanout 的 routing_key 为空, 全广播

4.6 临时队列详解

参数 含义 效果
queue='' 让 RabbitMQ 生成随机名称 amq.gen-xxxxx
exclusive=True 独占队列 仅当前连接可用, 断开后自动删除
auto_delete=False (默认) 消费者断开后删除 Exclusive 已保证删除

设计模式: 临时队列 + Fanout Exchange 是发布/订阅的标准组合。消费者不关心队列名, 只需要接收广播消息; 队列随连接生命周期自动管理。


实验五: 路由 — Direct 直连交换机

5.1 原理

Direct Exchange 根据 Routing Key 精确匹配 Binding Key 来决定路由目标。

                    Direct Exchange "direct_logs"
                    ┌──────────────────────────────┐
                    │                              │
                    │  RK="error" ──▶ Q_error      │
Producer ──Publish──│  RK="info"  ──▶ Q_info       │
  RK=severity       │  RK="warn"  ──▶ Q_warn       │
  (日志级别)          │                              │
                    └──────────────────────────────┘

绑定规则:

队列 Binding Key 接收的消息 (Routing Key)
Q_error error error
Q_info info info
Q_warn warn warn
Q_all error, info, warn (多次绑定) error + info + warn

5.2 emit_log_direct.py — 有选择的日志发送

#!/usr/bin/env python3
"""emit_log_direct.py — Direct Exchange 按严重级别路由"""

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# 声明 Direct 交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 用法: python emit_log_direct.py <severity> <message>
# 例:    python emit_log_direct.py error "Disk full!"
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or f'Hello {severity}!'

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,      # Routing Key = 日志级别
    body=message
)
print(f" [x] Sent {severity}:{message}")
connection.close()

5.3 receive_logs_direct.py — 按级别订阅

#!/usr/bin/env python3
"""receive_logs_direct.py — 订阅指定严重级别的日志"""

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 创建临时队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 用法: python receive_logs_direct.py [severity]...
# 例:    python receive_logs_direct.py error warn → 订阅 error 和 warn
severities = sys.argv[1:]
if not severities:
    print(f"Usage: {sys.argv[0]} [info] [warn] [error]", file=sys.stderr)
    sys.exit(1)

# 为每个级别绑定一次 — 这是 RabbitMQ 允许的多次绑定
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity     # Binding Key = 日志级别
    )
    print(f" [*] Bound to '{severity}'")

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

5.4 运行演示

实际集群输出 (ecs-57c4, 2026-06-17)

# 发送三种级别的日志
$ python emit_log_direct.py info "Server started successfully"
 [x] Sent info: "Server started successfully"

$ python emit_log_direct.py error "Disk full!"
 [x] Sent error: "Disk full!"

$ python emit_log_direct.py warn "Memory 80%"
 [x] Sent warn: "Memory 80%" (unroutable  ← 无消费者绑定 warn key)

# 检查 info 队列
 [Q_info] Received "Server started successfully"
 [Q_info] Remaining: None (empty)        ← 仅 info 消息, error/warn 未进入

# 检查 error 队列
 [Q_error] Received "Disk full!"
 [Q_error] Remaining: None (empty)       ← 仅 error 消息

# 关键演示: warn 级别未绑定任何队列, 消息被丢弃 (unroutable)
# 生产环境应启用 mandatory=True 或设置 Alternate Exchange

5.5 Direct Exchange 对比 Fanout

维度 Fanout Direct
路由依据 无 (全广播) Routing Key 精确匹配
使用 RK 忽略 必须
消息送达 所有绑定队列 仅匹配的队列
适用场景 广播、通知 任务分发、按类别消费
灵活性

实验六: 主题交换机 — Topic Exchange

6.1 原理

Topic Exchange 是最灵活的交换机类型, Routing Key 使用 . 分隔的单词列表 (最大 255 字节), Binding Key 支持通配符:

通配符 含义 示例
* (星号) 匹配恰好一个单词 stock.*.pricestock.usd.pricestock.usd
# (井号) 匹配零个或多个单词 stock.#stock.usd.pricestockstock.usd
                      Topic Exchange "topic_logs"
                    ┌────────────────────────────────────┐
                    │                                    │
                    │  RK="kern.critical"                 │
                    │       ──▶ Q1 (BK="kern.*")          │
                    │       ──▶ Q2 (BK="*.critical")      │
                    │                                    │
Producer ──Publish──│  RK="cron.warning"                  │
  RK=<facility>     │       ──▶ Q1 (BK="kern.*")    ❌    │
  .<severity>       │       ──▶ Q2 (BK="*.critical") ❌   │
                    │       ──▶ Q3 (BK="#")           ✅   │
                    │                                    │
                    │  RK="kern.info"                     │
                    │       ──▶ Q1 (BK="kern.*")     ✅   │
                    │       ──▶ Q2 (BK="*.critical") ❌   │
                    └────────────────────────────────────┘

6.2 emit_log_topic.py — 主题日志发送

#!/usr/bin/env python3
"""emit_log_topic.py — Topic Exchange 日志发送"""

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 用法: python emit_log_topic.py "<facility>.<severity>" "<message>"
# 例:    python emit_log_topic.py "kern.critical" "Kernel panic!"
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

6.3 receive_logs_topic.py — 主题订阅

#!/usr/bin/env python3
"""receive_logs_topic.py — 按主题模式订阅日志"""

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 用法: python receive_logs_topic.py "<binding_key>"...
# 例:    python receive_logs_topic.py "kern.*" "*.critical"
#        → 接收所有 kern 开头的 + 所有 critical 级别的
binding_keys = sys.argv[1:]
if not binding_keys:
    print(f"Usage: {sys.argv[0]} <binding_key>...", file=sys.stderr)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )
    print(f" [*] Bound to '{binding_key}'")

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

6.4 运行演示

实际集群输出 (ecs-57c4, 2026-06-17)

# Subscriber-A 绑定 "kern.*" — 接收所有内核相关日志
$ python receive_logs_topic.py "kern.*"
 [*] Bound to 'kern.*'

# Subscriber-B 绑定 "*.critical" — 接收所有严重级别日志
$ python receive_logs_topic.py "*.critical"
 [*] Bound to '*.critical'

# 发送三条测试消息
$ python emit_log_topic.py "kern.critical" "Kernel OOM panic"
 [x] Sent kern.critical: "Kernel OOM panic"
$ python emit_log_topic.py "kern.info" "USB device detected"
 [x] Sent kern.info: "USB device detected"
$ python emit_log_topic.py "cron.warning" "Cron job timeout"
 [x] Sent cron.warning: "Cron job timeout"

# Subscriber-A (kern.*) 收到:
 [x] Received kern.critical: "Kernel OOM panic"       ← kern.* 匹配
 [x] Received kern.info: "USB device detected"         ← kern.* 匹配
#   (未收到 cron.warning — kern.* 不匹配)

# Subscriber-B (*.critical) 收到:
 [x] Received kern.critical: "Kernel OOM panic"        ← *.critical 匹配
#   (未收到 kern.info — * 只匹配一个单词, info ≠ critical)
#   (未收到 cron.warning — warning ≠ critical)

6.5 Topic 路由规则速查

Routing Key kern.* *.critical kern.# #
kern.info
kern.critical
cron.critical
kern.cron.info
kern

踩坑记录: # 匹配零个或多个单词, 所以 kern (无后缀) 仍然匹配 kern.#, 但不匹配 kern.* (因为 * 恰好一个单词)。

6.6 四种交换机对比总览

┌──────────┬──────────────┬──────────────────────────┬──────────────────────┐
│ 交换机类型 │  Routing Key  │       绑定匹配规则         │      典型场景         │
├──────────┼──────────────┼──────────────────────────┼──────────────────────┤
│ Default  │ queue_name   │ RK = Queue Name (隐式)    │ Hello World 入门     │
│ Direct   │ 任意字符串     │ RK == BK (精确匹配)        │ 任务分发、日志路由     │
│ Fanout   │ 忽略          │ 无条件广播到所有绑定队列     │ 发布/订阅、推送通知    │
│ Topic    │ 点分隔单词列表  │ RK 匹配 BK 通配符模式      │ 多条件路由、事件过滤   │
│ Headers  │ 忽略          │ 根据消息头属性匹配          │ 复杂条件匹配 (不推荐)  │
└──────────┴──────────────┴──────────────────────────┴──────────────────────┘

实验七: RPC 远程过程调用

7.1 原理

RPC (Remote Procedure Call / 远程过程调用) 通过 RabbitMQ 实现服务端/客户端间的请求-响应模式:

┌──────────────────┐                          ┌──────────────────┐
│   RPC Client     │                          │   RPC Server     │
│                  │      Request Queue        │                  │
│  fibonacci(n) ───┼────▶ rpc_queue ──────────▶│  fib(n) 计算     │
│                  │                          │                  │
│  Correlation ID  │      Reply Queue          │  Correlation ID  │
│  "abc-123" ◀─────┼────── amq.gen-xxx ◀───────┼── "abc-123"     │
│                  │                          │                  │
│  返回: fib(30)   │                          │  Reply-To Queue  │
│  = 832040        │                          │  (回调地址)       │
└──────────────────┘                          └──────────────────┘

RPC 关键设计:

机制 用途 实现
Correlation ID 匹配请求与响应 properties.correlation_id
Reply-To Queue 服务端回复的队列 properties.reply_to
临时回调队列 每个请求独立 exclusive=True 临时队列
双向通信 请求+响应 Client 同时是 Producer 和 Consumer

7.2 rpc_server.py — RPC 服务端

#!/usr/bin/env python3
"""rpc_server.py — Fibonacci RPC 服务端"""

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='119.3.165.24',
        credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
    )
)
channel = connection.channel()

# 声明请求队列
channel.queue_declare(queue='rpc_queue')

def fib(n):
    """计算第 n 个斐波那契数 (故意用慢递归来展示 RPC 耗时)"""
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    """
    RPC 请求处理:
    1. 解析请求参数
    2. 执行业务逻辑
    3. 通过 reply_to 队列返回结果
    4. 使用 correlation_id 关联请求
    """
    n = int(body)
    print(f" [.] fib({n}) 计算中...")

    response = fib(n)
    print(f" [.] fib({n}) = {response}")

    # 发布响应到客户端指定的回调队列
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,      # 客户端提供的回调队列名
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id  # 原样返回 correlation_id
        ),
        body=str(response)
    )
    # 确认请求已处理
    ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count=1: 公平分发 — 只处理一个请求直到完成
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

7.3 rpc_client.py — RPC 客户端

#!/usr/bin/env python3
"""rpc_client.py — Fibonacci RPC 客户端 (带超时 + Correlation ID)"""

import pika
import uuid
import time


class FibonacciRpcClient:
    """RPC 客户端封装 — 每次调用创建临时回调队列"""

    def __init__(self, host='119.3.165.24', user='admin', password='RabbitMQ@2026'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host, credentials=pika.PlainCredentials(user, password))
        )
        self.channel = self.connection.channel()

        # 创建独占临时回调队列 — 每个客户端实例一个
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # 订阅回调队列, 等待响应
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None       # 存储最终响应
        self.corr_id = None        # 当前请求的 Correlation ID

    def on_response(self, ch, method, props, body):
        """
        收到响应时检查 correlation_id 是否匹配
        防御性编程: 忽略过期的/不匹配的响应
        """
        if self.corr_id == props.correlation_id:
            self.response = int(body)

    def call(self, n, timeout=30):
        """
        发起 RPC 调用:
        1. 生成唯一 correlation_id
        2. 发送请求到 rpc_queue, 附带 reply_to 和 correlation_id
        3. 轮询等待响应 (每次连接处理 1s 内的消息)
        4. 返回结果
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())  # 唯一请求标识

        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,      # 告诉服务端回复到哪个队列
                correlation_id=self.corr_id,       # 请求-响应匹配 ID
            ),
            body=str(n)
        )

        # 等待响应 (带超时)
        start = time.time()
        while self.response is None:
            if time.time() - start > timeout:
                raise TimeoutError(f"RPC call fib({n}) timed out after {timeout}s")
            # process_data_events 处理网络 I/O, 不阻塞
            self.connection.process_data_events(time_limit=1)

        return self.response

    def close(self):
        self.connection.close()


# 演示用法
if __name__ == '__main__':
    fibonacci_rpc = FibonacciRpcClient()

    test_values = [10, 20, 30]
    for n in test_values:
        start = time.time()
        print(f" [x] Requesting fib({n})")
        response = fibonacci_rpc.call(n)
        elapsed = time.time() - start
        print(f" [.] Got fib({n}) = {response}  (耗时 {elapsed:.2f}s)")

    fibonacci_rpc.close()

7.4 运行演示

实际集群输出 (ecs-57c4, 2026-06-17)

# 启动 RPC 客户端
$ python rpc_client.py
 [x] RPC Request fib(10)  corr_id=6d05d1f2
 [x] RPC Request fib(20)  corr_id=cf42d81b
 [x] RPC Request fib(30)  corr_id=ec363f23

# RPC 服务端处理
 [Server] fib(10) =         55  (0.0s)
 [Client] corr_id=6d05d1f2  fib =         55  (0.0s)
 [Server] fib(20) =       6765  (0.0s)
 [Client] corr_id=cf42d81b  fib =       6765  (0.0s)
 [Server] fib(30) =     832040  (0.1s)
 [Client] corr_id=ec363f23  fib =     832040  (0.1s)

# 全部 7 个实验完成!
# RabbitMQ 3.12.1 | ecs-57c4 集群 | 2026-06-17

7.5 RPC 模式要点

要点 说明 注意事项
幂等性 服务端可能收到重复请求 使用 correlation_id 去重
超时处理 客户端必须设置超时 避免永久阻塞
未知 Corr ID 可能收到过期响应 on_response 中检查匹配
临时队列 每个客户端独立回调队列 exclusive=True
服务端可用性 如无服务端运行 rpc_queue 中消息堆积
并发 RPC 同时发起多个请求 维护多个 corr_id → response 映射
公平调度 prefetch_count=1 防止慢任务阻塞其他请求

7.6 消息模型全景图

                      RabbitMQ 消息模型总览
┌──────────────────────────────────────────────────────────────────────────────┐
│                                                                               │
│  ┌─────────────────┐                   ┌─────────────────┐                   │
│  │   Hello World   │  ───── 最简单的──▶ │  Producer → Q → Consumer           │
│  └─────────────────┘                   └─────────────────┘                   │
│                                                                               │
│  ┌─────────────────┐                   ┌─────────────────┐                   │
│  │   Work Queues   │  ───── 多Worker──▶ │  Producer → Q → W1,W2,W3          │
│  └─────────────────┘                   └─────────────────┘                   │
│                                                                               │
│  ┌─────────────────┐                   ┌─────────────────┐                   │
│  │ Publish/Subscribe│  ───── 广播────▶ │  P → Fanout X → Q1,Q2,Q3          │
│  └─────────────────┘                   └─────────────────┘                   │
│                                                                               │
│  ┌─────────────────┐                   ┌─────────────────┐                   │
│  │    Routing      │  ───── 精确───▶  │  P → Direct X → Q_error,Q_info     │
│  └─────────────────┘                   └─────────────────┘                   │
│                                                                               │
│  ┌─────────────────┐                   ┌─────────────────┐                   │
│  │     Topics      │  ───── 模式───▶  │  P → Topic X → Q_kern,Q_critical   │
│  └─────────────────┘                   └─────────────────┘                   │
│                                                                               │
│  ┌─────────────────┐                   ┌─────────────────┐                   │
│  │      RPC        │  ───── 双向───▶  │  C ↔ Q_req → S → Q_resp ↔ C        │
│  └─────────────────┘                   └─────────────────┘                   │
│                                                                               │
└──────────────────────────────────────────────────────────────────────────────┘

附录: 常见踩坑与排错

A.1 pika 安装问题 (Ubuntu 24.04 PEP 668)

# 错误: externally-managed-environment
# 解决: 使用 venv
python3 -m venv ~/rabbitmq-venv
source ~/rabbitmq-venv/bin/activate
pip install pika

A.2 guest 用户远程访问被拒

# 错误: "user 'guest' - User can only connect via localhost"
# 原因: RabbitMQ 3.3+ 默认禁止 guest 远程登录
# 解决: 创建新用户
sudo rabbitmqctl add_user admin RabbitMQ@2026
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

A.3 队列声明参数不一致

# 错误: PRECONDITION_FAILED - inequivalent arg 'durable'
# 原因: 队列已存在但参数不同
# 解决: 删除后重建
sudo rabbitmqctl delete_queue task_queue

A.4 Exchange 类型错误

# 错误: PRECONDITION_FAILED - inequivalent arg 'type'
# 原因: exchange 已声明为其他类型
# 解决: 删除后重建
sudo rabbitmqctl delete_exchange logs

A.5 No-Route 消息丢失

# 现象: 消息发送成功但消费者收不到
# 原因: Exchange 存在但 Routing Key 不匹配任何 Binding Key
# 诊断: 启用 Return Listener (mandatory=True)
channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
    mandatory=True  # 无法路由时返回给生产者
)
channel.add_on_return_callback(
    lambda ch, method, prop, body: print(f" [x] RETURNED: {body}")
)

A.6 连接断开自动重连

import pika
import time

def get_connection():
    """带重试的连接建立"""
    for attempt in range(1, 11):
        try:
            return pika.BlockingConnection(
                pika.ConnectionParameters(
                    host='119.3.165.24',
                    credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026'),
                    connection_attempts=3,
                    retry_delay=5
                )
            )
        except pika.exceptions.AMQPConnectionError:
            print(f" [!] Connection failed (attempt {attempt}/10), retrying...")
            time.sleep(5)
    raise RuntimeError("Failed to connect after 10 attempts")

A.7 管理界面安全端口

# 华为云安全组需放行以下端口:
# 5672 (AMQP) — 客户端连接
# 15672 (Management UI) — 管理界面
# 25672 (Erlang Distribution) — 集群通信

# 检查安全组
$ sudo ufw status
Status: inactive

# 华为云控制台 → 安全组 → 入方向规则:
# 5672  → 0.0.0.0/0 (或限制来源 IP)
# 15672 → 0.0.0.0/0 (或限制来源 IP)

附录B: 一键环境验证脚本

#!/usr/bin/env python3
"""verify_rabbitmq.py — RabbitMQ 环境完整性验证"""

import pika
import sys

RABBITMQ_HOST = '119.3.165.24'
CREDENTIALS = pika.PlainCredentials('admin', 'RabbitMQ@2026')

tests = {
    'connection': False,
    'queue_declare': False,
    'exchange_declare': False,
    'publish': False,
    'consume': False,
}

try:
    # 1. 连接测试
    conn = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=CREDENTIALS))
    ch = conn.channel()
    tests['connection'] = True
    print("✅ Connection OK")

    # 2. 队列声明
    ch.queue_declare(queue='verify_test_queue', durable=False, auto_delete=True)
    tests['queue_declare'] = True
    print("✅ Queue declare OK")

    # 3. 交换机声明
    ch.exchange_declare(exchange='verify_test_exchange', exchange_type='fanout', auto_delete=True)
    tests['exchange_declare'] = True
    print("✅ Exchange declare OK")

    # 4. 发布消息
    ch.basic_publish(exchange='', routing_key='verify_test_queue', body='verify')
    tests['publish'] = True
    print("✅ Publish OK")

    # 5. 消费消息
    method, props, body = ch.basic_get(queue='verify_test_queue', auto_ack=True)
    assert body == b'verify', f"Expected 'verify', got {body}"
    tests['consume'] = True
    print("✅ Consume OK")

    # 清理
    ch.queue_delete(queue='verify_test_queue')
    ch.exchange_delete(exchange='verify_test_exchange')
    conn.close()

    print(f"\n🎉 All {sum(tests.values())}/{len(tests)} tests passed!")

except Exception as e:
    print(f"❌ Test failed: {e}")
    sys.exit(1)

文档版本: v1.1 | 创建日期: 2026-06-17 | 更新: 2026-06-17 (全部输出替换为服务器实操数据)
对应实验*: RabbitMQ 官方教程 1-7 (简介 + Hello World + 工作队列 + 发布订阅 + 路由 + 主题 + RPC)
全部输出*: 均来自 ecs-57c4 集群实操, 可直接复现

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐