领先一步
VMware 提供培训和认证,助您加速进步。
了解更多这是6部分系列文章的第4部分,新文章将于周一和周四发布,旨在为 Spring 开发者介绍 Microsoft Azure。如果没有 Microsoft 的 Asir Vedamuthu Selvasingh、Yitao Dong、Bruno Borges、Brian Benz 和 Theresa Nguyen 的贡献,我将无法完成这篇文章。您可以在 Github 上找到本系列文章的代码。在阅读本系列文章时,如果您有任何反馈或疑问,请通过 Twitter (@starbuxman) 与我联系。您也可以在我 Spring Tips (@SpringTipsLive) 的系列文章 Bootiful Azure 中了解更多关于 Microsoft Azure 的信息。
以下是所有安装内容
Azure Service Bus 是一种云消息传递即服务和集成技术。与 CosmosDB 一样,它具有最大的灵活性。它 支持 AMQP 1.0 协议,类似于 RabbitMQ。AMQP 是一种灵活的线路协议。协议本身包含了管理代理的指令,而不仅仅是与之交互。AMQP 代理非常适合集成,因为它们与语言和平台无关。在 AMQP 代理中,生产者将消息发送到 交换器,交换器然后将消息路由到 队列,消费者随后从队列中读取消息。交换器负责决定消息应发送到哪个队列。它通过多种方式来实现这一点,但通常涉及查看消息头中称为 路由键 的键。
交换器和队列之间的这种间接性使得 AMQP 比基于 JMS 的代理更灵活,在 JMS 代理中,生产者直接将消息发送到消费者从中读取的 Destination 对象。这意味着生产者和消费者通过它们选择的 Destination 而耦合。此外,JMS 是 JVM 的 API,而不是线路协议。因此,生产者和消费者依赖于他们使用的库的版本是否正确。话虽如此,您也可以通过 JMS API 使用 Azure Service Bus。
正如我所说,Azure Service Bus 极其灵活!
AMQP 模型很有启发性,因为 Azure Service Bus 的原生模型基本上就像 AMQP。在 Azure Service Bus 中,您拥有主题或队列,您向其发送消息。然后,消息连接到订阅,消费者从中读取。让我们构建一个发送和消耗消息的简单示例。我们将不使用 AMQP 或 JMS,只使用普通的 Microsoft Azure ServiceBus API。
您需要提供一个 servicebus 命名空间、一个主题(我们向其发送消息,并且可以有多个消费者收听)和一个订阅(用于连接到主题的主题或队列的消费者)。这是一个执行此操作的示例脚本。
#!/usr/bin/env bash
destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1
az servicebus namespace create --resource-group $rg \
--name ${namespace}
az servicebus topic create --resource-group $rg \
--namespace-name ${namespace} \
--name ${topic}
az servicebus topic subscription create --resource-group $rg \
--namespace-name ${namespace} --topic-name ${topic} \
--name ${subscription}
您需要一个连接字符串才能将您的 Spring 应用程序连接到 servicebus。运行此命令并记下 primaryConnectionString 属性值以供将来使用。
az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey
将以下依赖项添加到您的构建中:com.microsoft.azure : azure-servicebus-spring-boot-starter。
我们将编写两个组件:一个是生产者,另一个是消费者。在 实际 应用中,这些东西自然会存在于单独的应用程序和单独的进程中。毕竟,消息传递服务于集成不同的应用程序。我们先看消费者。消费者需要在其他东西生成消息 之前 注册订阅者,所以我们将使这些 bean 有序 - Spring 容器将根据我们给它的 Ordered 值来确定它们的初始化顺序。
package com.example.bootifulazure;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Log4j2
@Component
class ServiceBusConsumer implements Ordered {
private final ISubscriptionClient iSubscriptionClient;
ServiceBusConsumer(ISubscriptionClient isc) {
this.iSubscriptionClient = isc;
}
@EventListener(ApplicationReadyEvent.class)
public void consume() throws Exception {
this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
return CompletableFuture.completedFuture(null);
}
@Override
public void notifyException(Throwable exception, ExceptionPhase phase) {
log.error("eeks!", exception);
}
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
当消息到达时,我们记录其 body 和 messageId。
现在,让我们看看生产者。
package com.example.bootifulazure;
import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.time.Instant;
@Log4j2
@Component
class ServiceBusProducer implements Ordered {
private final ITopicClient iTopicClient;
ServiceBusProducer(ITopicClient iTopicClient) {
this.iTopicClient = iTopicClient;
}
@EventListener(ApplicationReadyEvent.class)
public void produce() throws Exception {
this.iTopicClient.send(new Message("Hello @ " + Instant.now().toString()));
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
很简单,对吧?类的核心部分在 consume() 和 produce() 方法中。消费者先运行,然后是生产者。如果您曾经使用过消息传递技术,您可能会觉得没有提到任何类型的 目标 - 主题或队列 - 有点令人困惑。所有这些配置都存在于属性中(例如,您 application.properties 文件中的那些),并在自动配置 ITopicClient 和 ISubscriptionClient 时使用。如果您想发送消息或从多个目标消耗消息,只需自行定义相关 bean,并确保 不 在应用程序的属性中指定 azure.service-bus.connection-string,否则默认的 Spring Boot 自动配置将生效,并尝试为您创建这些 bean。