Bootiful Azure: 与 Azure Service Bus 集成 (4/6)

工程 | Josh Long | 2019年1月14日 | ...

这是6部分系列文章的第4部分,新文章将于周一和周四发布,旨在为 Spring 开发者介绍 Microsoft Azure。如果没有 Microsoft 的 Asir Vedamuthu Selvasingh、Yitao Dong、Bruno Borges、Brian Benz 和 Theresa Nguyen 的贡献,我将无法完成这篇文章。您可以在 Github 上找到本系列文章的代码。在阅读本系列文章时,如果您有任何反馈或疑问,请通过 Twitter (@starbuxman) 与我联系。您也可以在我 Spring Tips (@SpringTipsLive) 的系列文章 Bootiful Azure 中了解更多关于 Microsoft Azure 的信息。

以下是所有安装内容

Azure Service Bus 是一种云消息传递即服务和集成技术。与 CosmosDB 一样,它具有最大的灵活性。它 支持 AMQP 1.0 协议,类似于 RabbitMQ。AMQP 是一种灵活的线路协议。协议本身包含了管理代理的指令,而不仅仅是与之交互。AMQP 代理非常适合集成,因为它们与语言和平台无关。在 AMQP 代理中,生产者将消息发送到 交换器,交换器然后将消息路由到 队列,消费者随后从队列中读取消息。交换器负责决定消息应发送到哪个队列。它通过多种方式来实现这一点,但通常涉及查看消息头中称为 路由键 的键。

交换器和队列之间的这种间接性使得 AMQP 比基于 JMS 的代理更灵活,在 JMS 代理中,生产者直接将消息发送到消费者从中读取的 Destination 对象。这意味着生产者和消费者通过它们选择的 Destination 而耦合。此外,JMS 是 JVM 的 API,而不是线路协议。因此,生产者和消费者依赖于他们使用的库的版本是否正确。话虽如此,您也可以通过 JMS API 使用 Azure Service Bus

正如我所说,Azure Service Bus 极其灵活!

AMQP 模型很有启发性,因为 Azure Service Bus 的原生模型基本上就像 AMQP。在 Azure Service Bus 中,您拥有主题或队列,您向其发送消息。然后,消息连接到订阅,消费者从中读取。让我们构建一个发送和消耗消息的简单示例。我们将不使用 AMQP 或 JMS,只使用普通的 Microsoft Azure ServiceBus API。

在 Microsoft Azure 上配置 Azure Service Bus

您需要提供一个 servicebus 命名空间、一个主题(我们向其发送消息,并且可以有多个消费者收听)和一个订阅(用于连接到主题的主题或队列的消费者)。这是一个执行此操作的示例脚本。

#!/usr/bin/env bash

destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1

az servicebus namespace create --resource-group $rg \
    --name ${namespace}

az servicebus topic create --resource-group $rg \
    --namespace-name ${namespace} \
    --name ${topic}

az servicebus topic subscription create --resource-group $rg  \
    --namespace-name ${namespace} --topic-name ${topic} \
    --name ${subscription}

您需要一个连接字符串才能将您的 Spring 应用程序连接到 servicebus。运行此命令并记下 primaryConnectionString 属性值以供将来使用。

az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey

将 Azure Service Bus 引入您的 Spring 应用程序

将以下依赖项添加到您的构建中:com.microsoft.azure : azure-servicebus-spring-boot-starter

我们将编写两个组件:一个是生产者,另一个是消费者。在 实际 应用中,这些东西自然会存在于单独的应用程序和单独的进程中。毕竟,消息传递服务于集成不同的应用程序。我们先看消费者。消费者需要在其他东西生成消息 之前 注册订阅者,所以我们将使这些 bean 有序 - Spring 容器将根据我们给它的 Ordered 值来确定它们的初始化顺序。

package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Log4j2
@Component
class ServiceBusConsumer implements Ordered {

    private final ISubscriptionClient iSubscriptionClient;

    ServiceBusConsumer(ISubscriptionClient isc) {
        this.iSubscriptionClient = isc;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void consume() throws Exception {

        this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {

            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public void notifyException(Throwable exception, ExceptionPhase phase) {
                log.error("eeks!", exception);
            }
        });

    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

当消息到达时,我们记录其 bodymessageId

现在,让我们看看生产者。

package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.time.Instant;

@Log4j2
@Component
class ServiceBusProducer implements Ordered {

    private final ITopicClient iTopicClient;

    ServiceBusProducer(ITopicClient iTopicClient) {
        this.iTopicClient = iTopicClient;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void produce() throws Exception {
        this.iTopicClient.send(new Message("Hello @ " + Instant.now().toString()));
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

很简单,对吧?类的核心部分在 consume()produce() 方法中。消费者先运行,然后是生产者。如果您曾经使用过消息传递技术,您可能会觉得没有提到任何类型的 目标 - 主题或队列 - 有点令人困惑。所有这些配置都存在于属性中(例如,您 application.properties 文件中的那些),并在自动配置 ITopicClientISubscriptionClient 时使用。如果您想发送消息或从多个目标消耗消息,只需自行定义相关 bean,并确保 在应用程序的属性中指定 azure.service-bus.connection-string,否则默认的 Spring Boot 自动配置将生效,并尝试为您创建这些 bean。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有