创建用于消费数据并生成 Spring Cloud Stream Sink 应用程序的函数

工程 | Soby Chacko | 2020年8月3日 | ...

这是博客系列的第4部分,我们将在其中介绍用于 Spring Cloud Stream 应用程序的 Java 函数。

系列中的其他部分。

第1部分 - 一般介绍

第2部分 - 函数组合

第3部分 - Supplier 函数和源应用程序

在本系列的上一篇博客中,我们了解了如何使用java.util.function.Supplier生成 Spring Cloud Stream 源。在本新版本中,我们将了解如何使用java.util.function.Consumerjava.util.function.Function开发和测试消费函数。稍后,我们将简要说明如何从此消费者生成 Spring Cloud Stream Sink 应用程序。

编写消费者

编写消费者的思路相对简单。我们从某个外部来源消费数据,并将其传递给消费者中的业务逻辑。与我们之前在上一篇博客中看到的Supplier一样,操作发生在业务逻辑实现内部。如果我们使用库来帮助我们完成所有繁重的工作,例如 Spring Integration,那么它就变成了简单地通过适当的 API 将接收到的数据委托给库的问题。但是,如果没有此类库可用,我们需要自己编写所有这些代码。让我们以一个具体的例子来演示这一点。

为 Apache Pulsar 编写消费者

Apache Pulsar 是一种流行的消息中间件系统。让我们假设我们想要编写一个通用的 Java Consumer,它从某个地方接收数据,然后将其转发到 Pulsar。无需深入细节,这里有一个微不足道的Consumer来实现这一点。基本实现代码取自此处

@Bean
public org.apache.pulsar.client.api.Producer producer() {
  String pulsarBrokerRootUrl = "pulsar://127.0.0.1:6650";
  PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
  String topic = "persistent://sample/standalone/ns1/my-topic";
  return client.createProducer(topic);
}

@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
  return payload -> {
     producer.send(payload);
  };
}

再次强调,这是为了说明目的而显示的,可能不是将数据发送到 Apache Pulsar 的完整实现。但是,这演示了我们想要传达的概念。查看消费者,我们可以看到代码很简单;在 lambda 表达式中,我们所做的只是在 Apache Pulsar Producer上调用send方法。

我们可以将上述消费者注入应用程序并以编程方式调用其accept方法,提供数据。正如我们在上一篇博客中看到的,下图表达了在Spring Cloud Data Flow等平台上独立运行函数或作为数据编排管道的一部分运行的思想。

Stream Applications Layered Architecture for Functions

好的,那个消费者非常简单,我们可能会这样想。如果我们想要做一些更复杂的事情会怎么样?下面,我们将正是这样做。

为 RSocket 编写消费函数

RSocket 是一种双向二进制协议,Spring Framework 为其提供了出色的支持。RSocket 提供了一种“fire and forget”模型,允许我们向 RSocket 服务器发送消息而不接收响应。我们希望使用 TCP 为此模型编写一个消费者,其中消费者接收外部数据,然后将其推送到 RSocket 服务器。RSocket 的 Java 实现基于Project Reactor。因此,当我们编写消费者时,我们需要使用响应式类型和模式(类似于上一篇博客中的响应式馈送提供程序)。

使用“fire and forget”策略时,RSocket 返回一个Mono<Void>,我们的消费者需要从函数中返回它。但是,在java.util.function.Consumer的情况下,我们不能返回任何内容。因此,我们必须编写一个签名为Function<String, Mono<Void>> rsocketConsumer()的函数。由于该函数返回一个Mono<Void>,因此在语义上等同于编写一个消费者。函数的用户需要获取对Mono的引用并订阅它。在现成的消费者中使用了类似的模式,我们已经为MongoDBCassandra提供了这些模式。

设置项目时,请包含以下 Maven 依赖项。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

来自 Spring Boot 的此启动程序依赖项将传递地将所有 RSocket 依赖项引入我们的项目。

在编写函数代码之前,让我们编写一个ConfigurationProperties类来定义函数需要的一些核心属性。

@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {

  private String host = "localhost";

  private int port = 7000;

  private String route;
…
}

如我们所见,使用前缀rsocket.consumer,我们定义了三个属性 - hostport用于 RSocket 服务器,route是服务器上的端点。

现在我们有了配置属性,让我们创建一个Configuration类来配置我们的函数 Bean。

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

  @Bean
  public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
                                            RsocketConsumerProperties rsocketConsumerProperties) {
     final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
           rsocketConsumerProperties.getPort());

     return input -> rSocketRequester
                 .flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
                       .data(input)
                       .send());
  }
}

我们将来自 Spring Boot 自动配置的构建器注入到函数中,该构建器可以帮助我们创建RSocketRequester。使用此构建器,我们使用 TCP 连接创建Mono<RSocketRequester>connectTcp API 方法获取 RSocket 主机和端口信息。一旦我们获得对RSocketRequester的句柄,然后我们在函数中提供的 lambda 中使用它。

我们在Mono<RSocketRequester>上调用flatMap,并且对于每条传入的消息,我们指定route和需要发送的数据,然后调用最终将数据推送到 RSocket 服务器的send方法。

这就是编写一个消费数据然后使用“fire and forget”交互模型将其发送到 RSocket 服务器的函数所需的一切。请记住,由于 Spring Framework 在底层为我们提供了各种RSocket支持和抽象,因此此代码看起来非常简单。

让我们编写一个快速测试以验证该函数是否按预期工作。

与我们在上一篇博客中对响应式提供程序所做的一样,将以下依赖项添加到项目中。这有助于我们测试响应式组件。

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

以下是测试和其他必要组件。

@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {

  @Autowired
  Function<Message<?>, Mono<Void>> rsocketConsumer;

  @Autowired
  TestController controller;

  @Test
  void testRsocketConsumer() {

     rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
           .subscribe();

     StepVerifier.create(this.controller.fireForgetPayloads)
           .expectNext("Hello RSocket")
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  @ComponentScan
  static class RSocketConsumerTestApplication{}

  @Controller
  static class TestController {
     final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();

     @MessageMapping("test-route")
     void someMethod(String payload) {
        this.fireForgetPayloads.onNext(payload);
     }
  }
}

测试组件的简要说明。

  • 我们在SpringBootApplication上提供属性spring.rsocket.server.port。这允许 Spring Boot 自动配置一个用于测试的默认 RSocket 服务器。此处将端口硬编码为7000,因为这是 Spring Boot 在自动配置组件时使用的默认端口。这与我们在上面的属性中使用的默认值相同。我们还指定了要在测试中使用的route

  • 提供了一个Controller,其中包含一个用MessageMapping注释的方法,该方法拦截到达我们在测试中指定的路由的消息。服务器在路由上接收到的每条传入记录都传递到Flux中,以便稍后在测试期间断言时重放。

  • 在测试中,我们正在调用我们之前编写的注入的RSocket消费者的apply方法,并为其提供测试消息。

  • 最后,我们使用StepVerifier来验证消息是否已成功发送到RSocket服务器。

从 RSocket 消费者生成 Spring Cloud Stream Sink 应用程序

上一篇博客中,我们详细介绍了如何从 Supplier 函数生成 Spring Cloud Stream 源应用程序。您可以遵循我们在那里用于从上面编写的 RSocket 函数生成 Sink 应用程序的相同模式。我们不会在此处重新详细介绍所有相关细节。使用此处提供的许多不同的 Sink 应用程序作为模板。当我们使用 Spring Cloud Stream 中的测试绑定器测试函数时,将消息发送到InputDestination。Spring Cloud Stream 将将其下游发送到 RSocket 服务器。然后,我们可以使用我们在上述单元测试中使用的相同验证策略。有关使用测试绑定器测试 Spring Cloud Stream 组件的更多信息,请参阅此处

结论

在这篇博文中,我们了解了如何编写一个简单的消费者来消费数据并对其进行操作,并以 Apache Pulsar 为例。然后,我们探讨了如何使用 RSocket 的 fire and forget 策略,以 Function<String, Mono<Void>> 的形式开发一个响应式消费者。我们还演示了如何对这个响应式消费者进行单元测试。请按照本文中列出的步骤编写您自己的数据消费者,如果您这样做,请考虑提交 pull 请求

敬请期待…​

在接下来的几周内,请关注更多深入探讨和重点关注的主题。在本系列的下一篇博文中,我们将开始一系列案例研究,其中我们将探索已经存在的函数和应用程序。

获取 Spring 新闻通讯

关注 Spring 新闻通讯

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部