领先一步
VMware 提供培训和认证,助您快速进步。
了解更多这是 6 部分系列文章中的第 4 部分,每周一和周四发布新文章,向 Spring 开发者介绍 Microsoft Azure。如果没有来自 Microsoft 的 Asir Vedamuthu Selvasingh、Yitao Dong、Bruno Borges、Brian Benz 和 Theresa Nguyen 的意见,我无法完成本文。您可以在 Github 上找到本系列的代码。在阅读时,如果您有任何反馈或问题,请在 Twitter (@starbuxman) 上联系我。您也可以在我的 Spring Tips (@SpringTipsLive) 文章 Bootiful Azure 中了解更多关于 Microsoft Azure 的信息。
以下是所有系列文章
Azure Service Bus 是一种云消息即服务和集成技术。就像 CosmosDB 一样,它尽可能灵活。它支持 AMQP 1.0 协议,类似于 RabbitMQ。AMQP 是一种灵活的网络协议。协议本身包含管理 Broker(除了与之交互之外)的说明。AMQP Broker 非常适合集成,因为它们与语言和平台无关。在 AMQP Broker 中,生产者将消息发送到 exchanges(交换器),然后交换器将消息路由到 queues(队列),消费者再从队列中读取消息。交换器负责决定消息应该发送到哪个队列。它通过多种方式实现这一点,但通常涉及查看消息头中的一个名为 routing key(路由键) 的键。
交换器和队列之间的这种间接性使得 AMQP 比基于 JMS 的 Broker 更灵活,在基于 JMS 的 Broker 中,生产者直接将消息发送到消费者从中读取的 Destination
对象。这意味着生产者和消费者通过选择 Destination
耦合在一起。此外,JMS 是一个针对 JVM 的 API,而不是一个网络协议。因此,生产者和消费者依赖于他们使用的库的版本是否正确。尽管如此,您也可以通过 JMS API 使用 Azure Service Bus。
正如我所说,Azure Service Bus 非常灵活!
AMQP 模型具有启发性,因为基本上,Azure Service Bus 的原生模型看起来就像 AMQP。在 Azure Service Bus 中,您有主题(topics)或队列(queues),您可以向它们发送消息。然后消息连接到订阅(subscriptions),消费者从订阅中读取。让我们构建一个简单的例子来发送和消费消息。我们将不使用 AMQP 或 JMS,只使用常规的 Microsoft Azure ServiceBus API。
您需要预置一个 servicebus namespace(命名空间)、一个 topic(主题,我们向其发送消息,多个消费者可以监听它)和一个 subscription(订阅,是主题或队列的消费者)来连接到主题。这是一个实现此目的的示例脚本。
#!/usr/bin/env bash
destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1
az servicebus namespace create --resource-group $rg \
--name ${namespace}
az servicebus topic create --resource-group $rg \
--namespace-name ${namespace} \
--name ${topic}
az servicebus topic subscription create --resource-group $rg \
--namespace-name ${namespace} --topic-name ${topic} \
--name ${subscription}
您需要一个连接字符串来将您的 Spring 应用程序连接到 servicebus。运行此命令并记下 primaryConnectionString
属性的值,以备后用。
az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey
将以下依赖项添加到您的构建中:com.microsoft.azure
: azure-servicebus-spring-boot-starter
。
我们将编写两个组件:一个生产者(producer)和一个消费者(consumer)。在实际应用中,这些东西自然会存在于独立的应用程序和独立的进程中。毕竟,消息传递是用来支持不同应用程序集成的。我们先来看看消费者。消费者需要在其他东西产生消息之前注册一个订阅者,所以我们将这些 bean 设置为有序的 - Spring 容器会根据我们给定的 Ordered
值来确定它们的初始化顺序。
package com.example.bootifulazure;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Log4j2
@Component
class ServiceBusConsumer implements Ordered {
private final ISubscriptionClient iSubscriptionClient;
ServiceBusConsumer(ISubscriptionClient isc) {
this.iSubscriptionClient = isc;
}
@EventListener(ApplicationReadyEvent.class)
public void consume() throws Exception {
this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
return CompletableFuture.completedFuture(null);
}
@Override
public void notifyException(Throwable exception, ExceptionPhase phase) {
log.error("eeks!", exception);
}
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
当消息到达时,我们记录其 body
和 messageId
。
现在,我们来看看生产者。
package com.example.bootifulazure;
import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.time.Instant;
@Log4j2
@Component
class ServiceBusProducer implements Ordered {
private final ITopicClient iTopicClient;
ServiceBusProducer(ITopicClient iTopicClient) {
this.iTopicClient = iTopicClient;
}
@EventListener(ApplicationReadyEvent.class)
public void produce() throws Exception {
this.iTopicClient.send(new Message("Hello @ " + Instant.now().toString()));
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
很直观,对吧?类的核心代码在 consume()
和 produce()
方法中。消费者先运行,然后是生产者。如果您曾经使用过消息传递技术,可能会觉得没有提及任何形式的目的地——主题或队列——有点令人困惑。所有这些配置都存在于属性中(例如 application.properties
文件中的属性),并在自动配置 ITopicClient
和 ISubscriptionClient
时使用。如果您想从多个目的地发送或消费消息,只需自行定义相关的 bean,并确保不要在应用程序的属性中指定 azure.service-bus.connection-string
,否则默认的 Spring Boot 自动配置将生效并尝试为您创建这些 bean。