领先一步
VMware 提供培训和认证,为您的进步加速。
了解更多这是关于 Spring Cloud Stream 应用的 Java 函数系列博客的第 4 部分。
系列中的其他部分。
第 3 部分 - Supplier 函数和 Source 应用
在本系列的上一篇博客中,我们探讨了如何使用 java.util.function.Supplier
来生成 Spring Cloud Stream source。在本新版中,我们将看到如何使用 java.util.function.Consumer
和 java.util.function.Function
开发和测试消费函数。稍后,我们将简要解释如何从该 consumer 生成 Spring Cloud Stream sink 应用。
编写 consumer 的思路相对简单。我们从外部源消费数据,并将其传递给 consumer 中的业务逻辑。正如我们在之前的博客中看到的 Supplier
一样,操作发生在业务逻辑实现内部。如果我们使用库来帮助我们完成所有繁重的工作,例如 Spring Integration,那么就只需通过适当的 API 将接收到的数据委托给库即可。然而,如果没有可用的此类库,我们需要自己编写所有代码。让我们举一个具体的例子来演示这一点。
Apache Pulsar 是一个流行的消息中间件系统。让我们假设一下,我们要编写一个通用的 Java Consumer
,它从某个地方接收数据,然后将其转发到 Pulsar。不去深究细节,下面是一个实现此目的的简单 Consumer
。基本实现代码取自这里。
@Bean
public org.apache.pulsar.client.api.Producer producer() {
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
String topic = "persistent://sample/standalone/ns1/my-topic";
return client.createProducer(topic);
}
@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
return payload -> {
producer.send(payload);
};
}
再次强调,这仅用于说明目的,可能不是将数据发送到 Apache Pulsar 的完整实现。尽管如此,这演示了我们想要传达的概念。查看 consumer,我们可以看到代码很简单;我们在 lambda 表达式中所做的只是调用 Apache Pulsar Producer
上的 send
方法。
我们可以将上述 consumer 注入到应用程序中,并通过编程方式调用其 accept
方法,提供数据。正如我们在上一篇博客中看到的,下图展示了独立运行函数或作为数据编排管道一部分运行在诸如 Spring Cloud Data Flow 等平台上的想法。
好的,我们可能会认为那个 consumer 相当简单。那么如果我们要做的事情有点复杂呢?下面我们将这样做。
RSocket 是一种双向二进制协议,Spring Framework 为其提供了出色的支持。RSocket 提供了一种即发即忘(fire and forget)模型,允许我们向 RSocket 服务器发送消息而不接收响应。我们希望使用 TCP 为此模型编写一个 consumer,该 consumer 接收外部数据,然后将其推送到 RSocket 服务器。RSocket 的Java 实现基于Project Reactor。因此,当我们编写 consumer 时,需要使用响应式类型和模式(类似于上一篇博客中的响应式 feed supplier)。
当使用即发即忘策略时,RSocket 返回 Mono<Void>
,我们的 consumer 需要从函数中返回它。然而,对于 java.util.function.Consumer
,我们无法返回任何东西。因此,我们必须编写一个具有签名 Function<String, Mono<Void>> rsocketConsumer()
的函数。由于该函数返回一个 Mono<Void>
,这在语义上等同于编写一个 consumer。函数的使用者需要获取 Mono
的引用并订阅它。开箱即用的 consumer 中也使用了类似模式,我们已经为MongoDB 和Cassandra 提供了这样的 consumer。
设置项目时,请包含以下 maven 依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
来自 Spring Boot 的此 starter 依赖项会将所有 RSocket 依赖项传递地带到我们的项目中。
在编写函数代码之前,让我们编写一个 ConfigurationProperties
类来定义函数所需的一些核心属性。
@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {
private String host = "localhost";
private int port = 7000;
private String route;
…
}
正如我们所见,使用前缀 rsocket.consumer
,我们定义了三个属性 - host
和 port
用于 RSocket 服务器,而 route
是服务器上的一个端点。
现在我们有了配置属性,让我们创建一个 Configuration
类来配置我们的函数 bean。
@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {
@Bean
public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
RsocketConsumerProperties rsocketConsumerProperties) {
final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
rsocketConsumerProperties.getPort());
return input -> rSocketRequester
.flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
.data(input)
.send());
}
}
我们将一个来自 Spring Boot 自动配置的 builder 注入到函数中,该 builder 帮助我们创建 RSocketRequester
。使用此 builder,我们通过 TCP 连接创建一个 Mono<RSocketRequester>
。connectTcp
API 方法接受 RSocket 的主机和端口信息。一旦我们获得了 RSocketRequester
的句柄,我们就在函数中提供的 lambda 中使用它。
我们在 Mono<RSocketRequester>
上调用 flatMap
,对于每个传入消息,我们在调用最终将数据推送到 RSocket 服务器的 send
方法之前指定需要发送的 route
和数据。
这就是编写一个消费数据然后使用即发即忘交互模型将其发送到 RSocket 服务器的函数的全部过程。请记住,由于 Spring Framework 在底层提供了各种 RSocket
支持和抽象,这段代码看起来非常简单。
让我们快速编写一个测试来验证该函数是否按预期工作。
正如我们在上一篇博客中对响应式 supplier 所做的那样,将以下依赖项添加到项目中。这有助于我们测试响应式组件。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
以下是带有其他必要组件的测试。
@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {
@Autowired
Function<Message<?>, Mono<Void>> rsocketConsumer;
@Autowired
TestController controller;
@Test
void testRsocketConsumer() {
rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
.subscribe();
StepVerifier.create(this.controller.fireForgetPayloads)
.expectNext("Hello RSocket")
.thenCancel()
.verify();
}
@SpringBootApplication
@ComponentScan
static class RSocketConsumerTestApplication{}
@Controller
static class TestController {
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
@MessageMapping("test-route")
void someMethod(String payload) {
this.fireForgetPayloads.onNext(payload);
}
}
}
测试组件的快速说明。
我们在 SpringBootApplication
上提供了属性 spring.rsocket.server.port
。这允许 Spring Boot 为测试自动配置一个默认的 RSocket 服务器。这里将端口硬编码为 7000
,因为这是 Spring Boot 在自动配置组件时使用的默认端口。这与我们在上面属性中使用的默认值相同。我们还指定了我们想在测试中使用的 route
。
提供了一个带有 MessageMapping
注解的方法的 Controller
,它拦截到达我们在测试中指定的路由的消息。到达服务器上该路由的每个传入记录都传递到一个 Flux
中,稍后可以在测试中断言期间重放。
在测试中,我们调用了注入的 RSocket
consumer(我们之前编写的)上的 apply
方法,并为其提供了一个测试消息。
最后,我们使用 StepVerifier
来验证消息已成功发送到 RSocket
服务器。
在上一篇博客中,我们详细介绍了如何从 Supplier 函数生成 Spring Cloud Stream source 应用。您可以遵循我们那里使用的相同模式,从上面编写的 RSocket 函数生成一个 sink 应用。我们在这里不再重复所有涉及的细节。可以使用这里提供的许多不同的 sink 应用作为模板。当我们使用 Spring Cloud Stream 中的测试 binder 测试函数时,将消息发送到 InputDestination
。Spring Cloud Stream 会将其下游发送到 RSocket 服务器。然后我们可以使用我们在上面的单元测试中使用的相同验证策略。有关使用测试 binder 测试 Spring Cloud Stream 组件的更多信息,请参阅此处。
在本篇博客文章中,我们看到了如何编写一个简单的 consumer 来消费数据并对其进行处理,以 Apache Pulsar 为例。然后,我们探讨了如何以 Function<String, Mono<Void>>
的形式开发响应式 consumer,并以 RSocket 的即发即忘策略作为指导。我们还演示了如何对这个响应式 consumer 进行单元测试。请遵循本文中概述的步骤来编写您自己的数据 consumer,如果您这样做,请考虑贡献一个 pull request。
在接下来的几周里,我们将带来更多深入的焦点话题。在本系列的下一篇博客中,我们将开始一系列案例研究,探索已经存在的函数和应用程序。