创建用于消费数据和生成 Spring Cloud Stream Sink 应用程序的函数

工程 | Soby Chacko | 2020年8月3日 | ...

这是本系列博客的第4部分,我们将介绍用于 Spring Cloud Stream 应用程序的 Java 函数。

本系列的其他部分。

第一部分 - 概览

第二部分 - 函数组合

第3部分 - 供应商函数和源应用程序

在本系列的上一篇博客中,我们看到了如何使用 java.util.function.Supplier 来生成 Spring Cloud Stream 源。在这个新版本中,我们将看到如何使用 java.util.function.Consumerjava.util.function.Function 开发和测试消费函数。稍后,我们将简要解释如何从该消费者生成 Spring Cloud Stream 接收器应用程序。

编写消费者

编写消费者的思想相对简单。我们从某个外部源消费数据,并将其交给消费者中的业务逻辑。正如我们在上一篇博客中看到的 Supplier 一样,动作发生在业务逻辑实现内部。如果我们使用诸如 Spring Integration 等库来帮助我们完成所有繁重的工作,那么就变成了简单地通过适当的 API 将接收到的数据委托给库。但是,如果没有可用的此类库,我们需要自己编写所有这些代码。让我们举一个具体的例子来演示这一点。

为 Apache Pulsar 编写消费者

Apache Pulsar 是一个流行的消息中间件系统。让我们假设我们想编写一个通用的 Java Consumer,它从某个地方接收数据,然后将其转发到 Pulsar。不必深入细节,这里有一个简单的 Consumer 可以实现这一点。基本实现代码取自此处

@Bean
public org.apache.pulsar.client.api.Producer producer() {
  String pulsarBrokerRootUrl = "pulsar://:6650";
  PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
  String topic = "persistent://sample/standalone/ns1/my-topic";
  return client.createProducer(topic);
}

@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
  return payload -> {
     producer.send(payload);
  };
}

再次声明,这仅用于说明目的,可能不是将数据发送到 Apache Pulsar 的完整实现。尽管如此,这演示了我们想要传达的概念。查看消费者,我们可以看到代码很简单;我们在 lambda 表达式中所做的只是调用 Apache Pulsar Producer 上的 send 方法。

我们可以将上述消费者注入到应用程序中,并通过编程方式调用其 accept 方法,提供数据。正如我们在上一篇博客中看到的,下图表达了独立运行函数或作为 Spring Cloud Data Flow 等平台上数据编排管道一部分的想法。

Stream Applications Layered Architecture for Functions

好吧,那个消费者相当简单,我们可能会这样想。如果我们想做一些更复杂的事情呢?下面,我们将准确地做这件事。

为 RSocket 编写消费函数

RSocket 是一种双向二进制协议,Spring Framework 为其提供了出色的支持。RSocket 提供了一种“即发即忘”模型,允许我们向 RSocket 服务器发送消息而无需接收响应。我们希望使用 TCP 为此模型编写一个消费者,其中消费者接收外部数据然后将其推送到 RSocket 服务器。RSocket 的 Java 实现基于Project Reactor。因此,当我们编写消费者时,我们需要使用响应式类型和模式(类似于上一篇博客中的响应式数据源)。

当使用“即发即忘”策略时,RSocket 返回一个 Mono<Void>,我们的消费者需要从函数中返回它。然而,在 java.util.function.Consumer 的情况下,我们不能返回任何东西。因此,我们必须编写一个签名为 Function<String, Mono<Void>> rsocketConsumer() 的函数。由于该函数返回一个 Mono<Void>,这在语义上等同于编写一个消费者。函数的使用者需要获取 Mono 的引用并订阅它。我们为 MongoDBCassandra 提供的开箱即用消费者中也使用了类似的模式。

设置项目时,请包含以下 Maven 依赖项。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

这个来自 Spring Boot 的 starter 依赖将传递地将所有 RSocket 依赖项带入我们的项目。

在编写函数代码之前,让我们编写一个 ConfigurationProperties 类来定义函数所需的一些核心属性。

@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {

  private String host = "localhost";

  private int port = 7000;

  private String route;
…
}

正如我们所看到的,使用前缀 rsocket.consumer,我们定义了三个属性——hostport 用于 RSocket 服务器,route 是服务器上的一个端点。

现在我们有了配置属性,让我们创建一个 Configuration 类来配置我们的函数 Bean。

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

  @Bean
  public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
                                            RsocketConsumerProperties rsocketConsumerProperties) {
     final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
           rsocketConsumerProperties.getPort());

     return input -> rSocketRequester
                 .flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
                       .data(input)
                       .send());
  }
}

我们将来自 Spring Boot 自动配置的构建器注入到函数中,该构建器帮助我们创建 RSocketRequester。使用此构建器,我们创建了一个具有 TCP 连接的 Mono<RSocketRequester>connectTcp API 方法接收 RSocket 主机和端口信息。一旦我们获得了 RSocketRequester 的句柄,我们就会在函数中提供的 lambda 表达式中使用它。

我们对 Mono<RSocketRequester> 调用 flatMap,对于每个传入消息,我们指定 route 和需要发送的数据,然后调用 send 方法,最终将数据推送到 RSocket 服务器。

这就是编写一个消费数据然后使用“即发即忘”交互模型将其发送到 RSocket 服务器的函数所需的全部内容。请记住,由于 Spring Framework 在底层提供了各种 RSocket 支持和抽象,此代码看起来非常简单。

让我们快速编写一个测试,以验证函数是否按预期工作。

正如我们在上一篇博客中对响应式数据源所做的那样,将以下依赖项添加到项目中。这有助于我们测试响应式组件。

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

以下是带有其他必要组件的测试。

@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {

  @Autowired
  Function<Message<?>, Mono<Void>> rsocketConsumer;

  @Autowired
  TestController controller;

  @Test
  void testRsocketConsumer() {

     rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
           .subscribe();

     StepVerifier.create(this.controller.fireForgetPayloads)
           .expectNext("Hello RSocket")
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  @ComponentScan
  static class RSocketConsumerTestApplication{}

  @Controller
  static class TestController {
     final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();

     @MessageMapping("test-route")
     void someMethod(String payload) {
        this.fireForgetPayloads.onNext(payload);
     }
  }
}

对测试组件的快速解释。

  • 我们在 SpringBootApplication 上提供了属性 spring.rsocket.server.port。这允许 Spring Boot 为测试自动配置一个默认的 RSocket 服务器。这里将端口硬编码为 7000,因为这是 Spring Boot 在自动配置组件时使用的默认端口。这与我们上面属性中使用的默认值相同。我们还指定了我们想要在测试中使用的 route

  • 提供了一个带有 MessageMapping 注解方法的 Controller,它拦截到达我们在测试中指定路由的消息。服务器上路由上的每个传入记录都会传递到一个 Flux 中,以便稍后在测试断言期间进行重播。

  • 在测试中,我们调用了注入的 RSocket 消费者(我们之前编写的)上的 apply 方法,并为其提供了一个测试消息。

  • 最后,我们使用 StepVerifier 来验证消息是否已成功发送到 RSocket 服务器。

从 RSocket 消费者生成 Spring Cloud Stream Sink 应用程序

上一篇博客中,我们详细介绍了如何从 Supplier 函数生成 Spring Cloud Stream 源应用程序。您可以遵循我们那里使用的相同模式,从我们上面编写的 RSocket 函数生成一个 sink 应用程序。我们在这里不再重复所有细节。使用此处提供的许多不同的 sink 应用程序作为模板。当我们在 Spring Cloud Stream 中使用测试绑定器测试函数时,将消息发送到 InputDestination。Spring Cloud Stream 会将其发送到下游的 RSocket 服务器。然后我们可以使用与上面单元测试中相同的验证策略。有关使用测试绑定器测试 Spring Cloud Stream 组件的更多信息,请参阅此处

结论

在这篇博客文章中,我们看到了如何编写一个简单的消费者,该消费者消费数据并对其进行操作,以 Apache Pulsar 为例。然后我们探讨了如何以 Function<String, Mono<Void>> 的形式开发一个响应式消费者,并以 RSocket 的“即发即忘”策略作为指导。我们还演示了如何对这个响应式消费者进行单元测试。请遵循本文中列出的步骤来编写您自己的数据消费者,如果您这样做,请考虑贡献一个拉取请求。

敬请期待…​

在接下来的几周里,请期待更多深入的专题。本系列的下一篇博客将开始一系列案例研究,我们将探讨已有的函数和应用程序。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有