Bootiful GCP:与 Google Cloud Pub/Sub 集成 (4/8)

工程 | Josh Long | 2018年8月30日 | ...

各位 Spring 爱好者们大家好!在这个由 8 部分组成的简短系列中,我们将探讨 Google Cloud Platform 的 Spring Cloud 集成,称为 Spring Cloud GCP。Spring Cloud GCP 代表了 Google 和 Pivotal 之间的共同努力,旨在为使用 Google Cloud Platform 的 Spring Cloud 开发者提供一流的体验。Pivotal Cloud Foundry 用户将享受到与 GCP 服务代理更简单的集成。我写这些文章时,参考了 Google Cloud Developer Advocate 和我的朋友 Ray Tsang 的意见。你还可以在我们在 Google Next 2018 的会议 Bootiful Google Cloud Platform 中看到 Spring Cloud GCP 的讲解。谢谢我的朋友!一如既往,如果你有反馈,我很乐意听取

本系列共有八篇文章。完整列表如下:

](https://springframework.org.cn/blog/2018/09/06/bootiful-gcp-supporting-observability-with-spring-cloud-gcp-stackdriver-trace-6-8)

  • [Bootiful GCP:使用 Spring Cloud GCP 连接其他 GCP 服务 (7/8)

](https://springframework.org.cn/blog/2018/09/10/bootiful-gcp-use-spring-cloud-gcp-to-connect-to-other-gcp-services-7-8)

让我们看看使用 Google Cloud Pub/Sub 进行应用集成。Google Cloud Pub/Sub 支持许多在 Google 规模下的经典企业应用集成用例。以下是 Google Cloud 网站关于 Pub/Sub 列出的一些用例:

  • 平衡网络集群中的工作负载。例如,可以将大型任务队列有效地分配给多个 worker,如 Google Compute Engine 实例。

  • 实现异步工作流。例如,订单处理应用可以将订单放置到主题上,然后由一个或多个 worker 进行处理。

  • 分发事件通知。例如,接受用户注册的服务可以在新用户注册时发送通知,下游服务可以订阅以接收事件通知。

  • 刷新分布式缓存。例如,应用可以发布失效事件,以更新已更改对象的 ID。

  • 将日志记录到多个系统。例如,Google Compute Engine 实例可以将日志写入监控系统、写入数据库以供以后查询等等。

  • 从各种进程或设备进行数据流传输。例如,住宅传感器可以将数据流传输到云中托管的后端服务器。

  • 提高可靠性。例如,单区域 Compute Engine 服务可以通过订阅共同主题而在其他区域运行,以从区域或地区故障中恢复。

使用 Google Cloud Pub/Sub 的流程完全符合您的预期:消息被发送到 Pub/Sub 代理(由 GCP 在云中托管)中的一个主题,然后代理为您持久化该消息。订阅者可以接收推送的消息(通过 webhook),也可以从代理轮询消息。订阅者从代理接收消息并确认每条消息。当订阅者确认一条消息时,该消息将从订阅者的订阅队列中移除。任何能够使用 HTTPS 的客户端都可以使用此服务。无需其他 API。

如果您使用过任何其他消息系统(JMS、AMQP、Apache Kafka、Kestrel),领域模型就相当简单了:主题是消息发布到的地方。订阅表示来自特定主题、将要发送给特定客户端应用的消息流。一个主题可以有多个订阅。一个订阅可以有许多订阅者。如果您想将不同的消息分发给不同的订阅者,那么所有订阅者必须订阅同一个订阅。如果您想将相同的消息发布给所有订阅者,那么每个订阅者都需要订阅自己的订阅。

Pub/Sub 的交付语义是至少一次(at-least once)。因此,如果您不能多次处理同一条消息,则必须处理幂等性(idempotency)和/或对消息进行去重(de-duplicate)。

消息存储了数据和(可选)属性的组合,这些数据和属性由 Google Cloud Pub/Sub 从发布者传输到订阅者。消息属性,您可能更理解为头部(header),是消息中的键值对。您可能有一个描述负载语言的头部。您可能有一个描述内容类型的头部。

让我们将 Google Cloud Pub/Sub 添加到应用中,并将它们连接起来。

和以前一样,我们需要启用 Google Cloud Pub/Sub API 才能使用。

gcloud services enable pubsub.googleapis.com

然后,您需要创建一个新的主题,名为 reservations

gcloud pubsub topics create reservations

该主题代表我们将发送消息的地方。我们仍然需要创建一个订阅来消费来自该主题的消息。以下命令创建一个名为 reservations-subscription 的订阅,用于连接到 reservations 主题。

gcloud pubsub subscriptions create reservations-subscription --topic=reservations

这些部分就位后,我们就可以从应用中使用了。将 Spring Cloud GCP Pub/Sub Starter(org.springframework.cloud : spring-cloud-gcp-starter-pubsub)添加到构建中。这将引入 Google Cloud PubSubTemplate 的自动配置。如果您使用过 JmsTemplateKafkaTemplate,那么 PubSubTemplate 会让您感到熟悉。它是用于使用 Google Cloud Pub/Sub 生产和消费消息的易用客户端。如果您刚刚开始接触 GCP Pub/Sub 和消息传递,那么 Spring 领域中的 *Template 对象是一个不错的起点。

让我们看一个简单的示例,该示例在您向 Spring Boot 应用中运行的 HTTP 端点发出 HTTP POST 调用时发布消息。然后,我们将设置一个订阅者来消费发送的消息。

package com.example.gcp.pubsub.template;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
class PublisherConfig {

        private final PubSubTemplate template;
        private final String topic;

        PublisherConfig(PubSubTemplate template, @Value("${reservations.topic:reservations}") String t) {
                this.template = template;
                this.topic = t;
        }

        
        @PostMapping("/publish/{name}")
        void publish(@PathVariable String name) {
                this.template.publish(this.topic, "Hello " + name + "!");
        }
}
  • 我们使用注入的 PubSubTemplate 向配置的主题发送消息——一个字符串。

现在,让我们看一个简单的应用,它可能也运行在另一个节点上,该应用消费来自链接到该主题的订阅的消息。

package com.example.gcp.pubsub.template;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

@Slf4j
@Configuration
class SubscriberConfig {

        private final PubSubTemplate template;
        private final String subscription;

        SubscriberConfig(PubSubTemplate template, @Value("${reservations.subscription:reservations-subscription}") String s) {
                this.template = template;
                this.subscription = s;
        }

        @EventListener(ApplicationReadyEvent.class)
        public void start() {
                
                this.template.subscribe(this.subscription, (pubsubMessage, ackReply ) -> {
                        log.info("consumed new message: [" + pubsubMessage.getData().toStringUtf8() + "]");
                        ackReply.ack();
                });
        }
}
  • 应用启动并运行后,我们显式订阅,将我们的客户端连接到正确的端点。

这个示例(很好地)使用了 PubSubTemplate。它简单、简短、易懂。然而,随着集成的复杂性增加,将消息流中涉及的组件从一个系统解耦到另一个系统变得很有用。我们引入阶段——组件链中的链接——消息必须经过这些阶段才能到达下游组件。这种分阶段允许我们编写可以替换的处理代码,而无需关心给定消息的来源或目的地。这促进了测试,因为组件只需根据其直接的前置条件和后置条件来编写:一个组件可以说它只接受 Spring Framework 的 Message<File> 类型,别无其他。这种接口间接性非常有用,特别是当我们开始连接实际世界中可能以不同频率处理工作的系统时。在工作到达下游组件之前引入一个代理来缓冲工作变得轻而易举,否则下游组件可能会成为瓶颈。这种方法——隔离消息流中涉及的组件并引入缓冲区以保护下游组件——称为分阶段事件驱动架构(SEDA),在世界转向微服务和高度分布式系统的今天,它比以往任何时候都更有价值。

Spring Integration 是一个旨在促进这种间接性的框架。其核心概念是 MessageChannel,您可以将其视为一个内存中的 Queue;一个消息流过的管道。在 MessageChannel 的每一侧都坐着组件。您可以想象一个组件输出特定类型的消息并将其发送到这个 MessageChannel 中,而不知道它将流向何处。另一端是另一个消费特定类型消息的组件,完全不知道任何给定消息的来源。今天可能有一个服务参与消息的生产。明天可能有十个!上游和下游组件无需更改。这种间接性给了我们很多可能性。我们可以更改给定消息的路由,使其流经不同的服务,分割、聚合等等。我们可以转换其他数据源并将其适配到上游消息流中(这称为入站适配器)。我们可以为数据引入新的接收器,将 Spring Framework 的 Message<T> 适配成正确的类型(这称为出站适配器)。

让我们看看 Spring Integration 以及 Google Cloud Pub/Sub 的入站和出站适配器。我们将采用与之前相同的方法:HTTP 端点将发布消息,然后这些消息被发送到 Google Cloud Pub/Sub。代码可以在不同的节点中运行。为了使这个示例工作,您还需要在类路径中包含 Spring Integration 类型。将 org.springframework.boot : spring-boot-starter-integration 添加到构建中。

让我们看一个只要发出 HTTP POST 请求就发布消息的发布者。在本例中,发布者将请求发送到一个 MessageChannel 中,然后由该通道将其传递给一个 PubSubMessageHandler。今天它直接发送到 Pub/Sub,但明天它可以发送到数据库、FTP 服务器、XMPP、Salesforce 或任何其他地方,然后再发送到 Pub/Sub。

package com.example.gcp.pubsub.integration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@Configuration
class PublisherConfig {

        private final String topic;
        private final PubSubTemplate template;

        public PublisherConfig(
            @Value("${reservations.topic:reservations}") String t,
            PubSubTemplate template) {
                this.topic = t;
                this.template = template;
        }

        @Bean
        IntegrationFlow publisherFlow() {
                return IntegrationFlows
                    .from(this.outgoing()) 
                    .handle(this.pubSubMessageHandler()) 
                    .get();
        }

        @PostMapping("/publish/{name}")
        void publish(@PathVariable String name) {
                
                outgoing().send(MessageBuilder.withPayload(name).build());
        }

        @Bean
        SubscribableChannel outgoing() {
                return MessageChannels.direct().get();
        }

        @Bean
        PubSubMessageHandler pubSubMessageHandler() {
                return new PubSubMessageHandler(template, this.topic);
        }
}
  • IntegrationFlow 描述了集成的。发送到 outgoing MessageChannel 中的消息会被传递给 PubSubMessageHandler,然后由它使用指定的主题写入 Google Cloud Pub/Sub。

  • 在 Spring MVC HTTP 端点中,我们获取对 MessageChannel 的引用,并在其中发布一条消息(我们使用 MessageBuilder 构建该消息)。注意:像我在此示例中那样调用 outgoing() 是可以的,因为 Spring 会缓存方法调用的结果;我总是会获得同一个预先实例化的 MessageChannel bean 的单例。

在消费者端,我们做同样的事情,但顺序相反,适配传入的消息,然后在 IntegrationFlow 中记录它们。

package com.example.gcp.pubsub.integration;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;

@Slf4j
@Configuration
class SubscriberConfig {

        private final String subscription;
        private final PubSubTemplate template;

        SubscriberConfig(
            @Value("${reservations.subscription:reservations-subscription}") String s,
            PubSubTemplate t) {
                this.subscription = s;
                this.template = t;
        }

        @Bean 
        public PubSubInboundChannelAdapter messageChannelAdapter() {
                PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(
                    template, this.subscription);
                adapter.setOutputChannel(this.incoming());
                adapter.setAckMode(AckMode.MANUAL);
                return adapter;
        }

        @Bean
        MessageChannel incoming() {
                return MessageChannels.publishSubscribe().get();
        }

        @Bean
        IntegrationFlow subscriberFlow() {
                return IntegrationFlows
                    .from(this.incoming()) 
                    .handle(message -> { 
                            log.info("consumed new message: [" + message.getPayload() + "]");
                            AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ACKNOWLEDGEMENT, AckReplyConsumer.class);
                            consumer.ack();
                    })
                    .get();
        }
}
  • PubSubInboundChannelAdapter 适配来自订阅的消息,并将它们发送到 incoming MessageChannel

  • IntegrationFlow 接收传入的消息,并将它们路由到一个 MessageHandler(我们使用 lambda 语法贡献了这个处理程序),该处理程序 a) 记录传入的消息,并且 b) 手动确认收到消息。

IntegrationFlow 在两个示例中的好处是您可以将调用链接在一起。在这里,我们只指定消息来自哪里(.from())以及由谁处理它(.handle()),但我们也可以在调用 .handle() 之后轻松地路由、分割、转换等等消息。一个组件(适配器、消息处理程序、转换器等)输出的消息成为任何下游组件的输入。

订阅 Spring 时事通讯

订阅 Spring 时事通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,为您的进步注入强大动力。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部