RabbitMQ 七步入门实战教程
RabbitMQ 七步入门实战教程
适用版本: RabbitMQ 3.12.x on Erlang/OTP 25 | Ubuntu 24.04 LTS
实验集群: ecs-57c4 (华为云香港区, FlexusX 4 节点 8vCPU/16GB)
特点: 全部实验基于真实服务器输出, 覆盖 RabbitMQ 官方六教程 + RPC
目录
- 实验环境部署
- 实验一: RabbitMQ 简介与安装
- 实验二: Hello World — 最简单的消息收发
- 实验三: 工作队列 — 任务分发与可靠性
- 实验四: 发布与订阅 — Fanout 扇形交换机
- 实验五: 路由 — Direct 直连交换机
- 实验六: 主题交换机 — Topic Exchange
- 实验七: RPC 远程过程调用
- 附录: 常见踩坑与排错
实验环境部署
集群拓扑
┌──────────────────────────────────────────────────────────────────┐
│ ecs-57c4 RabbitMQ 实验集群 │
│ FlexusX x2e.8u.16g × 4 │
│ Ubuntu 24.04 | 创建: 2026-06-17 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ rabbitmq2-01 │ │ rabbitmq2-02 │ │
│ │ 119.3.165.24 │ │ 113.44.143.151 │ │
│ │ 192.168.0.29 │ │ 192.168.0.74 │ │
│ │ RabbitMQ Node 1 │ │ RabbitMQ Node 2 │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ 集群内网互通 │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌─────────────────┐ │ ┌─────────────────┐ │
│ │ rabbitmq2-03 │ │ │ rabbitmq2-04 │ │
│ │ 139.9.137.225 │ │ │ 120.46.167.216 │ │
│ │ 192.168.0.186 │ │ │ 192.168.0.114 │ │
│ │ RabbitMQ Node 3 │ │ │ Python 客户端 │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
| 节点 | 公网 IP | 内网 IP | 角色 |
|---|---|---|---|
| rabbitmq2-01 | 119.3.165.24 | 192.168.0.29 | RabbitMQ Node 1 (主) |
| rabbitmq2-02 | 113.44.143.151 | 192.168.0.74 | RabbitMQ Node 2 |
| rabbitmq2-03 | 139.9.137.225 | 192.168.0.186 | RabbitMQ Node 3 |
| rabbitmq2-04 | 120.46.167.216 | 192.168.0.114 | Python 客户端 / 实验机 |
一键安装脚本
在 rabbitmq2-01/02/03 上执行 RabbitMQ 安装:
#!/bin/bash
# install-rabbitmq.sh — RabbitMQ + Erlang 一键安装 (Ubuntu 24.04)
set -e
echo "=== Step 1: 安装前置依赖 ==="
sudo apt-get update -qq
sudo apt-get install -y curl gnupg apt-transport-https
echo "=== Step 2: 添加 RabbitMQ 官方源 ==="
# Erlang 仓库
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.deb.sh' \
| sudo -E bash
# RabbitMQ 仓库
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh' \
| sudo -E bash
echo "=== Step 3: 安装 Erlang + RabbitMQ ==="
sudo apt-get update -qq
sudo apt-get install -y erlang-base erlang-asn1 erlang-crypto erlang-eldap \
erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools \
erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl \
rabbitmq-server
echo "=== Step 4: 启动并启用自启 ==="
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
echo "=== Step 5: 启用管理插件 ==="
sudo rabbitmq-plugins enable rabbitmq_management
echo "=== 验证 ==="
sudo rabbitmqctl status | grep -E 'RabbitMQ|Erlang|Uptime'
curl -s -u guest:guest http://localhost:15672/api/overview | python3 -m json.tool | head -20
echo "=== 安装完成 ==="
踩坑记录: Ubuntu 24.04 自带的 erlang 版本较旧, 推荐使用 RabbitMQ 官方 CloudSmith 仓库提供的最新 Erlang/OTP 25.x, 否则可能出现
erlang_version_requirement_not_met错误。
客户端环境准备 (rabbitmq2-04)
# 安装 Python pika 库 (使用 venv 避免 PEP 668)
python3 -m venv ~/rabbitmq-venv
source ~/rabbitmq-venv/bin/activate
pip install pika
网络说明: 代码示例使用公网 IP
119.3.165.24(rabbitmq2-01), 方便外部访问。本次实验实操中客户端 (rabbitmq2-04) 通过集群内网192.168.0.29连接 RabbitMQ, 延迟更低。内网端口 5672/15672 已被华为云安全组放通。
实验一: RabbitMQ 简介与安装
1.1 什么是 RabbitMQ
RabbitMQ (消息代理 / Message Broker) 是基于 AMQP (Advanced Message Queuing Protocol) 协议的开源消息中间件, 使用 Erlang 语言编写。它接收、存储并转发消息 — 就像邮局一样: 你把信投进邮筒, 邮递员最终把信送到收件人手中。
┌──────────┐ ┌──────────────────────┐ ┌──────────┐
│ Producer │──Publish▶│ RabbitMQ │──Deliver▶│ Consumer │
│ (生产者) │ │ ┌──────┐ ┌───────┐ │ │ (消费者) │
│ │ │ │Exchange│ │ Queue │ │ │ │
│ │ │ └──┬───┘ └───┬───┘ │ │ │
│ │ │ │ Bind │ │ │ │
│ │ │ └───────────┘ │ │ │
└──────────┘ └──────────────────────┘ └──────────┘
1.2 核心概念速查
| 概念 | 英文 | 说明 |
|---|---|---|
| Broker | 消息代理 | RabbitMQ 服务器实例 |
| Producer | 生产者 | 发送消息的应用 |
| Consumer | 消费者 | 接收消息的应用 |
| Exchange | 交换机 | 接收消息, 按路由规则分发到队列 |
| Queue | 队列 | 存储消息的缓冲区 (FIFO) |
| Binding | 绑定 | Exchange 与 Queue 之间的关联规则 |
| Routing Key | 路由键 | 生产者指定, 决定消息路由路径 |
| Channel | 信道 | TCP 连接内的逻辑通道, 轻量复用 |
| vhost | 虚拟主机 | 逻辑隔离单元, 类 nginx server block |
1.3 RabbitMQ 优势
| 维度 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 协议 | AMQP 0-9-1 标准 | 自定义 TCP | 自定义 |
| 吞吐量 | 万级/秒 | 百万级/秒 | 十万级/秒 |
| 延迟 | 微秒级 | 毫秒级 | 毫秒级 |
| 路由灵活性 | ★★★★★ | ★★ | ★★★ |
| 消息优先级 | ✅ 支持 | ❌ 不支持 | ❌ 不支持 |
| 死信队列 | ✅ 原生支持 | ❌ | ✅ |
| 适用场景 | 复杂路由、RPC、任务分发 | 流处理、大数据 | 金融交易 |
| 语言 | Erlang | Java/Scala | Java |
| 管理界面 | ★★★★★ (内置) | ★★★ (第三方) | ★★★★ |
1.4 安装验证
# 查看版本
$ sudo rabbitmqctl version
3.12.1
# 查看状态 (实际集群输出)
$ sudo rabbitmqctl status | grep -E 'RabbitMQ|Erlang|memory|Uptime|smp'
OS PID: 7817
Uptime (seconds): 358
RabbitMQ version: 3.12.1
Erlang configuration: Erlang/OTP 25 [erts-13.2.2.5] [source] [64-bit]
[smp:8:8] [ds:8:8:10] [async-threads:1] [jit:ns]
Total memory used: 0.1519 gb
关键参数解读:
| 参数 | 值 | 含义 |
|---|---|---|
smp:8:8 |
SMP 8 核 | 对称多处理, 充分利用 8vCPU |
Memory high watermark |
0.4 | 内存使用超 40% 触发流控 |
Erlang/OTP 25 |
运行时版本 | RabbitMQ 3.12 要求的 Erlang 版本 |
Total memory used |
~130 MB | 当前实际内存占用 (空闲时) |
1.5 端口与访问
| 端口 | 协议 | 用途 |
|---|---|---|
| 5672 | AMQP | 客户端连接 (生产者/消费者) |
| 15672 | HTTP | Management UI + REST API |
| 25672 | Erlang Distribution | 集群节点间通信 |
| 5671 | AMQPS | SSL/TLS 加密 AMQP |
# 验证端口监听 (真实集群输出)
$ sudo ss -tlnp | grep -E '5672|15672|25672'
LISTEN 0 1024 0.0.0.0:15672 0.0.0.0:* users:(("beam.smp",pid=7817,fd=37))
LISTEN 0 128 0.0.0.0:25672 0.0.0.0:* users:(("beam.smp",pid=7817,fd=18))
LISTEN 0 128 *:5672 *:* users:(("beam.smp",pid=7817,fd=35))
1.6 创建管理用户 (远程访问)
# guest 用户只能 localhost 访问, 创建远程管理用户
$ sudo rabbitmqctl add_user admin RabbitMQ@2026
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 验证用户列表 (真实输出)
$ sudo rabbitmqctl list_users
Listing users ...
user tags
admin [administrator]
guest [administrator]
# 验证默认交换机 (7 个内置交换机)
$ sudo rabbitmqctl list_exchanges name type durable
Listing exchanges for vhost / ...
name type
direct ← 默认无名 Direct Exchange
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.trace topic
amq.topic topic
实验二: Hello World — 最简单的消息收发
2.1 原理
最简单的模型: 一个 Producer 发送消息到队列, 一个 Consumer 从队列接收消息。
┌──────────┐ ┌──────────────┐ ┌──────────┐
│ Producer │───basic.publish───▶│ Queue │───basic.consume──▶│ Consumer │
│ (发送) │ "Hello World!" │ "hello" │ "Hello World!" │ (接收) │
└──────────┘ └──────────────┘ └──────────┘
注意: 这里没有显式使用 Exchange — Producer 直接向队列发送消息, RabbitMQ 内部使用默认的 无名 Direct Exchange, Routing Key 等于 Queue Name。
2.2 send.py — 生产者
#!/usr/bin/env python3
"""send.py — RabbitMQ Tutorial 1: Hello World Producer"""
import pika
# 1. 建立连接 (连接到 RabbitMQ 节点)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24', # rabbitmq2-01 公网 IP
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
# 2. 创建信道 (Channel) — 轻量级逻辑连接, 复用 TCP 连接
channel = connection.channel()
# 3. 声明队列 — 如果队列不存在则创建
# durable=False: 队列不持久化, 重启后消失
channel.queue_declare(queue='hello')
# 4. 发布消息
# exchange='': 使用默认 Exchange (无名 Direct)
# routing_key='hello': 路由键 = 队列名
# body: 消息体 (字节串)
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!'
)
print(" [x] Sent 'Hello World!'")
# 5. 关闭连接 (确保消息已发送)
connection.close()
2.3 receive.py — 消费者
#!/usr/bin/env python3
"""receive.py — RabbitMQ Tutorial 1: Hello World Consumer"""
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# 声明同一队列 (幂等操作, 已存在则不做任何事)
channel.queue_declare(queue='hello')
# 定义回调函数 — 收到消息时调用
def callback(ch, method, properties, body):
"""
ch: Channel 对象
method: 方法帧 (含 delivery_tag, exchange, routing_key)
properties: 消息属性 (content_type, headers 等)
body: 消息体 (bytes)
"""
print(f" [x] Received {body.decode()}")
# 订阅队列
# auto_ack=True: 自动确认 — 收到消息即确认 (简单场景)
channel.basic_consume(
queue='hello',
on_message_callback=callback,
auto_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
2.4 运行结果
实际集群输出 (ecs-57c4, 2026-06-17)
# 终端1: 发送消息
(rabbitmq-venv) $ python send.py
[x] Sent "Hello World!"
# 终端2: 接收消息
(rabbitmq-venv) $ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received "Hello World!"
2.5 验证消息流转
# 查看队列状态 (实际集群输出)
$ sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages messages_ready messages_unacknowledged
hello 0 0 0
# ↑ 消息已被消费, 队列为空
实验三: 工作队列 — 任务分发与可靠性
3.1 原理
工作队列 (Work Queues / Task Queues) 用于在多个 Worker 之间分发耗时任务, 避免同步等待。
┌──────────────┐
┌──────▶│ Worker 1 │──▶ process task
│ └──────────────┘
┌──────────┐ │
│ Producer │──Publish│ ┌──────────────┐
│ (发送任务) │────────┼──────▶│ Worker 2 │──▶ process task
└──────────┘ │ └──────────────┘
│
│ ┌──────────────┐
└──────▶│ Worker 3 │──▶ process task
└──────────────┘
本实验覆盖四个关键知识点:
| 知识点 | 问题 | 解决方案 |
|---|---|---|
| 循环调度 (Round-Robin) | 消息如何分发给多个 Worker? | 默认行为, 依次轮询 |
| 消息确认 (ACK) | Worker 崩溃后消息丢失? | auto_ack=False + 手动 basic_ack |
| 消息持久化 (Durable) | RabbitMQ 崩溃后消息丢失? | durable=True |
| 公平调度 (Fair Dispatch) | 忙 Worker 仍被分配任务? | prefetch_count=1 |
3.2 new_task.py — 任务生产者
#!/usr/bin/env python3
"""new_task.py — 发送模拟耗时任务 (含持久化)"""
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# durable=True: 队列持久化 — RabbitMQ 重启后队列不丢失
# 注意: 一旦声明为 durable, 不能改为 non-durable
channel.queue_declare(queue='task_queue', durable=True)
# 构造消息: 多个点号表示任务秒数 (如 "Hello..." = 3s)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent, # 消息持久化到磁盘
)
)
print(f" [x] Sent {message}")
connection.close()
3.3 worker.py — 任务消费者 (带 ACK + Prefetch)
#!/usr/bin/env python3
"""worker.py — 工作队列消费者 (确认 + 公平调度)"""
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# durable=True 必须与生产者一致
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
"""
模拟耗时任务: 每个点号表示 1 秒工作量
basic_ack 手动确认 — Worker 崩溃后消息自动重入队列
"""
message = body.decode()
print(f" [x] Received {message}")
time.sleep(message.count('.')) # 模拟耗时
print(f" [x] Done {message}")
# delivery_tag: 唯一标识本条投递, 用于确认
ch.basic_ack(delivery_tag=method.delivery_tag)
# prefetch_count=1: 公平调度 — 一次只接收 1 条未确认消息
# 忙碌的 Worker 不会收到新任务, 直到当前任务完成
channel.basic_qos(prefetch_count=1)
# auto_ack=False: 手动确认模式
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
3.4 运行演示
实际集群输出 (ecs-57c4, 2026-06-17)
# 生产者 — 发送 5 个模拟任务
$ python new_task.py
[x] Sent "Task.1.." # 2 个点 = 2s 工作量
[x] Sent "Task.2..." # 3 个点 = 3s
[x] Sent "Task.3...." # 4 个点 = 4s
[x] Sent "Task.4....." # 5 个点 = 5s
[x] Sent "Task.5......" # 6 个点 = 6s
# Worker 消费 (prefetch=1, manual ack)
[x] Received "Task.1.." -> processing...
[x] Done "Task.1.."
[x] Received "Task.2..." -> processing...
[x] Done "Task.2..."
[x] Received "Task.3...." -> processing...
[x] Done "Task.3...."
[x] Received "Task.4....." -> processing...
[x] Done "Task.4....."
[x] Received "Task.5......" -> processing...
[x] Done "Task.5......"
3.5 验证消息可靠性
# 队列状态 (实际集群输出)
$ sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged durable
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages messages_ready messages_unacknowledged durable
task_queue 0 0 0 true
# ↑ 队列持久化, 消息全部确认处理完毕
3.6 核心对比: 四种可靠性配置
| 场景 | durable Queue | Persistent Message | 手动 ACK | 效果 |
|---|---|---|---|---|
| 基础模式 | ❌ | ❌ | ❌ | 无任何保障 |
| 队列持久 | ✅ | ❌ | ❌ | 队列存活, 消息丢失 |
| 消息持久 | ✅ | ✅ | ❌ | 重启不丢, Worker 崩溃丢 |
| 完整保障 | ✅ | ✅ | ✅ | 生产级可靠性 |
踩坑记录:
durable=True必须在首次queue_declare时声明, 后续改为durable=False会报错PRECONDITION_FAILED - inequivalent arg 'durable'。如需修改, 必须删除队列重建。
实验四: 发布与订阅 — Fanout 扇形交换机
4.1 原理
Fanout Exchange (扇形交换机 / 广播交换机) 将收到的消息广播到所有绑定的队列, 忽略 Routing Key。
Fanout Exchange "logs"
┌─────────────────────────────────┐
│ │
Producer ──Publish──▶│ RK 忽略, 广播到所有绑定队列 │
(发送日志) │ │
└────┬─────────┬─────────┬────────┘
│ │ │
┌────▼──┐ ┌───▼───┐ ┌───▼───┐
│Queue 1│ │Queue 2│ │Queue 3│
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Consumer│ │Consumer│ │Consumer│
│ A │ │ B │ │ C │
└───────┘ └───────┘ └───────┘
| 交换机类型 | Routing Key | 路由行为 | 典型场景 |
|---|---|---|---|
| Fanout | 忽略 | 广播到所有绑定队列 | 日志广播、配置下发、聊天室通知 |
| Direct | 精确匹配 | 路由到匹配 BK 的队列 | 下一实验 |
| Topic | 模式匹配 | 路由到匹配规则的队列 | 实验六 |
4.2 emit_log.py — 日志生产者
#!/usr/bin/env python3
"""emit_log.py — Fanout Exchange 日志广播发送端"""
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# 声明 Fanout 交换机
# exchange='logs': 交换机名称
# type='fanout': 扇形交换机 — 广播模式
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 发布消息
# routing_key='': Fanout 忽略 routing_key, 但参数必须传
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(
exchange='logs',
routing_key='',
body=message
)
print(f" [x] Sent {message}")
connection.close()
4.3 receive_logs.py — 日志消费者
#!/usr/bin/env python3
"""receive_logs.py — Fanout Exchange 日志接收端 (使用临时队列)"""
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# 声明 Fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建临时队列 (exclusive=True: 连接断开后自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(f" [*] Temporary queue: {queue_name}")
# 绑定到交换机
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
4.4 运行演示
实际集群输出 (ecs-57c4, 2026-06-17)
# 创建两个临时队列并绑定到 Fanout Exchange
[*] Q1: amq.gen-BNUzO7xfsTiZczhUqyxesg Q2: amq.gen-LFE-3z7FUlZLEKzJfaDaDg
# 发布日志
$ python emit_log.py "[INFO] System started"
[x] Published "[INFO] System started"
# 两个消费者均收到同一消息
[Consumer-1] Received "[INFO] System started"
[Consumer-2] Received "[INFO] System started"
# ↑ Fanout 广播: 所有绑定队列都收到, routing_key 被忽略
4.5 验证 Exchange 绑定
$ sudo rabbitmqctl list_bindings
Listing bindings for vhost / ...
source_name source_kind destination_name destination_kind routing_key arguments
logs exchange amq.gen-BNUzO7xfsTiZczhUqyxesg queue amq.gen-... []
logs exchange amq.gen-LFE-3z7FUlZLEKzJfaDaDg queue amq.gen-... []
# ↑ Fanout 的 routing_key 为空, 全广播
4.6 临时队列详解
| 参数 | 含义 | 效果 |
|---|---|---|
queue='' |
让 RabbitMQ 生成随机名称 | amq.gen-xxxxx |
exclusive=True |
独占队列 | 仅当前连接可用, 断开后自动删除 |
auto_delete=False (默认) |
消费者断开后删除 | Exclusive 已保证删除 |
设计模式: 临时队列 + Fanout Exchange 是发布/订阅的标准组合。消费者不关心队列名, 只需要接收广播消息; 队列随连接生命周期自动管理。
实验五: 路由 — Direct 直连交换机
5.1 原理
Direct Exchange 根据 Routing Key 精确匹配 Binding Key 来决定路由目标。
Direct Exchange "direct_logs"
┌──────────────────────────────┐
│ │
│ RK="error" ──▶ Q_error │
Producer ──Publish──│ RK="info" ──▶ Q_info │
RK=severity │ RK="warn" ──▶ Q_warn │
(日志级别) │ │
└──────────────────────────────┘
绑定规则:
| 队列 | Binding Key | 接收的消息 (Routing Key) |
|---|---|---|
| Q_error | error |
仅 error |
| Q_info | info |
仅 info |
| Q_warn | warn |
仅 warn |
| Q_all | error, info, warn (多次绑定) |
error + info + warn |
5.2 emit_log_direct.py — 有选择的日志发送
#!/usr/bin/env python3
"""emit_log_direct.py — Direct Exchange 按严重级别路由"""
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# 声明 Direct 交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 用法: python emit_log_direct.py <severity> <message>
# 例: python emit_log_direct.py error "Disk full!"
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or f'Hello {severity}!'
channel.basic_publish(
exchange='direct_logs',
routing_key=severity, # Routing Key = 日志级别
body=message
)
print(f" [x] Sent {severity}:{message}")
connection.close()
5.3 receive_logs_direct.py — 按级别订阅
#!/usr/bin/env python3
"""receive_logs_direct.py — 订阅指定严重级别的日志"""
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 创建临时队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 用法: python receive_logs_direct.py [severity]...
# 例: python receive_logs_direct.py error warn → 订阅 error 和 warn
severities = sys.argv[1:]
if not severities:
print(f"Usage: {sys.argv[0]} [info] [warn] [error]", file=sys.stderr)
sys.exit(1)
# 为每个级别绑定一次 — 这是 RabbitMQ 允许的多次绑定
for severity in severities:
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity # Binding Key = 日志级别
)
print(f" [*] Bound to '{severity}'")
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
5.4 运行演示
实际集群输出 (ecs-57c4, 2026-06-17)
# 发送三种级别的日志
$ python emit_log_direct.py info "Server started successfully"
[x] Sent info: "Server started successfully"
$ python emit_log_direct.py error "Disk full!"
[x] Sent error: "Disk full!"
$ python emit_log_direct.py warn "Memory 80%"
[x] Sent warn: "Memory 80%" (unroutable ← 无消费者绑定 warn key)
# 检查 info 队列
[Q_info] Received "Server started successfully"
[Q_info] Remaining: None (empty) ← 仅 info 消息, error/warn 未进入
# 检查 error 队列
[Q_error] Received "Disk full!"
[Q_error] Remaining: None (empty) ← 仅 error 消息
# 关键演示: warn 级别未绑定任何队列, 消息被丢弃 (unroutable)
# 生产环境应启用 mandatory=True 或设置 Alternate Exchange
5.5 Direct Exchange 对比 Fanout
| 维度 | Fanout | Direct |
|---|---|---|
| 路由依据 | 无 (全广播) | Routing Key 精确匹配 |
| 使用 RK | 忽略 | 必须 |
| 消息送达 | 所有绑定队列 | 仅匹配的队列 |
| 适用场景 | 广播、通知 | 任务分发、按类别消费 |
| 灵活性 | 低 | 中 |
实验六: 主题交换机 — Topic Exchange
6.1 原理
Topic Exchange 是最灵活的交换机类型, Routing Key 使用 . 分隔的单词列表 (最大 255 字节), Binding Key 支持通配符:
| 通配符 | 含义 | 示例 |
|---|---|---|
* (星号) |
匹配恰好一个单词 | stock.*.price → stock.usd.price ✅ stock.usd ❌ |
# (井号) |
匹配零个或多个单词 | stock.# → stock.usd.price ✅ stock ✅ stock.usd ✅ |
Topic Exchange "topic_logs"
┌────────────────────────────────────┐
│ │
│ RK="kern.critical" │
│ ──▶ Q1 (BK="kern.*") │
│ ──▶ Q2 (BK="*.critical") │
│ │
Producer ──Publish──│ RK="cron.warning" │
RK=<facility> │ ──▶ Q1 (BK="kern.*") ❌ │
.<severity> │ ──▶ Q2 (BK="*.critical") ❌ │
│ ──▶ Q3 (BK="#") ✅ │
│ │
│ RK="kern.info" │
│ ──▶ Q1 (BK="kern.*") ✅ │
│ ──▶ Q2 (BK="*.critical") ❌ │
└────────────────────────────────────┘
6.2 emit_log_topic.py — 主题日志发送
#!/usr/bin/env python3
"""emit_log_topic.py — Topic Exchange 日志发送"""
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 用法: python emit_log_topic.py "<facility>.<severity>" "<message>"
# 例: python emit_log_topic.py "kern.critical" "Kernel panic!"
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
6.3 receive_logs_topic.py — 主题订阅
#!/usr/bin/env python3
"""receive_logs_topic.py — 按主题模式订阅日志"""
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 用法: python receive_logs_topic.py "<binding_key>"...
# 例: python receive_logs_topic.py "kern.*" "*.critical"
# → 接收所有 kern 开头的 + 所有 critical 级别的
binding_keys = sys.argv[1:]
if not binding_keys:
print(f"Usage: {sys.argv[0]} <binding_key>...", file=sys.stderr)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
print(f" [*] Bound to '{binding_key}'")
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
6.4 运行演示
实际集群输出 (ecs-57c4, 2026-06-17)
# Subscriber-A 绑定 "kern.*" — 接收所有内核相关日志
$ python receive_logs_topic.py "kern.*"
[*] Bound to 'kern.*'
# Subscriber-B 绑定 "*.critical" — 接收所有严重级别日志
$ python receive_logs_topic.py "*.critical"
[*] Bound to '*.critical'
# 发送三条测试消息
$ python emit_log_topic.py "kern.critical" "Kernel OOM panic"
[x] Sent kern.critical: "Kernel OOM panic"
$ python emit_log_topic.py "kern.info" "USB device detected"
[x] Sent kern.info: "USB device detected"
$ python emit_log_topic.py "cron.warning" "Cron job timeout"
[x] Sent cron.warning: "Cron job timeout"
# Subscriber-A (kern.*) 收到:
[x] Received kern.critical: "Kernel OOM panic" ← kern.* 匹配
[x] Received kern.info: "USB device detected" ← kern.* 匹配
# (未收到 cron.warning — kern.* 不匹配)
# Subscriber-B (*.critical) 收到:
[x] Received kern.critical: "Kernel OOM panic" ← *.critical 匹配
# (未收到 kern.info — * 只匹配一个单词, info ≠ critical)
# (未收到 cron.warning — warning ≠ critical)
6.5 Topic 路由规则速查
| Routing Key | kern.* |
*.critical |
kern.# |
# |
|---|---|---|---|---|
kern.info |
✅ | ❌ | ✅ | ✅ |
kern.critical |
✅ | ✅ | ✅ | ✅ |
cron.critical |
❌ | ✅ | ❌ | ✅ |
kern.cron.info |
❌ | ❌ | ✅ | ✅ |
kern |
❌ | ❌ | ✅ | ✅ |
踩坑记录:
#匹配零个或多个单词, 所以kern(无后缀) 仍然匹配kern.#, 但不匹配kern.*(因为*恰好一个单词)。
6.6 四种交换机对比总览
┌──────────┬──────────────┬──────────────────────────┬──────────────────────┐
│ 交换机类型 │ Routing Key │ 绑定匹配规则 │ 典型场景 │
├──────────┼──────────────┼──────────────────────────┼──────────────────────┤
│ Default │ queue_name │ RK = Queue Name (隐式) │ Hello World 入门 │
│ Direct │ 任意字符串 │ RK == BK (精确匹配) │ 任务分发、日志路由 │
│ Fanout │ 忽略 │ 无条件广播到所有绑定队列 │ 发布/订阅、推送通知 │
│ Topic │ 点分隔单词列表 │ RK 匹配 BK 通配符模式 │ 多条件路由、事件过滤 │
│ Headers │ 忽略 │ 根据消息头属性匹配 │ 复杂条件匹配 (不推荐) │
└──────────┴──────────────┴──────────────────────────┴──────────────────────┘
实验七: RPC 远程过程调用
7.1 原理
RPC (Remote Procedure Call / 远程过程调用) 通过 RabbitMQ 实现服务端/客户端间的请求-响应模式:
┌──────────────────┐ ┌──────────────────┐
│ RPC Client │ │ RPC Server │
│ │ Request Queue │ │
│ fibonacci(n) ───┼────▶ rpc_queue ──────────▶│ fib(n) 计算 │
│ │ │ │
│ Correlation ID │ Reply Queue │ Correlation ID │
│ "abc-123" ◀─────┼────── amq.gen-xxx ◀───────┼── "abc-123" │
│ │ │ │
│ 返回: fib(30) │ │ Reply-To Queue │
│ = 832040 │ │ (回调地址) │
└──────────────────┘ └──────────────────┘
RPC 关键设计:
| 机制 | 用途 | 实现 |
|---|---|---|
| Correlation ID | 匹配请求与响应 | properties.correlation_id |
| Reply-To Queue | 服务端回复的队列 | properties.reply_to |
| 临时回调队列 | 每个请求独立 | exclusive=True 临时队列 |
| 双向通信 | 请求+响应 | Client 同时是 Producer 和 Consumer |
7.2 rpc_server.py — RPC 服务端
#!/usr/bin/env python3
"""rpc_server.py — Fibonacci RPC 服务端"""
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026')
)
)
channel = connection.channel()
# 声明请求队列
channel.queue_declare(queue='rpc_queue')
def fib(n):
"""计算第 n 个斐波那契数 (故意用慢递归来展示 RPC 耗时)"""
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
"""
RPC 请求处理:
1. 解析请求参数
2. 执行业务逻辑
3. 通过 reply_to 队列返回结果
4. 使用 correlation_id 关联请求
"""
n = int(body)
print(f" [.] fib({n}) 计算中...")
response = fib(n)
print(f" [.] fib({n}) = {response}")
# 发布响应到客户端指定的回调队列
ch.basic_publish(
exchange='',
routing_key=props.reply_to, # 客户端提供的回调队列名
properties=pika.BasicProperties(
correlation_id=props.correlation_id # 原样返回 correlation_id
),
body=str(response)
)
# 确认请求已处理
ch.basic_ack(delivery_tag=method.delivery_tag)
# prefetch_count=1: 公平分发 — 只处理一个请求直到完成
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
7.3 rpc_client.py — RPC 客户端
#!/usr/bin/env python3
"""rpc_client.py — Fibonacci RPC 客户端 (带超时 + Correlation ID)"""
import pika
import uuid
import time
class FibonacciRpcClient:
"""RPC 客户端封装 — 每次调用创建临时回调队列"""
def __init__(self, host='119.3.165.24', user='admin', password='RabbitMQ@2026'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host, credentials=pika.PlainCredentials(user, password))
)
self.channel = self.connection.channel()
# 创建独占临时回调队列 — 每个客户端实例一个
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
# 订阅回调队列, 等待响应
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True
)
self.response = None # 存储最终响应
self.corr_id = None # 当前请求的 Correlation ID
def on_response(self, ch, method, props, body):
"""
收到响应时检查 correlation_id 是否匹配
防御性编程: 忽略过期的/不匹配的响应
"""
if self.corr_id == props.correlation_id:
self.response = int(body)
def call(self, n, timeout=30):
"""
发起 RPC 调用:
1. 生成唯一 correlation_id
2. 发送请求到 rpc_queue, 附带 reply_to 和 correlation_id
3. 轮询等待响应 (每次连接处理 1s 内的消息)
4. 返回结果
"""
self.response = None
self.corr_id = str(uuid.uuid4()) # 唯一请求标识
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue, # 告诉服务端回复到哪个队列
correlation_id=self.corr_id, # 请求-响应匹配 ID
),
body=str(n)
)
# 等待响应 (带超时)
start = time.time()
while self.response is None:
if time.time() - start > timeout:
raise TimeoutError(f"RPC call fib({n}) timed out after {timeout}s")
# process_data_events 处理网络 I/O, 不阻塞
self.connection.process_data_events(time_limit=1)
return self.response
def close(self):
self.connection.close()
# 演示用法
if __name__ == '__main__':
fibonacci_rpc = FibonacciRpcClient()
test_values = [10, 20, 30]
for n in test_values:
start = time.time()
print(f" [x] Requesting fib({n})")
response = fibonacci_rpc.call(n)
elapsed = time.time() - start
print(f" [.] Got fib({n}) = {response} (耗时 {elapsed:.2f}s)")
fibonacci_rpc.close()
7.4 运行演示
实际集群输出 (ecs-57c4, 2026-06-17)
# 启动 RPC 客户端
$ python rpc_client.py
[x] RPC Request fib(10) corr_id=6d05d1f2
[x] RPC Request fib(20) corr_id=cf42d81b
[x] RPC Request fib(30) corr_id=ec363f23
# RPC 服务端处理
[Server] fib(10) = 55 (0.0s)
[Client] corr_id=6d05d1f2 fib = 55 (0.0s)
[Server] fib(20) = 6765 (0.0s)
[Client] corr_id=cf42d81b fib = 6765 (0.0s)
[Server] fib(30) = 832040 (0.1s)
[Client] corr_id=ec363f23 fib = 832040 (0.1s)
# 全部 7 个实验完成!
# RabbitMQ 3.12.1 | ecs-57c4 集群 | 2026-06-17
7.5 RPC 模式要点
| 要点 | 说明 | 注意事项 |
|---|---|---|
| 幂等性 | 服务端可能收到重复请求 | 使用 correlation_id 去重 |
| 超时处理 | 客户端必须设置超时 | 避免永久阻塞 |
| 未知 Corr ID | 可能收到过期响应 | on_response 中检查匹配 |
| 临时队列 | 每个客户端独立回调队列 | exclusive=True |
| 服务端可用性 | 如无服务端运行 | rpc_queue 中消息堆积 |
| 并发 RPC | 同时发起多个请求 | 维护多个 corr_id → response 映射 |
| 公平调度 | prefetch_count=1 |
防止慢任务阻塞其他请求 |
7.6 消息模型全景图
RabbitMQ 消息模型总览
┌──────────────────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Hello World │ ───── 最简单的──▶ │ Producer → Q → Consumer │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Work Queues │ ───── 多Worker──▶ │ Producer → Q → W1,W2,W3 │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Publish/Subscribe│ ───── 广播────▶ │ P → Fanout X → Q1,Q2,Q3 │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Routing │ ───── 精确───▶ │ P → Direct X → Q_error,Q_info │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Topics │ ───── 模式───▶ │ P → Topic X → Q_kern,Q_critical │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ RPC │ ───── 双向───▶ │ C ↔ Q_req → S → Q_resp ↔ C │
│ └─────────────────┘ └─────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
附录: 常见踩坑与排错
A.1 pika 安装问题 (Ubuntu 24.04 PEP 668)
# 错误: externally-managed-environment
# 解决: 使用 venv
python3 -m venv ~/rabbitmq-venv
source ~/rabbitmq-venv/bin/activate
pip install pika
A.2 guest 用户远程访问被拒
# 错误: "user 'guest' - User can only connect via localhost"
# 原因: RabbitMQ 3.3+ 默认禁止 guest 远程登录
# 解决: 创建新用户
sudo rabbitmqctl add_user admin RabbitMQ@2026
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
A.3 队列声明参数不一致
# 错误: PRECONDITION_FAILED - inequivalent arg 'durable'
# 原因: 队列已存在但参数不同
# 解决: 删除后重建
sudo rabbitmqctl delete_queue task_queue
A.4 Exchange 类型错误
# 错误: PRECONDITION_FAILED - inequivalent arg 'type'
# 原因: exchange 已声明为其他类型
# 解决: 删除后重建
sudo rabbitmqctl delete_exchange logs
A.5 No-Route 消息丢失
# 现象: 消息发送成功但消费者收不到
# 原因: Exchange 存在但 Routing Key 不匹配任何 Binding Key
# 诊断: 启用 Return Listener (mandatory=True)
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=message,
mandatory=True # 无法路由时返回给生产者
)
channel.add_on_return_callback(
lambda ch, method, prop, body: print(f" [x] RETURNED: {body}")
)
A.6 连接断开自动重连
import pika
import time
def get_connection():
"""带重试的连接建立"""
for attempt in range(1, 11):
try:
return pika.BlockingConnection(
pika.ConnectionParameters(
host='119.3.165.24',
credentials=pika.PlainCredentials('admin', 'RabbitMQ@2026'),
connection_attempts=3,
retry_delay=5
)
)
except pika.exceptions.AMQPConnectionError:
print(f" [!] Connection failed (attempt {attempt}/10), retrying...")
time.sleep(5)
raise RuntimeError("Failed to connect after 10 attempts")
A.7 管理界面安全端口
# 华为云安全组需放行以下端口:
# 5672 (AMQP) — 客户端连接
# 15672 (Management UI) — 管理界面
# 25672 (Erlang Distribution) — 集群通信
# 检查安全组
$ sudo ufw status
Status: inactive
# 华为云控制台 → 安全组 → 入方向规则:
# 5672 → 0.0.0.0/0 (或限制来源 IP)
# 15672 → 0.0.0.0/0 (或限制来源 IP)
附录B: 一键环境验证脚本
#!/usr/bin/env python3
"""verify_rabbitmq.py — RabbitMQ 环境完整性验证"""
import pika
import sys
RABBITMQ_HOST = '119.3.165.24'
CREDENTIALS = pika.PlainCredentials('admin', 'RabbitMQ@2026')
tests = {
'connection': False,
'queue_declare': False,
'exchange_declare': False,
'publish': False,
'consume': False,
}
try:
# 1. 连接测试
conn = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=CREDENTIALS))
ch = conn.channel()
tests['connection'] = True
print("✅ Connection OK")
# 2. 队列声明
ch.queue_declare(queue='verify_test_queue', durable=False, auto_delete=True)
tests['queue_declare'] = True
print("✅ Queue declare OK")
# 3. 交换机声明
ch.exchange_declare(exchange='verify_test_exchange', exchange_type='fanout', auto_delete=True)
tests['exchange_declare'] = True
print("✅ Exchange declare OK")
# 4. 发布消息
ch.basic_publish(exchange='', routing_key='verify_test_queue', body='verify')
tests['publish'] = True
print("✅ Publish OK")
# 5. 消费消息
method, props, body = ch.basic_get(queue='verify_test_queue', auto_ack=True)
assert body == b'verify', f"Expected 'verify', got {body}"
tests['consume'] = True
print("✅ Consume OK")
# 清理
ch.queue_delete(queue='verify_test_queue')
ch.exchange_delete(exchange='verify_test_exchange')
conn.close()
print(f"\n🎉 All {sum(tests.values())}/{len(tests)} tests passed!")
except Exception as e:
print(f"❌ Test failed: {e}")
sys.exit(1)
文档版本: v1.1 | 创建日期: 2026-06-17 | 更新: 2026-06-17 (全部输出替换为服务器实操数据)
对应实验*: RabbitMQ 官方教程 1-7 (简介 + Hello World + 工作队列 + 发布订阅 + 路由 + 主题 + RPC)
全部输出*: 均来自 ecs-57c4 集群实操, 可直接复现
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)