使用 RabbitMQ 进行消息传递

本指南将引导您完成创建发布和订阅 RabbitMQ AMQP 服务器的 Spring Boot 应用程序的过程。

您将构建的内容

您将构建一个应用程序,该应用程序使用 Spring AMQP 的RabbitTemplate发布消息,并使用MessageListenerAdapter在 POJO 上订阅该消息。

您需要什么

  • 大约 15 分钟

  • 您喜欢的文本编辑器或 IDE

  • Java 17 或更高版本

如何完成本指南

与大多数 Spring 入门指南 一样,您可以从头开始并完成每个步骤,也可以通过查看此存储库中的代码直接跳转到解决方案。

在本地环境中查看最终结果,您可以执行以下操作之一

设置 RabbitMQ 代理

在构建消息传递应用程序之前,您需要设置一个服务器来处理接收和发送消息。本指南假设您使用Spring Boot Docker Compose 支持。这种方法的先决条件是您的开发机器拥有可用的 Docker 环境,例如Docker Desktop。添加一个依赖项spring-boot-docker-compose,它执行以下操作:

  • 在您的工作目录中搜索compose.yml和其他常见的 compose 文件名。

  • 使用发现的compose.yml调用docker compose up

  • 为每个受支持的容器创建服务连接 Bean。

  • 应用程序关闭时调用docker compose stop

要使用 Docker Compose 支持,您只需遵循本指南即可。根据您引入的依赖项,Spring Boot 会找到正确的compose.yml文件,并在您运行应用程序时启动您的 Docker 容器。

如果您选择自己运行 RabbitMQ 服务器而不是使用 Spring Boot Docker Compose 支持,您可以选择以下几种方法:

  • 下载服务器并手动运行它。

  • 如果您使用的是 Mac,请使用 Homebrew 安装。

  • 使用docker-compose up手动运行compose.yaml文件。

如果您选择这些替代方法中的任何一种,则应从 Maven 或 Gradle 构建文件中删除spring-boot-docker-compose依赖项。您还需要向application.properties文件添加配置,这将在准备构建应用程序部分中更详细地描述。如前所述,本指南假设您在 Spring Boot 中使用 Docker Compose 支持,因此此时无需对application.properties进行其他更改。

从 Spring Initializr 开始

您可以使用此预初始化项目并单击“生成”以下载 ZIP 文件。此项目已配置为适合本指南中的示例。

要手动初始化项目:

  1. 导航到start.spring.io。此服务会引入应用程序所需的所有依赖项,并为您完成大部分设置工作。

  2. 选择 Gradle 或 Maven 以及您要使用的语言。本指南假设您选择了 Java。

  3. 单击依赖项并选择用于 RabbitMQ 的 SpringDocker Compose 支持

  4. 单击生成

  5. 下载生成的 ZIP 文件,这是一个使用您的选择配置的应用程序的存档。

如果您的 IDE 集成了 Spring Initializr,则可以从您的 IDE 中完成此过程。

创建 RabbitMQ 消息接收器

对于任何基于消息的应用程序,您都需要创建一个接收器来响应已发布的消息。以下列表(来自src/main/java/com/example/messagingrabbitmq/Receiver.java)显示了如何执行此操作。

package com.example.messagingrabbitmq;

import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

  private CountDownLatch latch = new CountDownLatch(1);

  public void receiveMessage(String message) {
    System.out.println("Received <" + message + ">");
    latch.countDown();
  }

  public CountDownLatch getLatch() {
    return latch;
  }

}

Receiver是一个 POJO,它定义了一种接收消息的方法。当您注册它以接收消息时,您可以随意命名它。

为方便起见,此 POJO 还具有CountDownLatch。这使它可以发出已收到消息的信号。这在生产应用程序中不太可能实现。

注册监听器并发送消息

Spring AMQP 的RabbitTemplate提供了使用 RabbitMQ 发送和接收消息所需的一切。但是,您需要:

  • 配置消息侦听器容器。

  • 声明队列、交换机以及它们之间的绑定。

  • 配置一个组件以发送一些消息来测试侦听器。

Spring Boot 自动创建一个连接工厂和一个 RabbitTemplate,从而减少了您需要编写的代码量。

您将使用RabbitTemplate发送消息,并将使用消息侦听器容器注册一个Receiver来接收消息。连接工厂驱动两者,使它们能够连接到 RabbitMQ 服务器。以下列表(来自src/main/java/com/example/messagingrabbitmq/MessagingRabbitmqApplication.java)显示了如何创建应用程序类。

package com.example.messagingrabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MessagingRabbitmqApplication {

  static final String topicExchangeName = "spring-boot-exchange";

  static final String queueName = "spring-boot";

  @Bean
  Queue queue() {
    return new Queue(queueName, false);
  }

  @Bean
  TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }

  @Bean
  Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
  }

  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
      MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
  }

  public static void main(String[] args) throws InterruptedException {
    SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
  }

}

@SpringBootApplication注释提供了许多好处,如参考文档中所述。

listenerAdapter()方法中定义的 Bean 在容器(在container()中定义)中注册为消息侦听器。它侦听spring-boot队列上的消息。因为Receiver类是 POJO,所以需要将其包装在MessageListenerAdapter中,您可以在其中指定它调用receiveMessage

JMS 队列和 AMQP 队列具有不同的语义。例如,JMS 将排队消息仅发送给一个使用者。虽然 AMQP 队列执行相同操作,但 AMQP 生产者不会直接将消息发送到队列。相反,消息将发送到交换机,交换机可以转到单个队列或扇出到多个队列,从而模拟 JMS 主题的概念。

消息侦听器容器和接收器 Bean 是您侦听消息所需的一切。要发送消息,您还需要一个 Rabbit 模板。

queue()方法创建一个 AMQP 队列。exchange()方法创建一个主题交换机。binding()方法将这两个绑定在一起,定义当RabbitTemplate发布到交换机时发生的行为。

Spring AMQP 要求将QueueTopicExchangeBinding声明为顶级 Spring Bean,以便正确设置。

在这种情况下,我们使用主题交换机,并且队列使用路由键foo.bar.#绑定,这意味着使用以foo.bar.开头的路由键发送的任何消息都将路由到队列。

发送测试消息

在此示例中,测试消息由CommandLineRunner发送,该CommandLineRunner还会等待接收器中的闩锁并关闭应用程序上下文。以下列表(来自src/main/java/com.example.messagingrabbitmq/Runner.java)显示了它的工作方式。

package com.example.messagingrabbitmq;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

  private final RabbitTemplate rabbitTemplate;
  private final Receiver receiver;

  public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
    this.receiver = receiver;
    this.rabbitTemplate = rabbitTemplate;
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("Sending message...");
    rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
  }

}

请注意,模板使用路由键foo.bar.baz将消息路由到交换机,这与绑定匹配。

在测试中,您可以模拟运行器,以便可以隔离测试接收器。

运行应用程序

main()方法通过创建 Spring 应用程序上下文来启动该过程。这将启动消息侦听器容器,该容器开始侦听消息。有一个Runner Bean,然后自动运行它。它从应用程序上下文中检索RabbitTemplate并在spring-boot队列上发送Hello from RabbitMQ!消息。最后,它关闭 Spring 应用程序上下文,应用程序结束。

您可以通过 IDE 运行 main 方法。请注意,如果您从解决方案存储库克隆了项目,您的 IDE 可能会在错误的位置查找compose.yaml文件。您可以配置 IDE 以查找正确的位置,也可以使用命令行运行应用程序。./gradlew bootRun./mvnw spring-boot:run命令将启动应用程序并自动查找 compose.yaml 文件。

准备构建应用程序

要运行没有 Spring Boot Docker Compose 支持的代码,您需要运行一个本地 RabbitMQ 版本才能连接。为此,您可以使用 Docker Compose,但必须首先对compose.yaml文件进行两处更改。首先,将compose.yaml中的ports条目修改为'5672:5672'。其次,添加container_name

compose.yaml现在应该是:

services:
  rabbitmq:
    container_name: 'guide-rabbit'
    image: 'rabbitmq:latest'
    environment:
      - 'RABBITMQ_DEFAULT_PASS=secret'
      - 'RABBITMQ_DEFAULT_USER=myuser'
    ports:
      - '5672:5672'

现在您可以运行docker-compose up来启动RabbitMQ服务。现在您应该拥有一个可以接受请求的外部RabbitMQ服务器。

此外,您需要告诉Spring如何连接到RabbitMQ服务器(这在使用Spring Boot Docker Compose支持时会自动处理)。将以下代码添加到src/main/resources目录下的新application.properties文件中。

spring.rabbitmq.password=secret
spring.rabbitmq.username=myuser

构建应用程序

本节介绍运行本指南的不同方法。

无论您选择哪种方式运行应用程序,输出都应该相同。

要运行应用程序,您可以将应用程序打包为可执行jar文件。./gradlew clean build命令会将应用程序编译成可执行jar文件。然后,您可以使用java -jar build/libs/messaging-rabbitmq-0.0.1-SNAPSHOT.jar命令运行该jar文件。

或者,如果您有可用的Docker环境,您可以使用buildpacks直接从Maven或Gradle插件创建Docker镜像。使用Cloud Native Buildpacks,您可以创建可在任何地方运行的与Docker兼容的镜像。Spring Boot直接为Maven和Gradle包含buildpack支持。这意味着您可以键入单个命令,并快速将一个合理的镜像放入本地运行的Docker守护进程中。要使用Cloud Native Buildpacks创建Docker镜像,请运行./gradlew bootBuildImage命令。启用Docker环境后,您可以使用docker run --network container:guide-rabbit docker.io/library/messaging-rabbitmq:0.0.1-SNAPSHOT命令运行应用程序。

--network标志告诉Docker将我们的guide容器附加到我们的外部容器正在使用的现有网络。您可以在Docker文档中找到更多信息。

无论您选择如何构建和运行应用程序,您都应该看到以下输出。

    Sending message...
    Received <Hello from RabbitMQ!>

总结

恭喜!您刚刚使用Spring和RabbitMQ开发了一个简单的发布-订阅应用程序。您可以使用Spring和RabbitMQ做更多的事情,但这本指南应该提供一个良好的开端。

另请参阅

以下指南可能也有帮助:

想编写新的指南或为现有指南贡献代码?请查看我们的贡献指南

所有指南均采用ASLv2许可证发布代码,并采用署名-非衍生作品创作共用许可证发布文本。

获取代码

免费

云端工作

在Spring Academy的云端完成本指南。

前往Spring Academy