平时实验报告
·
实验一环境搭建 基础消息收发
原生java依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-experiment</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- RabbitMQ Java Client 依赖 -->
<!-- 这是连接 RabbitMQ 服务器的“驱动”,没有它 Java 就连不上 MQ -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version> <!-- 兼容性最好的版本 -->
</dependency>
<!-- SLF4J 日志实现 (解决红色报错) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>
</project>
producer生产者
package com.example.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接参数 (默认配置)
factory.setHost("localhost"); // 设置 RabbitMQ 所在的主机 IP,本机就是 localhost
factory.setPort(5672); // 设置端口,注意是 5672,不是管理界面的 15672
factory.setUsername("guest"); // 默认用户名
factory.setPassword("guest"); // 默认密码
factory.setVirtualHost("/"); // 设置虚拟机,默认是 /
// 3. 建立连接
Connection connection = factory.newConnection();
// 4. 创建信道 (Channel),这是进行消息操作的主要通道
Channel channel = connection.createChannel();
// 5. 声明(创建)队列
// 参数1:队列名称 "simple_queue"
// 参数2:是否持久化 (false表示重启RabbitMQ后队列消失)
// 参数3:是否排他 (false表示不限制仅当前连接可用)
// 参数4:是否自动删除 (false表示不用完后自动删除)
// 参数5:其他参数
channel.queueDeclare("simple_queue", false, false, false, null);
// 6. 发送消息
String message = "Hello RabbitMQ! 这是第一条消息。";
// 参数1:交换机名称 (空字符串表示使用默认交换机)
// 参数2:路由键 (也就是队列的名字)
// 参数3:其他属性 (null表示默认)
// 参数4:消息体 (必须转为字节数组)
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println(" [x] 已发送消息: '" + message + "'");
// 7. 关闭资源
channel.close();
connection.close();
}
}
consumer消费者
package com.example.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂 (和生产者一样)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// 2. 建立连接
Connection connection = factory.newConnection();
// 3. 创建信道
Channel channel = connection.createChannel();
// 4. 声明队列 (必须和生产者一致,如果队列不存在则创建,存在则直接使用)
channel.queueDeclare("simple_queue", false, false, false, null);
System.out.println(" [*] 等待接收消息...");
// 5. 定义接收到消息后的回调逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// delivery.getBody() 获取的是字节数组,需要转成字符串
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 接收到了消息: '" + message + "'");
};
// 6. 开始消费消息
// 参数1:队列名称
// 参数2:自动确认 (true表示一旦RabbitMQ发出消息,就认为消息已处理,不管消费者是否真的处理完)
// 参数3:消费回调接口
channel.basicConsume("simple_queue", true, deliverCallback, consumerTag -> { });
// 注意:消费者程序不会自动结束,它会一直运行等待消息,所以这里不需要手动关闭连接
}
}

实验二工作模式实践

简单模式
producer
package com.example.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerSimple {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// --- 官方原生连接代码开始 ---
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// 建立连接并创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// --- 官方原生连接代码结束 ---
// 声明队列 (简单模式通常不需要持久化,所以 durable 设为 false)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello Simple Queue!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] 已发送: '" + message + "'");
}
}
}
consumer
package com.example.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerSimple {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// --- 官方原生连接代码开始 ---
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// --- 官方原生连接代码结束 ---
// 声明队列(确保队列存在,和生成者保持一致)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] 等待接收消息。按 CTRL+C 退出...");
// 定义收到消息后的处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 接收到: '" + message + "'");
};
// 开始消费消息 (autoAck=true 表示自动确认收到消息)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
工作模式



两个消费者交替接收消息
producer
package com.example.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; // 1. 导入官方的 ConnectionFactory
public class ProducerWork {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] argv) throws Exception {
// ==========================================
// 2. 下面是替代 ConnectionUtil 的原生代码
// ==========================================
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接参数 (如果你的RabbitMQ装在本地,默认这些即可)
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// 3. 建立连接和通道
// 注意:这里建议用 try-with-resources 语法,或者最后手动 close,防止资源泄露
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// ==========================================
// 3. 声明队列 (保持你原来的逻辑)
// ==========================================
// durable = true 开启持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// ==========================================
// 4. 发送消息 (保持你原来的逻辑)
// ==========================================
for (int i = 0; i < 50; i++) {
String message = "任务数据 " + i + getDots(i);
// basicPublish 参数: (交换机, 路由键/队列名, 其他属性, 消息体字节数组)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] 已发送: '" + message + "'");
// 模拟发送间隔(可选)
// Thread.sleep(100);
}
} // try 结束会自动关闭 channel 和 connection
}
// 辅助方法:模拟任务耗时
private static String getDots(int i) {
StringBuilder dots = new StringBuilder();
int count = (i % 5) + 1;
for (int j = 0; j < count; j++) {
dots.append(".");
}
return dots.toString();
}
}
consumer
package com.example.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerWork {
// 队列名称,必须和生产者保持一致
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// 2. 建立连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明队列 (参数必须和生产者一致,否则会报 406 错误)
// durable=true 表示开启持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 4. 设置公平分发 (Fair Dispatch)
// 如果不加这句,RabbitMQ 可能会一次性把消息发给第一个消费者,导致负载不均衡
channel.basicQos(1);
// 5. 定义收到消息后的回调逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
System.out.println(" [x] Received '" + message + "'");
// 模拟工作耗时:根据消息长度,每个字符休眠 1秒
doWork(message);
System.out.println(" [x] Done");
} finally {
// 6. 手动发送确认 (ACK)
// 告诉 RabbitMQ:我已经处理完了,你可以把这条消息删掉了
// 参数:delivery.getEnvelope().getDeliveryTag() 是消息的唯一ID,false表示只确认当前这一条
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 7. 开始消费
// 参数 autoAck=false 表示关闭自动确认,开启手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
// 模拟任务耗时
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
发布/订阅模式



producer
package com.example.publishsubscribe;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class PubSubProducer {
// 交换机名称
private final static String EXCHANGE_NAME = "logs_fanout";
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 获取连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 声明交换机 (类型: Fanout - 扇出/广播)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 4. 发送消息
String message = "Hello Publish/Subscribe! 这是一条广播消息!";
// 注意:发送到交换机时,routingKey 设为空字符串
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] 生产者已发送: '" + message + "'");
}
}
}
consumer
package com.example.publishsubscribe;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSubConsumer {
private final static String EXCHANGE_NAME = "logs_fanout";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 1. 声明交换机 (必须和生产者一致)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 2. 创建临时队列 (队列名由 RabbitMQ 随机生成)
// 参数1: "" (空字符串) 表示随机生成队列名
// 参数2: true 表示 exclusive (排他/临时),连接断开后队列自动删除
String queueName = channel.queueDeclare().getQueue();
// 3. 将队列绑定到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] 消费者等待消息中... 队列名: " + queueName);
// 4. 定义消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 收到: '" + message + "'");
};
// 5. 开始消费
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
路由模式




ReceiveLogsDirect消费者
package com.example.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsDirect {
// 交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 1. 声明交换机 (类型为 Direct)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 2. 创建一个临时队列 (非持久化、独占、自动删除)
String queueName = channel.queueDeclare().getQueue();
// ============================================
// 🔴 关键步骤:绑定键
// 在这里修改你想关注的日志级别,比如 "info", "warning", "error"
// 你可以运行两次这个类,一次填 "info",一次填 "error" 来对比效果
// ============================================
String bindingKey = "error";
// 3. 将队列绑定到交换机,并指定 bindingKey
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println(" [*] 等待接收 [" + bindingKey + "] 级别的消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 收到 '" + bindingKey + "': '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
EmitLogDirect生产者
package com.example.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 1. 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// ============================================
// 🔴 关键步骤:发送消息时指定 RoutingKey (日志级别)
// 这里我们模拟发送不同级别的日志
// ============================================
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {
String message = "这是一条 " + severity + " 级别的日志消息";
// 2. 发送消息,第三个参数是 routingKey
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] 发送 '" + severity + "': '" + message + "'");
}
}
}
}
如何测试
- 运行消费者 A:
- 打开
ReceiveLogsDirect.java。 - 把代码里的
bindingKey改为"error"。 - 运行它。
- 打开
- 运行消费者 B:
- 再次打开
ReceiveLogsDirect.java(或者复制一份)。 - 把代码里的
bindingKey改为"info"。 - 运行它。
- 再次打开
- 运行生产者:
- 运行
EmitLogDirect.java。 - 消费者 A (监听 error) 只会收到 error 消息。
- 消费者 B (监听 info) 只会收到 info 消息。
- warning 消息没人监听,所以谁也不会收到。
- 运行
Topics(主题模式)
怎么玩?
- 运行消费者:先运行
TopicConsumer。 - 观察:你会看到它只接收到了
*.orange.*匹配的消息(比如那只橙色的兔子和大象)。 - 修改通配符:
- 把消费者里的
bindingKey改成lazy.#。 - 重新运行消费者。
- 再运行生产者,你会发现它现在接收到了所有以
lazy开头的消息。
- 把消费者里的
这就是 通配符 的威力!
*(星号):代表一个单词。#(井号):代表零个或多个单词。




producer
package com.example.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Topic 交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 模拟发送不同类型的消息
// routingKey 格式通常为: <业务>.<地区>.<级别>
send(channel, "quick.orange.rabbit", "消息1:一只橙色的兔子");
send(channel, "lazy.orange.elephant", "消息2:一只橙色的懒大象");
send(channel, "quick.orange.fox", "消息3:一只橙色的狐狸");
send(channel, "lazy.brown.fox", "消息4:一只棕色的懒狐狸");
send(channel, "lazy.pink.rabbit", "消息5:一只粉色的懒兔子 (多单词)");
send(channel, "quick.blue.rabbit", "消息6:一只蓝色的兔子");
}
}
private static void send(Channel channel, String routingKey, String message) throws Exception {
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] 发送: '" + routingKey + "' : '" + message + "'");
Thread.sleep(500); // 稍微停顿一下,方便观察
}
}
consumer
package com.example.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumer {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机,类型为 TOPIC
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// --- 关键点:绑定 Routing Key ---
// 这里你可以修改 bindingKey 来测试不同的通配符效果
// *.orange.* : 匹配 3个词,中间那个必须是 orange
// lazy.# : 匹配以 lazy 开头的所有消息
String bindingKey = "*.orange.*"; // 尝试修改这个值看看效果
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println(" [*] 等待 '" + bindingKey + "' 的消息. 按 CTRL+C 退出");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] 接收到 routingKey='" + routingKey + "': '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
实验四springboot整合
基础版
配置 application.properties
- 在项目左侧的文件树中,依次展开:
src->main->resources。 - 找到并双击打开
application.properties文件。 - 清空 文件里的所有内容,复制粘贴以下代码:
properties
编辑
# 服务器端口
server.port=8080
# RabbitMQ 配置
# 服务器地址,默认本机
spring.rabbitmq.host=localhost
# 端口号,默认 5672
spring.rabbitmq.port=5672
# 用户名,默认 guest
spring.rabbitmq.username=guest
# 密码,根据你提供的信息,这里使用 guest
spring.rabbitmq.password=guest
# 虚拟主机,默认 /
spring.rabbitmq.virtual-host=/
注意: 这里我严格按照你的要求,将 password 设置为了 guest。
第四步:创建消息生产者(Sender)
我们需要一个类来负责发送消息。
- 在左侧
java目录下,找到包com.example.lab4rabbitmq。 - 右键点击 这个包 -> New -> Java Class。
- 类名输入:
RabbitSender。 - 将以下代码完整复制进去(覆盖原有代码):
java
编辑
package com.example.lab4rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
// 自动注入 RabbitTemplate,它是发送消息的核心工具类
@Autowired
private RabbitTemplate rabbitTemplate;
// 定义队列名称,必须和配置类中的队列名一致
final static String queueName = "hello-queue";
// 发送消息的方法
public void send(String message) {
// convertAndSend(队列名, 消息内容)
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("已发送消息: " + message);
}
}
第五步:创建消息消费者(Receiver)
我们需要一个类来负责接收消息。
- 同样在
com.example.lab4rabbitmq包下。 - 右键点击 -> New -> Java Class。
- 类名输入:
RabbitReceiver。 - 复制以下代码:
java
编辑
package com.example.lab4rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// @RabbitListener 注解用于监听指定的队列
@RabbitListener(queues = "hello-queue")
public class RabbitReceiver {
// @RabbitHandler 注解用于处理接收到的消息
@RabbitHandler
public void process(String message) {
System.out.println("接收者收到消息: " + message);
}
}
第六步:配置队列(Config)
Spring Boot 不会自动创建队列,我们需要显式地告诉容器创建一个名为 "hello-queue" 的队列。
- 在
com.example.lab4rabbitmq包下。 - 右键点击 -> New -> Java Class。
- 类名输入:
RabbitConfig。 - 复制以下代码:
java
编辑
package com.example.lab4rabbitmq;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
@Configuration
public class RabbitConfig {
// 定义队列名称常量
public static final String QUEUE_NAME = "hello-queue";
// 创建一个队列 Bean,Spring Boot 启动时会自动注册这个队列到 RabbitMQ 服务器
@Bean
public Queue queue() {
// new Queue(队列名称, 是否持久化)
return new Queue(QUEUE_NAME, true);
}
}
第七步:编写测试控制器(Controller)
为了方便在浏览器里测试发送消息,我们写一个简单的 Web 接口。
- 在
com.example.lab4rabbitmq包下。 - 右键点击 -> New -> Java Class。
- 类名输入:
HelloController。 - 复制以下代码:
java
编辑
package com.example.lab4rabbitmq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private RabbitSender rabbitSender;
@GetMapping("/send")
public String sendMsg() {
String msg = "Hello RabbitMQ!";
rabbitSender.send(msg);
return "消息已发送: " + msg;
}
}
第八步:运行与测试
现在所有代码都写好了,让我们来运行它。
-
启动程序:
- 找到
src->main->java->com.example.lab4rabbitmq下的Lab4RabbitmqApplication.java(名字可能略有不同,带有Application后缀的主类)。 - 点击代码左侧绿色的三角形图标,选择 Run。
- 等待控制台输出,直到看到
Tomcat started on port 8080之类的字样,说明启动成功。
- 找到
-
测试发送:
- 打开你的浏览器(Chrome/Edge)。
- 在地址栏输入:
http://localhost:8080/send并回车。 - 页面应该显示:
消息已发送: Hello RabbitMQ!
-
查看接收结果:
- 回到 IDEA 的 控制台(Console) 窗口。
- 你应该能看到类似这样的输出:
已发送消息: Hello RabbitMQ!
接收者收到消息: Hello RabbitMQ!
如果控制台出现了这两行日志,恭喜你!你的 Spring Boot 整合 RabbitMQ 实验成功了!
进阶版
通常实验文档(如你之前提到的《实验 4》)后面还会有更高级的内容,比如:
- 对象传输:如何发送一个 Java 对象(User 类)而不是简单的字符串?(通常需要实现
Serializable接口)。 - 不同交换机模式:
- Fanout(广播模式):一个消息发给多个消费者。
- Direct(路由模式):根据 key 选择性接收。
- Topic(主题模式):模糊匹配 key。
-
既然你要“都来”,那我们就一次性把 RabbitMQ 最核心的 对象传输 和 三种交换机模式(广播、路由、主题)全部搞定。
为了方便管理,我会把代码拆分成不同的包结构,这样你的项目会很清晰。senduser会报错

rabbitConfig
package com.example.lab4rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
// ==================== 1. 简单队列 (之前的 Hello) ====================
public static final String SIMPLE_QUEUE = "simple.queue";
// ==================== 2. 对象传输队列 ====================
public static final String OBJECT_QUEUE = "object.queue";
// ==================== 3. Fanout (广播模式) ====================
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE_A = "fanout.queue.a";
public static final String FANOUT_QUEUE_B = "fanout.queue.b";
// ==================== 4. Direct (路由模式) ====================
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE_RED = "direct.queue.red";
public static final String DIRECT_QUEUE_GREEN = "direct.queue.green";
// ==================== 5. Topic (主题模式) ====================
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE_MSG = "topic.queue.msg";
public static final String TOPIC_QUEUE_NEWS = "topic.queue.news";
// --- 声明队列 ---
@Bean
public Queue simpleQueue() { return new Queue(SIMPLE_QUEUE); }
@Bean
public Queue objectQueue() { return new Queue(OBJECT_QUEUE); }
@Bean
public Queue fanoutQueueA() { return new Queue(FANOUT_QUEUE_A); }
@Bean
public Queue fanoutQueueB() { return new Queue(FANOUT_QUEUE_B); }
@Bean
public Queue directQueueRed() { return new Queue(DIRECT_QUEUE_RED); }
@Bean
public Queue directQueueGreen() { return new Queue(DIRECT_QUEUE_GREEN); }
@Bean
public Queue topicQueueMsg() { return new Queue(TOPIC_QUEUE_MSG); }
@Bean
public Queue topicQueueNews() { return new Queue(TOPIC_QUEUE_NEWS); }
// --- 声明交换机 ---
@Bean
public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); }
@Bean
public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); }
@Bean
public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); }
// --- 绑定 (Binding) ---
// Fanout 绑定 (不需要 routingKey)
@Bean
public Binding bindingFanoutA(FanoutExchange fanoutExchange, Queue fanoutQueueA) {
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}
@Bean
public Binding bindingFanoutB(FanoutExchange fanoutExchange, Queue fanoutQueueB) {
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}
// Direct 绑定 (需要 routingKey)
@Bean
public Binding bindingDirectRed(DirectExchange directExchange, Queue directQueueRed) {
return BindingBuilder.bind(directQueueRed).to(directExchange).with("red");
}
@Bean
public Binding bindingDirectGreen(DirectExchange directExchange, Queue directQueueGreen) {
return BindingBuilder.bind(directQueueGreen).to(directExchange).with("green");
}
// Topic 绑定 (模糊匹配 # 匹配多个单词, * 匹配一个单词)
@Bean
public Binding bindingTopicMsg(TopicExchange topicExchange, Queue topicQueueMsg) {
return BindingBuilder.bind(topicQueueMsg).to(topicExchange).with("log.info"); // 精确匹配 info
}
@Bean
public Binding bindingTopicNews(TopicExchange topicExchange, Queue topicQueueNews) {
return BindingBuilder.bind(topicQueueNews).to(topicExchange).with("news.#"); // 匹配 news 开头的所有
}
}
rabbitController
package com.example.lab4rabbitmq.controller;
import com.example.lab4rabbitmq.User;
import com.example.lab4rabbitmq.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1. 发送对象
@GetMapping("/sendObject")
public String sendObject() {
User user = new User("张三", 25);
rabbitTemplate.convertAndSend(RabbitConfig.OBJECT_QUEUE, user);
return "已发送对象: " + user.toString();
}
// 2. 发送 Fanout (广播)
@GetMapping("/sendFanout")
public String sendFanout() {
String msg = "这是广播消息,大家都要听!";
// 第一个参数是交换机名,第二个是 routingKey (Fanout模式下为空)
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", msg);
return "已发送广播消息";
}
// 3. 发送 Direct (路由)
@GetMapping("/sendDirect")
public String sendDirect() {
String msgRed = "红色警报!";
String msgGreen = "绿色通行。";
// 发送给 Direct 交换机,指定 routingKey 为 "red"
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "red", msgRed);
// 发送给 Direct 交换机,指定 routingKey 为 "green"
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "green", msgGreen);
return "已发送 Direct 消息 (Red & Green)";
}
// 4. 发送 Topic (主题)
@GetMapping("/sendTopic")
public String sendTopic() {
String msg1 = "这是一条 info 日志";
String msg2 = "这是一条 news 体育新闻";
String msg3 = "这是一条 news 国际新闻";
// 匹配 log.info -> 只有 Msg 消费者能收到
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "log.info", msg1);
// 匹配 news.# -> News 消费者能收到
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "news.sports", msg2);
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "news.world", msg3);
return "已发送 Topic 消息";
}
}
allreceivers
package com.example.lab4rabbitmq.receiver;
import com.example.lab4rabbitmq.User;
import com.example.lab4rabbitmq.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AllReceivers {
// 1. 接收对象
@RabbitListener(queues = RabbitConfig.OBJECT_QUEUE)
public void receiveObject(User user) {
System.out.println("【对象消费者】收到用户: " + user.toString());
}
// 2. Fanout 消费者 A
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_A)
public void receiveFanoutA(String msg) {
System.out.println("【Fanout消费者A】收到广播: " + msg);
}
// 3. Fanout 消费者 B
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_B)
public void receiveFanoutB(String msg) {
System.out.println("【Fanout消费者B】收到广播: " + msg);
}
// 4. Direct 消费者 (Red)
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_RED)
public void receiveDirectRed(String msg) {
System.out.println("【Direct消费者-红色】收到消息: " + msg);
}
// 5. Direct 消费者 (Green)
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_GREEN)
public void receiveDirectGreen(String msg) {
System.out.println("【Direct消费者-绿色】收到消息: " + msg);
}
// 6. Topic 消费者 (Msg)
@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_MSG)
public void receiveTopicMsg(String msg) {
System.out.println("【Topic消费者-Info】收到消息: " + msg);
}
// 7. Topic 消费者 (News)
@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_NEWS)
public void receiveTopicNews(String msg) {
System.out.println("【Topic消费者-News】收到消息: " + msg);
}
}
rabbitreceivers
package com.example.lab4rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// @RabbitListener 注解用于监听指定的队列
@RabbitListener(queues = "hello-queue")
public class RabbitReceiver {
// @RabbitHandler 注解用于处理接收到的消息
@RabbitHandler
public void process(String message) {
System.out.println("接收者收到消息: " + message);
}
}
rabbitsender
package com.example.lab4rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.lab4rabbitmq.config.RabbitConfig; // 新增导入
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1. 发送普通字符串
public void send(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE, message);
System.out.println("已发送字符串: " + message);
}
// 2. 发送对象
public void sendObject(User user) {
rabbitTemplate.convertAndSend(RabbitConfig.OBJECT_QUEUE, user);
System.out.println("已发送对象: " + user);
}
// 3. 发送 Direct 消息
public void sendDirect(String message, String routingKey) {
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, routingKey, message);
System.out.println("已发送 Direct 消息: " + message + " (路由键: " + routingKey + ")");
}
// 4. 发送 Topic 消息
public void sendTopic(String message, String routingKey) {
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, routingKey, message);
System.out.println("已发送 Topic 消息: " + message + " (路由键: " + routingKey + ")");
}
// 5. 发送 Fanout 消息
public void sendFanout(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", message);
System.out.println("已发送 Fanout 广播消息: " + message);
}
}
user
package com.example.lab4rabbitmq;
import java.io.Serializable;
// 必须实现 Serializable 接口
public class User implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private Integer age;
// 必须有无参构造
public User() {}
public User(String name, Integer age) {
this.name = name;
this.age = age;
}
// Getter 和 Setter
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; }
@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}
依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>4.0.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>lab4-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>lab4-rabbitmq</name>
<description>lab4-rabbitmq</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 放在 <dependencies> 标签里面 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
依赖解释
<!-- Spring AMQP Starter -->
<!-- 作用:这是 Spring 对 AMQP(高级消息队列协议)的标准实现。 -->
<!-- 它自动配置了 RabbitMQ 的连接工厂、RabbitTemplate(用于发送/接收消息)以及监听容器。 -->
<!-- 没有它,你就得手动写一堆代码来连接 RabbitMQ。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Web MVC Starter -->
<!-- 作用:构建 Web 应用的核心依赖。 -->
<!-- 它包含了 Spring MVC 框架(用于处理 HTTP 请求)、Tomcat 嵌入式服务器(默认)、以及 JSON 转换器。 -->
<!-- 这让你能通过 @RestController 写接口,接收前端请求。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>
<!-- Spring AMQP Test Starter -->
<!-- 作用:专门用于测试的依赖(仅在测试阶段生效 scope="test")。 -->
<!-- 它提供了模拟(Mock)RabbitMQ 的工具,让你在不启动真实 RabbitMQ 服务的情况下也能运行单元测试。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Web Test Starter -->
<!-- 作用:Web 层测试依赖。 -->
<!-- 它包含了 Spring Test 模块和 JUnit,允许你进行 MockMvc 测试,模拟 HTTP 请求来测试你的 Controller 接口。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot JSON Starter -->
<!-- 作用:JSON 处理支持。 -->
<!-- 虽然 Web 依赖通常自带 JSON 转换,但显式引入这个依赖可以确保 Jackson 库被包含进来。 -->
<!-- 在 RabbitMQ 中,我们通常不直接发送 Java 对象(会有反序列化安全问题),而是将对象转换为 JSON 字符串发送,这个依赖就是干这个的。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
进阶依赖推荐清单
虽然你的项目现在能跑起来了,但为了开发更高效、功能更完善,以下依赖在实际开发中非常常用。你可以根据 2026 年的开发趋势(如云原生、响应式编程)进行选择:
1. 开发效率神器
- Lombok
- 作用:通过注解自动生成
getter、setter、toString、构造函数等样板代码。 - 场景:你的
User类实体如果不用 Lombok,需要写几十行冗余代码;用了之后只需要几个注解。 - 坐标:xml
编辑
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency>
- 作用:通过注解自动生成
2. 数据存储 (如果你需要存数据)
- Spring Data JPA / Spring Data MongoDB
- 作用:简化数据库操作。RabbitMQ 负责“传输”,通常需要数据库来“存储”最终结果。
- 场景:接收消息后,将用户数据保存到 MySQL 或 MongoDB 中。
- 坐标 (以 MySQL 为例):xml
编辑
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-j</artifactId> </dependency>
3. 测试增强 (现代开发推荐)
- Testcontainers
- 作用:在 Docker 容器中启动真实的数据库或中间件进行测试。
- 场景:之前的
spring-boot-starter-amqp-test是“模拟”RabbitMQ。如果你想做集成测试(Integration Test),确保代码能和真实的 RabbitMQ 交互,Testcontainers 是 2026 年的标准做法。 - 坐标:xml
编辑
<dependency> <groupId>org.testcontainers</groupId> <artifactId>rabbitmq</artifactId> <scope>test</scope> </dependency>
4. 系统监控与运维
- Spring Boot Actuator
- 作用:为你的应用提供“健康检查”、“指标监控”、“环境信息”等端点。
- 场景:在生产环境中,运维人员需要知道 RabbitMQ 连接是否正常、系统内存是否充足。
- 坐标:xml
编辑
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
5. 缓存加速
- Spring Data Redis
- 作用:集成 Redis 缓存。
- 场景:RabbitMQ 处理消息可能有延迟,或者消息内容是高频读取的配置信息。将处理结果或配置缓存到 Redis 中,可以极大提升前端接口(Web MVC)的响应速度。
- 坐标:xml
编辑
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
application
# RabbitMQ 连接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ # 消息确认配置 spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true # 消息消费确认配置 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.amqp.deserialization.trust.all=true
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)