实验一环境搭建 基础消息收发

原生java依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-experiment</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- RabbitMQ Java Client 依赖 -->
        <!-- 这是连接 RabbitMQ 服务器的“驱动”,没有它 Java 就连不上 MQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version> <!-- 兼容性最好的版本 -->
        </dependency>

        <!-- SLF4J 日志实现 (解决红色报错) -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>
</project>

producer生产者

package com.example.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 2. 设置连接参数 (默认配置)
        factory.setHost("localhost"); // 设置 RabbitMQ 所在的主机 IP,本机就是 localhost
        factory.setPort(5672);        // 设置端口,注意是 5672,不是管理界面的 15672
        factory.setUsername("guest"); // 默认用户名
        factory.setPassword("guest"); // 默认密码
        factory.setVirtualHost("/");  // 设置虚拟机,默认是 /

        // 3. 建立连接
        Connection connection = factory.newConnection();

        // 4. 创建信道 (Channel),这是进行消息操作的主要通道
        Channel channel = connection.createChannel();

        // 5. 声明(创建)队列
        // 参数1:队列名称 "simple_queue"
        // 参数2:是否持久化 (false表示重启RabbitMQ后队列消失)
        // 参数3:是否排他 (false表示不限制仅当前连接可用)
        // 参数4:是否自动删除 (false表示不用完后自动删除)
        // 参数5:其他参数
        channel.queueDeclare("simple_queue", false, false, false, null);

        // 6. 发送消息
        String message = "Hello RabbitMQ! 这是第一条消息。";

        // 参数1:交换机名称 (空字符串表示使用默认交换机)
        // 参数2:路由键 (也就是队列的名字)
        // 参数3:其他属性 (null表示默认)
        // 参数4:消息体 (必须转为字节数组)
        channel.basicPublish("", "simple_queue", null, message.getBytes());

        System.out.println(" [x] 已发送消息: '" + message + "'");

        // 7. 关闭资源
        channel.close();
        connection.close();
    }
}

consumer消费者

package com.example.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂 (和生产者一样)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        // 2. 建立连接
        Connection connection = factory.newConnection();

        // 3. 创建信道
        Channel channel = connection.createChannel();

        // 4. 声明队列 (必须和生产者一致,如果队列不存在则创建,存在则直接使用)
        channel.queueDeclare("simple_queue", false, false, false, null);

        System.out.println(" [*] 等待接收消息...");

        // 5. 定义接收到消息后的回调逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // delivery.getBody() 获取的是字节数组,需要转成字符串
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收到了消息: '" + message + "'");
        };

        // 6. 开始消费消息
        // 参数1:队列名称
        // 参数2:自动确认 (true表示一旦RabbitMQ发出消息,就认为消息已处理,不管消费者是否真的处理完)
        // 参数3:消费回调接口
        channel.basicConsume("simple_queue", true, deliverCallback, consumerTag -> { });

        // 注意:消费者程序不会自动结束,它会一直运行等待消息,所以这里不需要手动关闭连接
    }
}

实验二工作模式实践

简单模式

producer

package com.example.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerSimple {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // --- 官方原生连接代码开始 ---
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        // 建立连接并创建通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // --- 官方原生连接代码结束 ---

            // 声明队列 (简单模式通常不需要持久化,所以 durable 设为 false)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 发送消息
            String message = "Hello Simple Queue!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] 已发送: '" + message + "'");
        }
    }
}

consumer

package com.example.simple;

import com.rabbitmq.client.*;
import java.io.IOException;

public class ConsumerSimple {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // --- 官方原生连接代码开始 ---
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // --- 官方原生连接代码结束 ---

        // 声明队列(确保队列存在,和生成者保持一致)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] 等待接收消息。按 CTRL+C 退出...");

        // 定义收到消息后的处理逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收到: '" + message + "'");
        };

        // 开始消费消息 (autoAck=true 表示自动确认收到消息)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

工作模式

两个消费者交替接收消息

producer

package com.example.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; // 1. 导入官方的 ConnectionFactory

public class ProducerWork {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] argv) throws Exception {
        // ==========================================
        // 2. 下面是替代 ConnectionUtil 的原生代码
        // ==========================================

        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 2. 设置连接参数 (如果你的RabbitMQ装在本地,默认这些即可)
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        // 3. 建立连接和通道
        // 注意:这里建议用 try-with-resources 语法,或者最后手动 close,防止资源泄露
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // ==========================================
            // 3. 声明队列 (保持你原来的逻辑)
            // ==========================================
            // durable = true 开启持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // ==========================================
            // 4. 发送消息 (保持你原来的逻辑)
            // ==========================================
            for (int i = 0; i < 50; i++) {
                String message = "任务数据 " + i + getDots(i);

                // basicPublish 参数: (交换机, 路由键/队列名, 其他属性, 消息体字节数组)
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] 已发送: '" + message + "'");

                // 模拟发送间隔(可选)
                // Thread.sleep(100);
            }
        } // try 结束会自动关闭 channel 和 connection
    }

    // 辅助方法:模拟任务耗时
    private static String getDots(int i) {
        StringBuilder dots = new StringBuilder();
        int count = (i % 5) + 1;
        for (int j = 0; j < count; j++) {
            dots.append(".");
        }
        return dots.toString();
    }
}

consumer

package com.example.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerWork {
    // 队列名称,必须和生产者保持一致
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        // 2. 建立连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明队列 (参数必须和生产者一致,否则会报 406 错误)
        // durable=true 表示开启持久化
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 4. 设置公平分发 (Fair Dispatch)
        // 如果不加这句,RabbitMQ 可能会一次性把消息发给第一个消费者,导致负载不均衡
        channel.basicQos(1);

        // 5. 定义收到消息后的回调逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            try {
                System.out.println(" [x] Received '" + message + "'");
                // 模拟工作耗时:根据消息长度,每个字符休眠 1秒
                doWork(message);
                System.out.println(" [x] Done");
            } finally {
                // 6. 手动发送确认 (ACK)
                // 告诉 RabbitMQ:我已经处理完了,你可以把这条消息删掉了
                // 参数:delivery.getEnvelope().getDeliveryTag() 是消息的唯一ID,false表示只确认当前这一条
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        // 7. 开始消费
        // 参数 autoAck=false 表示关闭自动确认,开启手动确认模式
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    // 模拟任务耗时
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

发布/订阅模式

producer

package com.example.publishsubscribe;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PubSubProducer {
    // 交换机名称
    private final static String EXCHANGE_NAME = "logs_fanout";

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        // 2. 获取连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 3. 声明交换机 (类型: Fanout - 扇出/广播)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            // 4. 发送消息
            String message = "Hello Publish/Subscribe! 这是一条广播消息!";
            // 注意:发送到交换机时,routingKey 设为空字符串
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

            System.out.println(" [x] 生产者已发送: '" + message + "'");
        }
    }
}

consumer

package com.example.publishsubscribe;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PubSubConsumer {
    private final static String EXCHANGE_NAME = "logs_fanout";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 1. 声明交换机 (必须和生产者一致)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // 2. 创建临时队列 (队列名由 RabbitMQ 随机生成)
        // 参数1: "" (空字符串) 表示随机生成队列名
        // 参数2: true 表示 exclusive (排他/临时),连接断开后队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 3. 将队列绑定到交换机
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] 消费者等待消息中... 队列名: " + queueName);

        // 4. 定义消息回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 收到: '" + message + "'");
        };

        // 5. 开始消费
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

路由模式

ReceiveLogsDirect消费者

package com.example.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsDirect {
    // 交换机名称
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 1. 声明交换机 (类型为 Direct)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 2. 创建一个临时队列 (非持久化、独占、自动删除)
        String queueName = channel.queueDeclare().getQueue();

        // ============================================
        // 🔴 关键步骤:绑定键
        // 在这里修改你想关注的日志级别,比如 "info", "warning", "error"
        // 你可以运行两次这个类,一次填 "info",一次填 "error" 来对比效果
        // ============================================
        String bindingKey = "error";

        // 3. 将队列绑定到交换机,并指定 bindingKey
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        System.out.println(" [*] 等待接收 [" + bindingKey + "] 级别的消息...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 收到 '" + bindingKey + "': '" + message + "'");
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

EmitLogDirect生产者

package com.example.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 1. 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // ============================================
            // 🔴 关键步骤:发送消息时指定 RoutingKey (日志级别)
            // 这里我们模拟发送不同级别的日志
            // ============================================

            String[] severities = {"info", "warning", "error"};

            for (String severity : severities) {
                String message = "这是一条 " + severity + " 级别的日志消息";
                // 2. 发送消息,第三个参数是 routingKey
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println(" [x] 发送 '" + severity + "': '" + message + "'");
            }
        }
    }
}

如何测试

  1. 运行消费者 A
    • 打开 ReceiveLogsDirect.java
    • 把代码里的 bindingKey 改为 "error"
    • 运行它。
  2. 运行消费者 B
    • 再次打开 ReceiveLogsDirect.java (或者复制一份)。
    • 把代码里的 bindingKey 改为 "info"
    • 运行它。
  3. 运行生产者
    • 运行 EmitLogDirect.java
    • 消费者 A (监听 error) 只会收到 error 消息。
    • 消费者 B (监听 info) 只会收到 info 消息。
    • warning 消息没人监听,所以谁也不会收到。

Topics(主题模式)

怎么玩?

  1. 运行消费者:先运行 TopicConsumer
  2. 观察:你会看到它只接收到了 *.orange.* 匹配的消息(比如那只橙色的兔子和大象)。
  3. 修改通配符
    • 把消费者里的 bindingKey 改成 lazy.#
    • 重新运行消费者。
    • 再运行生产者,你会发现它现在接收到了所有以 lazy 开头的消息。

这就是 通配符 的威力!

  • * (星号):代表一个单词。
  • # (井号):代表零个或多个单词。

producer

package com.example.topics;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicProducer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Topic 交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            // 模拟发送不同类型的消息
            // routingKey 格式通常为: <业务>.<地区>.<级别>

            send(channel, "quick.orange.rabbit", "消息1:一只橙色的兔子");
            send(channel, "lazy.orange.elephant", "消息2:一只橙色的懒大象");
            send(channel, "quick.orange.fox", "消息3:一只橙色的狐狸");
            send(channel, "lazy.brown.fox", "消息4:一只棕色的懒狐狸");
            send(channel, "lazy.pink.rabbit", "消息5:一只粉色的懒兔子 (多单词)");
            send(channel, "quick.blue.rabbit", "消息6:一只蓝色的兔子");

        }
    }

    private static void send(Channel channel, String routingKey, String message) throws Exception {
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] 发送: '" + routingKey + "' : '" + message + "'");
        Thread.sleep(500); // 稍微停顿一下,方便观察
    }
}

consumer

package com.example.topics;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明交换机,类型为 TOPIC
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();

        // --- 关键点:绑定 Routing Key ---
        // 这里你可以修改 bindingKey 来测试不同的通配符效果
        // *.orange.* : 匹配 3个词,中间那个必须是 orange
        // lazy.# : 匹配以 lazy 开头的所有消息

        String bindingKey = "*.orange.*"; // 尝试修改这个值看看效果

        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] 等待 '" + bindingKey + "' 的消息. 按 CTRL+C 退出");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] 接收到 routingKey='" + routingKey + "': '" + message + "'");
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

实验四springboot整合

基础版

配置 application.properties

  1. 在项目左侧的文件树中,依次展开:src -> main -> resources
  2. 找到并双击打开 application.properties 文件。
  3. 清空 文件里的所有内容,复制粘贴以下代码:

properties

编辑

# 服务器端口
server.port=8080

# RabbitMQ 配置
# 服务器地址,默认本机
spring.rabbitmq.host=localhost
# 端口号,默认 5672
spring.rabbitmq.port=5672
# 用户名,默认 guest
spring.rabbitmq.username=guest
# 密码,根据你提供的信息,这里使用 guest
spring.rabbitmq.password=guest
# 虚拟主机,默认 /
spring.rabbitmq.virtual-host=/

注意: 这里我严格按照你的要求,将 password 设置为了 guest


第四步:创建消息生产者(Sender)

我们需要一个类来负责发送消息。

  1. 在左侧 java 目录下,找到包 com.example.lab4rabbitmq
  2. 右键点击 这个包 -> New -> Java Class
  3. 类名输入:RabbitSender
  4. 将以下代码完整复制进去(覆盖原有代码):

java

编辑

package com.example.lab4rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    // 自动注入 RabbitTemplate,它是发送消息的核心工具类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 定义队列名称,必须和配置类中的队列名一致
    final static String queueName = "hello-queue";

    // 发送消息的方法
    public void send(String message) {
        // convertAndSend(队列名, 消息内容)
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println("已发送消息: " + message);
    }
}

第五步:创建消息消费者(Receiver)

我们需要一个类来负责接收消息。

  1. 同样在 com.example.lab4rabbitmq 包下。
  2. 右键点击 -> New -> Java Class
  3. 类名输入:RabbitReceiver
  4. 复制以下代码:

java

编辑

package com.example.lab4rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// @RabbitListener 注解用于监听指定的队列
@RabbitListener(queues = "hello-queue")
public class RabbitReceiver {

    // @RabbitHandler 注解用于处理接收到的消息
    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者收到消息: " + message);
    }
}

第六步:配置队列(Config)

Spring Boot 不会自动创建队列,我们需要显式地告诉容器创建一个名为 "hello-queue" 的队列。

  1. 在 com.example.lab4rabbitmq 包下。
  2. 右键点击 -> New -> Java Class
  3. 类名输入:RabbitConfig
  4. 复制以下代码:

java

编辑

package com.example.lab4rabbitmq;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;

@Configuration
public class RabbitConfig {

    // 定义队列名称常量
    public static final String QUEUE_NAME = "hello-queue";

    // 创建一个队列 Bean,Spring Boot 启动时会自动注册这个队列到 RabbitMQ 服务器
    @Bean
    public Queue queue() {
        // new Queue(队列名称, 是否持久化)
        return new Queue(QUEUE_NAME, true);
    }
}

第七步:编写测试控制器(Controller)

为了方便在浏览器里测试发送消息,我们写一个简单的 Web 接口。

  1. 在 com.example.lab4rabbitmq 包下。
  2. 右键点击 -> New -> Java Class
  3. 类名输入:HelloController
  4. 复制以下代码:

java

编辑

package com.example.lab4rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping("/send")
    public String sendMsg() {
        String msg = "Hello RabbitMQ!";
        rabbitSender.send(msg);
        return "消息已发送: " + msg;
    }
}

第八步:运行与测试

现在所有代码都写好了,让我们来运行它。

  1. 启动程序

    • 找到 src -> main -> java -> com.example.lab4rabbitmq 下的 Lab4RabbitmqApplication.java(名字可能略有不同,带有 Application 后缀的主类)。
    • 点击代码左侧绿色的三角形图标,选择 Run
    • 等待控制台输出,直到看到 Tomcat started on port 8080 之类的字样,说明启动成功。
  2. 测试发送

    • 打开你的浏览器(Chrome/Edge)。
    • 在地址栏输入:http://localhost:8080/send 并回车。
    • 页面应该显示:消息已发送: Hello RabbitMQ!
  3. 查看接收结果

    • 回到 IDEA 的 控制台(Console) 窗口。
    • 你应该能看到类似这样的输出:

      已发送消息: Hello RabbitMQ!
      接收者收到消息: Hello RabbitMQ!

如果控制台出现了这两行日志,恭喜你!你的 Spring Boot 整合 RabbitMQ 实验成功了!

进阶版

通常实验文档(如你之前提到的《实验 4》)后面还会有更高级的内容,比如:

  • 对象传输:如何发送一个 Java 对象(User 类)而不是简单的字符串?(通常需要实现 Serializable 接口)。
  • 不同交换机模式
    • Fanout(广播模式):一个消息发给多个消费者。
    • Direct(路由模式):根据 key 选择性接收。
    • Topic(主题模式):模糊匹配 key。
  • 既然你要“都来”,那我们就一次性把 RabbitMQ 最核心的 对象传输 和 三种交换机模式(广播、路由、主题)全部搞定。

    为了方便管理,我会把代码拆分成不同的包结构,这样你的项目会很清晰。senduser会报错

rabbitConfig

package com.example.lab4rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // ==================== 1. 简单队列 (之前的 Hello) ====================
    public static final String SIMPLE_QUEUE = "simple.queue";

    // ==================== 2. 对象传输队列 ====================
    public static final String OBJECT_QUEUE = "object.queue";

    // ==================== 3. Fanout (广播模式) ====================
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE_A = "fanout.queue.a";
    public static final String FANOUT_QUEUE_B = "fanout.queue.b";

    // ==================== 4. Direct (路由模式) ====================
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE_RED = "direct.queue.red";
    public static final String DIRECT_QUEUE_GREEN = "direct.queue.green";

    // ==================== 5. Topic (主题模式) ====================
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE_MSG = "topic.queue.msg";
    public static final String TOPIC_QUEUE_NEWS = "topic.queue.news";

    // --- 声明队列 ---
    @Bean
    public Queue simpleQueue() { return new Queue(SIMPLE_QUEUE); }

    @Bean
    public Queue objectQueue() { return new Queue(OBJECT_QUEUE); }

    @Bean
    public Queue fanoutQueueA() { return new Queue(FANOUT_QUEUE_A); }

    @Bean
    public Queue fanoutQueueB() { return new Queue(FANOUT_QUEUE_B); }

    @Bean
    public Queue directQueueRed() { return new Queue(DIRECT_QUEUE_RED); }

    @Bean
    public Queue directQueueGreen() { return new Queue(DIRECT_QUEUE_GREEN); }

    @Bean
    public Queue topicQueueMsg() { return new Queue(TOPIC_QUEUE_MSG); }

    @Bean
    public Queue topicQueueNews() { return new Queue(TOPIC_QUEUE_NEWS); }

    // --- 声明交换机 ---
    @Bean
    public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); }

    @Bean
    public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); }

    @Bean
    public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); }

    // --- 绑定 (Binding) ---

    // Fanout 绑定 (不需要 routingKey)
    @Bean
    public Binding bindingFanoutA(FanoutExchange fanoutExchange, Queue fanoutQueueA) {
        return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
    }
    @Bean
    public Binding bindingFanoutB(FanoutExchange fanoutExchange, Queue fanoutQueueB) {
        return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
    }

    // Direct 绑定 (需要 routingKey)
    @Bean
    public Binding bindingDirectRed(DirectExchange directExchange, Queue directQueueRed) {
        return BindingBuilder.bind(directQueueRed).to(directExchange).with("red");
    }
    @Bean
    public Binding bindingDirectGreen(DirectExchange directExchange, Queue directQueueGreen) {
        return BindingBuilder.bind(directQueueGreen).to(directExchange).with("green");
    }

    // Topic 绑定 (模糊匹配 # 匹配多个单词, * 匹配一个单词)
    @Bean
    public Binding bindingTopicMsg(TopicExchange topicExchange, Queue topicQueueMsg) {
        return BindingBuilder.bind(topicQueueMsg).to(topicExchange).with("log.info"); // 精确匹配 info
    }
    @Bean
    public Binding bindingTopicNews(TopicExchange topicExchange, Queue topicQueueNews) {
        return BindingBuilder.bind(topicQueueNews).to(topicExchange).with("news.#"); // 匹配 news 开头的所有
    }
}

rabbitController

package com.example.lab4rabbitmq.controller;

import com.example.lab4rabbitmq.User;
import com.example.lab4rabbitmq.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 1. 发送对象
    @GetMapping("/sendObject")
    public String sendObject() {
        User user = new User("张三", 25);
        rabbitTemplate.convertAndSend(RabbitConfig.OBJECT_QUEUE, user);
        return "已发送对象: " + user.toString();
    }

    // 2. 发送 Fanout (广播)
    @GetMapping("/sendFanout")
    public String sendFanout() {
        String msg = "这是广播消息,大家都要听!";
        // 第一个参数是交换机名,第二个是 routingKey (Fanout模式下为空)
        rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", msg);
        return "已发送广播消息";
    }

    // 3. 发送 Direct (路由)
    @GetMapping("/sendDirect")
    public String sendDirect() {
        String msgRed = "红色警报!";
        String msgGreen = "绿色通行。";
        // 发送给 Direct 交换机,指定 routingKey 为 "red"
        rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "red", msgRed);
        // 发送给 Direct 交换机,指定 routingKey 为 "green"
        rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "green", msgGreen);
        return "已发送 Direct 消息 (Red & Green)";
    }

    // 4. 发送 Topic (主题)
    @GetMapping("/sendTopic")
    public String sendTopic() {
        String msg1 = "这是一条 info 日志";
        String msg2 = "这是一条 news 体育新闻";
        String msg3 = "这是一条 news 国际新闻";

        // 匹配 log.info -> 只有 Msg 消费者能收到
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "log.info", msg1);

        // 匹配 news.# -> News 消费者能收到
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "news.sports", msg2);
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "news.world", msg3);

        return "已发送 Topic 消息";
    }
}

allreceivers

package com.example.lab4rabbitmq.receiver;

import com.example.lab4rabbitmq.User;
import com.example.lab4rabbitmq.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class AllReceivers {

    // 1. 接收对象
    @RabbitListener(queues = RabbitConfig.OBJECT_QUEUE)
    public void receiveObject(User user) {
        System.out.println("【对象消费者】收到用户: " + user.toString());
    }

    // 2. Fanout 消费者 A
    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_A)
    public void receiveFanoutA(String msg) {
        System.out.println("【Fanout消费者A】收到广播: " + msg);
    }

    // 3. Fanout 消费者 B
    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_B)
    public void receiveFanoutB(String msg) {
        System.out.println("【Fanout消费者B】收到广播: " + msg);
    }

    // 4. Direct 消费者 (Red)
    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_RED)
    public void receiveDirectRed(String msg) {
        System.out.println("【Direct消费者-红色】收到消息: " + msg);
    }

    // 5. Direct 消费者 (Green)
    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_GREEN)
    public void receiveDirectGreen(String msg) {
        System.out.println("【Direct消费者-绿色】收到消息: " + msg);
    }

    // 6. Topic 消费者 (Msg)
    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_MSG)
    public void receiveTopicMsg(String msg) {
        System.out.println("【Topic消费者-Info】收到消息: " + msg);
    }

    // 7. Topic 消费者 (News)
    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_NEWS)
    public void receiveTopicNews(String msg) {
        System.out.println("【Topic消费者-News】收到消息: " + msg);
    }
}

rabbitreceivers

package com.example.lab4rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// @RabbitListener 注解用于监听指定的队列
@RabbitListener(queues = "hello-queue")
public class RabbitReceiver {

    // @RabbitHandler 注解用于处理接收到的消息
    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者收到消息: " + message);
    }
}

rabbitsender

package com.example.lab4rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.lab4rabbitmq.config.RabbitConfig; // 新增导入

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 1. 发送普通字符串
    public void send(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE, message);
        System.out.println("已发送字符串: " + message);
    }

    // 2. 发送对象
    public void sendObject(User user) {
        rabbitTemplate.convertAndSend(RabbitConfig.OBJECT_QUEUE, user);
        System.out.println("已发送对象: " + user);
    }

    // 3. 发送 Direct 消息
    public void sendDirect(String message, String routingKey) {
        rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, routingKey, message);
        System.out.println("已发送 Direct 消息: " + message + " (路由键: " + routingKey + ")");
    }

    // 4. 发送 Topic 消息
    public void sendTopic(String message, String routingKey) {
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, routingKey, message);
        System.out.println("已发送 Topic 消息: " + message + " (路由键: " + routingKey + ")");
    }

    // 5. 发送 Fanout 消息
    public void sendFanout(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", message);
        System.out.println("已发送 Fanout 广播消息: " + message);
    }
}

user

package com.example.lab4rabbitmq;

import java.io.Serializable;

// 必须实现 Serializable 接口
public class User implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private Integer age;

    // 必须有无参构造
    public User() {}

    public User(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    // Getter 和 Setter
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.0.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>lab4-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>lab4-rabbitmq</name>
    <description>lab4-rabbitmq</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webmvc</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webmvc-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 放在 <dependencies> 标签里面 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

依赖解释

       <!-- Spring AMQP Starter -->
        <!-- 作用:这是 Spring 对 AMQP(高级消息队列协议)的标准实现。 -->
        <!-- 它自动配置了 RabbitMQ 的连接工厂、RabbitTemplate(用于发送/接收消息)以及监听容器。 -->
        <!-- 没有它,你就得手动写一堆代码来连接 RabbitMQ。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

        <!-- Spring Web MVC Starter -->
        <!-- 作用:构建 Web 应用的核心依赖。 -->
        <!-- 它包含了 Spring MVC 框架(用于处理 HTTP 请求)、Tomcat 嵌入式服务器(默认)、以及 JSON 转换器。 -->
        <!-- 这让你能通过 @RestController 写接口,接收前端请求。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>

        <!-- Spring AMQP Test Starter -->
        <!-- 作用:专门用于测试的依赖(仅在测试阶段生效 scope="test")。 -->
        <!-- 它提供了模拟(Mock)RabbitMQ 的工具,让你在不启动真实 RabbitMQ 服务的情况下也能运行单元测试。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp-test</artifactId>
<scope>test</scope>
</dependency>

        <!-- Spring Web Test Starter -->
        <!-- 作用:Web 层测试依赖。 -->
        <!-- 它包含了 Spring Test 模块和 JUnit,允许你进行 MockMvc 测试,模拟 HTTP 请求来测试你的 Controller 接口。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc-test</artifactId>
<scope>test</scope>
</dependency>

        <!-- Spring Boot JSON Starter -->
        <!-- 作用:JSON 处理支持。 -->
        <!-- 虽然 Web 依赖通常自带 JSON 转换,但显式引入这个依赖可以确保 Jackson 库被包含进来。 -->
        <!-- 在 RabbitMQ 中,我们通常不直接发送 Java 对象(会有反序列化安全问题),而是将对象转换为 JSON 字符串发送,这个依赖就是干这个的。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>

进阶依赖推荐清单

虽然你的项目现在能跑起来了,但为了开发更高效、功能更完善,以下依赖在实际开发中非常常用。你可以根据 2026 年的开发趋势(如云原生、响应式编程)进行选择:

1. 开发效率神器
  • Lombok
    • 作用:通过注解自动生成 gettersettertoString、构造函数等样板代码。
    • 场景:你的 User 类实体如果不用 Lombok,需要写几十行冗余代码;用了之后只需要几个注解。
    • 坐标:xml

      编辑

      <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <scope>provided</scope>
      </dependency>
2. 数据存储 (如果你需要存数据)
  • Spring Data JPA / Spring Data MongoDB
    • 作用:简化数据库操作。RabbitMQ 负责“传输”,通常需要数据库来“存储”最终结果。
    • 场景:接收消息后,将用户数据保存到 MySQL 或 MongoDB 中。
    • 坐标 (以 MySQL 为例):xml

      编辑

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-jpa</artifactId>
      </dependency>
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-j</artifactId>
      </dependency>
3. 测试增强 (现代开发推荐)
  • Testcontainers
    • 作用:在 Docker 容器中启动真实的数据库或中间件进行测试。
    • 场景:之前的 spring-boot-starter-amqp-test 是“模拟”RabbitMQ。如果你想做集成测试(Integration Test),确保代码能和真实的 RabbitMQ 交互,Testcontainers 是 2026 年的标准做法。
    • 坐标:xml

      编辑

      <dependency>
          <groupId>org.testcontainers</groupId>
          <artifactId>rabbitmq</artifactId>
          <scope>test</scope>
      </dependency>
4. 系统监控与运维
  • Spring Boot Actuator
    • 作用:为你的应用提供“健康检查”、“指标监控”、“环境信息”等端点。
    • 场景:在生产环境中,运维人员需要知道 RabbitMQ 连接是否正常、系统内存是否充足。
    • 坐标:xml

      编辑

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-actuator</artifactId>
      </dependency>
5. 缓存加速
  • Spring Data Redis
    • 作用:集成 Redis 缓存。
    • 场景:RabbitMQ 处理消息可能有延迟,或者消息内容是高频读取的配置信息。将处理结果或配置缓存到 Redis 中,可以极大提升前端接口(Web MVC)的响应速度。
    • 坐标:xml

      编辑

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>

application

# RabbitMQ 连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 消息确认配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

# 消息消费确认配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.amqp.deserialization.trust.all=true
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐