领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多这是由6部分组成的系列文章的第4部分,每周一和周四发布新文章,介绍面向 Spring 开发人员的 Microsoft Azure。如果没有微软的 Asir Vedamuthu Selvasingh、Yitao Dong、Bruno Borges、Brian Benz 和 Theresa Nguyen 的投入,我将无法完成这项工作。您可以在Github 上找到此系列文章的代码。在阅读各部分内容时,如有任何反馈或问题,请在Twitter (@starbuxman)上与我联系。您还可以通过我的Spring Tips (@SpringTipsLive)文章Bootiful Azure了解更多关于 Microsoft Azure 的信息。
以下是所有部分
Azure Service Bus 是一种云消息服务和集成技术。它与 CosmosDB 一样,尽可能地灵活。它支持 AMQP 1.0 协议,就像 RabbitMQ 一样。AMQP 是一种灵活的线协议。协议本身包含用于管理代理的指令,而不仅仅是与之交互。AMQP 代理非常适合集成,因为它们与语言和平台无关。在 AMQP 代理中,生产者将消息发送到交换机,然后交换机将消息路由到队列,消费者从队列中读取消息。交换机负责决定应将消息发送到哪个队列。它可以通过多种方式执行此操作,但通常涉及查看消息头中名为路由键的键。
交换机和队列之间的这种间接性使得 AMQP 比基于 JMS 的代理更灵活,在基于 JMS 的代理中,生产者直接将消息发送到消费者从中读取的Destination
对象。这意味着生产者和消费者通过他们选择的Destination
耦合在一起。此外,JMS 是 JVM 的 API,它不是线协议。因此,生产者和消费者依赖于他们使用的库版本是否正确。也就是说,您也可以通过 JMS API 使用 Azure Service Bus。
就像我说的,Azure Service Bus 非常灵活!
AMQP 模型具有说明性,因为 Azure Service Bus 的本机模型基本上类似于 AMQP。在 Azure Service Bus 中,您拥有主题或队列,您可以将消息发送到这些主题或队列。然后,消息连接到订阅,消费者从中读取。让我们构建一个简单的示例,发送然后使用消息。我们不会使用 AMQP 或 JMS,只使用常规的 Microsoft Azure ServiceBus API。
您需要预配一个 Service Bus 命名空间、一个主题(我们向其发送消息,多个消费者可以从中侦听)和一个订阅(主题或队列的消费者)以连接到主题。这是一个执行此操作的示例脚本。
#!/usr/bin/env bash
destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1
az servicebus namespace create --resource-group $rg \
--name ${namespace}
az servicebus topic create --resource-group $rg \
--namespace-name ${namespace} \
--name ${topic}
az servicebus topic subscription create --resource-group $rg \
--namespace-name ${namespace} --topic-name ${topic} \
--name ${subscription}
您需要一个连接字符串才能将 Spring 应用程序连接到 Service Bus。运行此命令并记下primaryConnectionString
属性值以备后用。
az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey
将以下依赖项添加到您的构建中:com.microsoft.azure
: azure-servicebus-spring-boot-starter
。
我们将编写两个组件:一个生产者和一个消费者。在一个真实的应用程序中,这些组件自然会存在于不同的应用程序和不同的进程中。毕竟,消息传递用于支持不同应用程序的集成。我们首先看看消费者。消费者需要在其他任何东西产生消息之前注册一个订阅者,因此我们将这些 bean 设置为有序的 - Spring 容器将根据我们给它的Ordered
值,按顺序初始化它们。
package com.example.bootifulazure;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Log4j2
@Component
class ServiceBusConsumer implements Ordered {
private final ISubscriptionClient iSubscriptionClient;
ServiceBusConsumer(ISubscriptionClient isc) {
this.iSubscriptionClient = isc;
}
@EventListener(ApplicationReadyEvent.class)
public void consume() throws Exception {
this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
return CompletableFuture.completedFuture(null);
}
@Override
public void notifyException(Throwable exception, ExceptionPhase phase) {
log.error("eeks!", exception);
}
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
当消息到达时,我们会记录其body
和messageId
。
现在,让我们看看生产者。
package com.example.bootifulazure;
import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.time.Instant;
@Log4j2
@Component
class ServiceBusProducer implements Ordered {
private final ITopicClient iTopicClient;
ServiceBusProducer(ITopicClient iTopicClient) {
this.iTopicClient = iTopicClient;
}
@EventListener(ApplicationReadyEvent.class)
public void produce() throws Exception {
this.iTopicClient.send(new Message("Hello @ " + Instant.now().toString()));
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
非常简单,对吧?类的核心内容在consume()
和produce()
方法中。消费者先运行,然后是生产者。如果您以前使用过任何消息传递技术,您可能会发现没有提及任何类型的目标(主题或队列)有点令人费解。所有这些配置都存在于属性(例如application.properties
文件中的属性)中,并在自动配置ITopicClient
和ISubscriptionClient
时使用。如果您想从多个目标发送消息或使用消息,只需自行定义相关的 bean,并确保不要在应用程序的属性中指定azure.service-bus.connection-string
,否则默认的 Spring Boot 自动配置将启动并尝试为您创建这些 bean。