领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多如果您在 Spring One Platform 2019 上听了 Oleg Zhurakousky 关于 Spring Cloud Stream & Functions 的演讲,或者阅读了他最近关于 简化的 Spring Cloud Stream 和 函数式 Spring Cloud Stream 的博文,您可能会想:“等等!Spring Integration 支持发生了什么变化?我现在该如何处理我的 @ServiceActivator
或 IntegrationFlow
?我过去习惯使用 Sink.input()
作为通道来使用一些 Spring Integration 逻辑来消费绑定器目标!” 正如 Oleg 在他的博文中提到的,使用现有的 @EnableBinding
等仍然是可能的,但我们正在逐渐放弃这种模型,那么我们如何在函数式 Spring Cloud Stream 的世界中仍然受益于 Spring Integration 的所有功能呢?
在这篇博文中,我将扩展 Spring Integration 上下文中 Spring Cloud Stream 的函数式功能,以及它在现代基于函数的流中的重要性!
是的,我们确实可以创建一个简单的 Function
桥,它将调用 MessageChannel.send()
,但我们也可以使用 Spring Integration 中的消息网关抽象来实现,如下所示
@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
extends Function<Message<InputData>, Message<OutputData>> { }
鉴于结果 Bean 是 java.util.function.Function
的扩展,它是一个完全有效的 Spring Cloud Function 和 Spring Cloud Stream 绑定候选者。它的泛型输入/输出参数类型由 Spring Cloud Stream 用于在前后执行正确的有效负载转换。此外,标头从绑定器传递到下游集成流并返回。这很好,但我们仍然需要了解通道并提供一些 SI 特定的注解来将此类网关与我们的流连接起来(样板代码)。
使用 Spring Integration 的 Java DSL,我们可以更进一步,消除更多样板代码,同时获得使用函数式 Spring Cloud Stream 的好处。我们需要的是相同的 gateway
方法,但在 DSL 样式中。Oleg 博文中 uppercase
示例在 Spring Integration 中将如下所示
@SpringBootApplication
public class SampleApplication {
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(Function.class,
gateway -> gateway.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.get();
}
}
使用 Spring Integration 实现大写转换用例非常简单,但想象一下,我们需要执行一些复杂的逻辑,例如 split
、scatter-gather
(并行调用外部服务),然后 aggregate
,进行一些审计,最后才将结果从我们的函数返回到输出目标。所有这些以及更多内容都可以使用 Spring Integration、其 EIP 支持、Java DSL 抽象以及前面提到的函数包装器来实现。
java.util.function.Consumer
和 java.util.function.Supplier
接口可以以类似的方式使用,并在网关代理周围根据其约定实现相应的逻辑。
您可以在 Spring Integration 的 参考手册 中找到有关函数支持的更多信息。
我们之前展示的所有内容都是关于命令式函数的,这些函数会针对每个事件触发。响应式函数仅通过将整个事件流作为 Flux
传递到函数中来触发一次。Spring Integration 中的响应式流支持可帮助您编写响应式 Spring Integration 流,这些流可以作为 Spring Cloud Stream 中的函数公开。
以下示例展示了如何在响应式 Spring Integration 调用周围构建响应式函数包装器
public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows.from(FluxFunction.class,
gateway -> gateway.beanName("uppercase"))
.handle(RSockets.outboundGateway("/uppercase")
.command(RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
虽然通过 RSocket 实现 uppercase
仍然很傻,但此示例的目的是让您了解如何使用 Spring Integration 解决更复杂的用例。
在这里,我们获得一个传递到函数中的 Flux
并将其传播到 RSocket 请求者,用于 request channel
交互模型。结果 Flux
通过 Spring Integration 中的 replyChannel
标头内部传递回函数返回。
另一个响应式示例可能是将数据从推送模型传输到拉取模型。换句话说,将事件流表示为 Supplier
@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
.toReactivePublisher();
}
@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
Publisher<Message<byte[]>> httpRequestPublisher) {
return () -> Flux.from(httpRequestPublisher);
}
这样,传入的 HTTP 请求就会进入一个源 Flux
,以便输出绑定器目标可以将其拉取到下游,并遵守背压和其他响应式流要求。
有关 Spring Integration 中响应式流支持的更多信息,请参阅 参考手册。
Spring Integration 仍然是 Spring Cloud Stream 微服务开发中不可或缺的一部分。它的函数式支持允许将属于企业集成模式类别的复杂用例公开为 Java 函数,从而在 Spring Cloud Stream 中提供一致的执行模型。事实上,通过使用此基础,Spring Cloud Stream 应用启动器 最终将被函数实现所取代。
请随时提供您的反馈!
附注:对于那些急于使用 Kotlin 的人,我想分享一个最近启动的 Spring Integration Kotlin DSL 项目。