创建用于消费数据和生成 Spring Cloud Stream Sink 应用的函数

工程 | Soby Chacko | 2020 年 8 月 3 日 | ...

这是关于 Spring Cloud Stream 应用的 Java 函数系列博客的第 4 部分。

系列中的其他部分。

第 1 部分 - 一般介绍

第 2 部分 - 函数组合

第 3 部分 - Supplier 函数和 Source 应用

在本系列的上一篇博客中,我们探讨了如何使用 java.util.function.Supplier 来生成 Spring Cloud Stream source。在本新版中,我们将看到如何使用 java.util.function.Consumerjava.util.function.Function 开发和测试消费函数。稍后,我们将简要解释如何从该 consumer 生成 Spring Cloud Stream sink 应用。

编写 Consumer

编写 consumer 的思路相对简单。我们从外部源消费数据,并将其传递给 consumer 中的业务逻辑。正如我们在之前的博客中看到的 Supplier 一样,操作发生在业务逻辑实现内部。如果我们使用库来帮助我们完成所有繁重的工作,例如 Spring Integration,那么就只需通过适当的 API 将接收到的数据委托给库即可。然而,如果没有可用的此类库,我们需要自己编写所有代码。让我们举一个具体的例子来演示这一点。

为 Apache Pulsar 编写 Consumer

Apache Pulsar 是一个流行的消息中间件系统。让我们假设一下,我们要编写一个通用的 Java Consumer,它从某个地方接收数据,然后将其转发到 Pulsar。不去深究细节,下面是一个实现此目的的简单 Consumer。基本实现代码取自这里

@Bean
public org.apache.pulsar.client.api.Producer producer() {
  String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
  String topic = "persistent://sample/standalone/ns1/my-topic";
  return client.createProducer(topic);
}

@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
  return payload -> {
     producer.send(payload);
  };
}

再次强调,这仅用于说明目的,可能不是将数据发送到 Apache Pulsar 的完整实现。尽管如此,这演示了我们想要传达的概念。查看 consumer,我们可以看到代码很简单;我们在 lambda 表达式中所做的只是调用 Apache Pulsar Producer 上的 send 方法。

我们可以将上述 consumer 注入到应用程序中,并通过编程方式调用其 accept 方法,提供数据。正如我们在上一篇博客中看到的,下图展示了独立运行函数或作为数据编排管道一部分运行在诸如 Spring Cloud Data Flow 等平台上的想法。

Stream Applications Layered Architecture for Functions

好的,我们可能会认为那个 consumer 相当简单。那么如果我们要做的事情有点复杂呢?下面我们将这样做。

编写 RSocket 消费函数

RSocket 是一种双向二进制协议,Spring Framework 为其提供了出色的支持。RSocket 提供了一种即发即忘(fire and forget)模型,允许我们向 RSocket 服务器发送消息而不接收响应。我们希望使用 TCP 为此模型编写一个 consumer,该 consumer 接收外部数据,然后将其推送到 RSocket 服务器。RSocket 的Java 实现基于Project Reactor。因此,当我们编写 consumer 时,需要使用响应式类型和模式(类似于上一篇博客中的响应式 feed supplier)。

当使用即发即忘策略时,RSocket 返回 Mono<Void>,我们的 consumer 需要从函数中返回它。然而,对于 java.util.function.Consumer,我们无法返回任何东西。因此,我们必须编写一个具有签名 Function<String, Mono<Void>> rsocketConsumer() 的函数。由于该函数返回一个 Mono<Void>,这在语义上等同于编写一个 consumer。函数的使用者需要获取 Mono 的引用并订阅它。开箱即用的 consumer 中也使用了类似模式,我们已经为MongoDBCassandra 提供了这样的 consumer。

设置项目时,请包含以下 maven 依赖。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

来自 Spring Boot 的此 starter 依赖项会将所有 RSocket 依赖项传递地带到我们的项目中。

在编写函数代码之前,让我们编写一个 ConfigurationProperties 类来定义函数所需的一些核心属性。

@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {

  private String host = "localhost";

  private int port = 7000;

  private String route;
…
}

正如我们所见,使用前缀 rsocket.consumer,我们定义了三个属性 - hostport 用于 RSocket 服务器,而 route 是服务器上的一个端点。

现在我们有了配置属性,让我们创建一个 Configuration 类来配置我们的函数 bean。

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

  @Bean
  public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
                                            RsocketConsumerProperties rsocketConsumerProperties) {
     final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
           rsocketConsumerProperties.getPort());

     return input -> rSocketRequester
                 .flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
                       .data(input)
                       .send());
  }
}

我们将一个来自 Spring Boot 自动配置的 builder 注入到函数中,该 builder 帮助我们创建 RSocketRequester。使用此 builder,我们通过 TCP 连接创建一个 Mono<RSocketRequester>connectTcp API 方法接受 RSocket 的主机和端口信息。一旦我们获得了 RSocketRequester 的句柄,我们就在函数中提供的 lambda 中使用它。

我们在 Mono<RSocketRequester> 上调用 flatMap,对于每个传入消息,我们在调用最终将数据推送到 RSocket 服务器的 send 方法之前指定需要发送的 route 和数据。

这就是编写一个消费数据然后使用即发即忘交互模型将其发送到 RSocket 服务器的函数的全部过程。请记住,由于 Spring Framework 在底层提供了各种 RSocket 支持和抽象,这段代码看起来非常简单。

让我们快速编写一个测试来验证该函数是否按预期工作。

正如我们在上一篇博客中对响应式 supplier 所做的那样,将以下依赖项添加到项目中。这有助于我们测试响应式组件。

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

以下是带有其他必要组件的测试。

@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {

  @Autowired
  Function<Message<?>, Mono<Void>> rsocketConsumer;

  @Autowired
  TestController controller;

  @Test
  void testRsocketConsumer() {

     rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
           .subscribe();

     StepVerifier.create(this.controller.fireForgetPayloads)
           .expectNext("Hello RSocket")
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  @ComponentScan
  static class RSocketConsumerTestApplication{}

  @Controller
  static class TestController {
     final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();

     @MessageMapping("test-route")
     void someMethod(String payload) {
        this.fireForgetPayloads.onNext(payload);
     }
  }
}

测试组件的快速说明。

  • 我们在 SpringBootApplication 上提供了属性 spring.rsocket.server.port。这允许 Spring Boot 为测试自动配置一个默认的 RSocket 服务器。这里将端口硬编码为 7000,因为这是 Spring Boot 在自动配置组件时使用的默认端口。这与我们在上面属性中使用的默认值相同。我们还指定了我们想在测试中使用的 route

  • 提供了一个带有 MessageMapping 注解的方法的 Controller,它拦截到达我们在测试中指定的路由的消息。到达服务器上该路由的每个传入记录都传递到一个 Flux 中,稍后可以在测试中断言期间重放。

  • 在测试中,我们调用了注入的 RSocket consumer(我们之前编写的)上的 apply 方法,并为其提供了一个测试消息。

  • 最后,我们使用 StepVerifier 来验证消息已成功发送到 RSocket 服务器。

从 RSocket Consumer 生成 Spring Cloud Stream Sink 应用

上一篇博客中,我们详细介绍了如何从 Supplier 函数生成 Spring Cloud Stream source 应用。您可以遵循我们那里使用的相同模式,从上面编写的 RSocket 函数生成一个 sink 应用。我们在这里不再重复所有涉及的细节。可以使用这里提供的许多不同的 sink 应用作为模板。当我们使用 Spring Cloud Stream 中的测试 binder 测试函数时,将消息发送到 InputDestination。Spring Cloud Stream 会将其下游发送到 RSocket 服务器。然后我们可以使用我们在上面的单元测试中使用的相同验证策略。有关使用测试 binder 测试 Spring Cloud Stream 组件的更多信息,请参阅此处

结论

在本篇博客文章中,我们看到了如何编写一个简单的 consumer 来消费数据并对其进行处理,以 Apache Pulsar 为例。然后,我们探讨了如何以 Function<String, Mono<Void>> 的形式开发响应式 consumer,并以 RSocket 的即发即忘策略作为指导。我们还演示了如何对这个响应式 consumer 进行单元测试。请遵循本文中概述的步骤来编写您自己的数据 consumer,如果您这样做,请考虑贡献一个 pull request。

敬请期待…​

在接下来的几周里,我们将带来更多深入的焦点话题。在本系列的下一篇博客中,我们将开始一系列案例研究,探索已经存在的函数和应用程序。

获取 Spring 时事通讯

通过 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,为您的进步加速。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部