Bootiful Azure:与 Azure Service Bus 集成 (4/6)

工程 | Josh Long | 2019年1月14日 | ...

这是由6部分组成的系列文章的第4部分,每周一和周四发布新文章,介绍面向 Spring 开发人员的 Microsoft Azure。如果没有微软的 Asir Vedamuthu Selvasingh、Yitao Dong、Bruno Borges、Brian Benz 和 Theresa Nguyen 的投入,我将无法完成这项工作。您可以在Github 上找到此系列文章的代码。在阅读各部分内容时,如有任何反馈或问题,请在Twitter (@starbuxman)上与我联系。您还可以通过我的Spring Tips (@SpringTipsLive)文章Bootiful Azure了解更多关于 Microsoft Azure 的信息。

以下是所有部分

Azure Service Bus 是一种云消息服务和集成技术。它与 CosmosDB 一样,尽可能地灵活。它支持 AMQP 1.0 协议,就像 RabbitMQ 一样。AMQP 是一种灵活的线协议。协议本身包含用于管理代理的指令,而不仅仅是与之交互。AMQP 代理非常适合集成,因为它们与语言和平台无关。在 AMQP 代理中,生产者将消息发送到交换机,然后交换机将消息路由到队列,消费者从队列中读取消息。交换机负责决定应将消息发送到哪个队列。它可以通过多种方式执行此操作,但通常涉及查看消息头中名为路由键的键。

交换机和队列之间的这种间接性使得 AMQP 比基于 JMS 的代理更灵活,在基于 JMS 的代理中,生产者直接将消息发送到消费者从中读取的Destination对象。这意味着生产者和消费者通过他们选择的Destination耦合在一起。此外,JMS 是 JVM 的 API,它不是线协议。因此,生产者和消费者依赖于他们使用的库版本是否正确。也就是说,您也可以通过 JMS API 使用 Azure Service Bus

就像我说的,Azure Service Bus 非常灵活!

AMQP 模型具有说明性,因为 Azure Service Bus 的本机模型基本上类似于 AMQP。在 Azure Service Bus 中,您拥有主题或队列,您可以将消息发送到这些主题或队列。然后,消息连接到订阅,消费者从中读取。让我们构建一个简单的示例,发送然后使用消息。我们不会使用 AMQP 或 JMS,只使用常规的 Microsoft Azure ServiceBus API。

在 Microsoft Azure 上配置 Azure Service Bus

您需要预配一个 Service Bus 命名空间、一个主题(我们向其发送消息,多个消费者可以从中侦听)和一个订阅(主题或队列的消费者)以连接到主题。这是一个执行此操作的示例脚本。

#!/usr/bin/env bash

destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1

az servicebus namespace create --resource-group $rg \
    --name ${namespace}

az servicebus topic create --resource-group $rg \
    --namespace-name ${namespace} \
    --name ${topic}

az servicebus topic subscription create --resource-group $rg  \
    --namespace-name ${namespace} --topic-name ${topic} \
    --name ${subscription}

您需要一个连接字符串才能将 Spring 应用程序连接到 Service Bus。运行此命令并记下primaryConnectionString属性值以备后用。

az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey

将 Azure Service Bus 引入您的 Spring 应用程序

将以下依赖项添加到您的构建中:com.microsoft.azure : azure-servicebus-spring-boot-starter

我们将编写两个组件:一个生产者和一个消费者。在一个真实的应用程序中,这些组件自然会存在于不同的应用程序和不同的进程中。毕竟,消息传递用于支持不同应用程序的集成。我们首先看看消费者。消费者需要在其他任何东西产生消息之前注册一个订阅者,因此我们将这些 bean 设置为有序的 - Spring 容器将根据我们给它的Ordered值,按顺序初始化它们。

package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Log4j2
@Component
class ServiceBusConsumer implements Ordered {

    private final ISubscriptionClient iSubscriptionClient;

    ServiceBusConsumer(ISubscriptionClient isc) {
        this.iSubscriptionClient = isc;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void consume() throws Exception {

        this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {

            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public void notifyException(Throwable exception, ExceptionPhase phase) {
                log.error("eeks!", exception);
            }
        });

    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

当消息到达时,我们会记录其bodymessageId

现在,让我们看看生产者。

package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.time.Instant;

@Log4j2
@Component
class ServiceBusProducer implements Ordered {

    private final ITopicClient iTopicClient;

    ServiceBusProducer(ITopicClient iTopicClient) {
        this.iTopicClient = iTopicClient;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void produce() throws Exception {
        this.iTopicClient.send(new Message("Hello @ " + Instant.now().toString()));
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

非常简单,对吧?类的核心内容在consume()produce()方法中。消费者先运行,然后是生产者。如果您以前使用过任何消息传递技术,您可能会发现没有提及任何类型的目标(主题或队列)有点令人费解。所有这些配置都存在于属性(例如application.properties文件中的属性)中,并在自动配置ITopicClientISubscriptionClient时使用。如果您想从多个目标发送消息或使用消息,只需自行定义相关的 bean,并确保不要在应用程序的属性中指定azure.service-bus.connection-string,否则默认的 Spring Boot 自动配置将启动并尝试为您创建这些 bean。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部