Spring Cloud Stream - 和 Spring Integration。

工程 | Artem Bilan | 2019 年 10 月 25 日 | ...

如果您在 Spring One Platform 2019 上听了 Oleg Zhurakousky 关于 Spring Cloud Stream & Functions 的演讲,或者阅读了他最近关于 简化的 Spring Cloud Stream函数式 Spring Cloud Stream 的博文,您可能会想:“等等!Spring Integration 支持发生了什么变化?我现在该如何处理我的 @ServiceActivatorIntegrationFlow?我过去习惯使用 Sink.input() 作为通道来使用一些 Spring Integration 逻辑来消费绑定器目标!” 正如 Oleg 在他的博文中提到的,使用现有的 @EnableBinding 等仍然是可能的,但我们正在逐渐放弃这种模型,那么我们如何在函数式 Spring Cloud Stream 的世界中仍然受益于 Spring Integration 的所有功能呢?

在这篇博文中,我将扩展 Spring Integration 上下文中 Spring Cloud Stream 的函数式功能,以及它在现代基于函数的流中的重要性!

Spring Integration 作为函数?!

是的,我们确实可以创建一个简单的 Function 桥,它将调用 MessageChannel.send(),但我们也可以使用 Spring Integration 中的消息网关抽象来实现,如下所示

@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
                  extends Function<Message<InputData>, Message<OutputData>> { }

鉴于结果 Bean 是 java.util.function.Function 的扩展,它是一个完全有效的 Spring Cloud Function 和 Spring Cloud Stream 绑定候选者。它的泛型输入/输出参数类型由 Spring Cloud Stream 用于在前后执行正确的有效负载转换。此外,标头从绑定器传递到下游集成流并返回。这很好,但我们仍然需要了解通道并提供一些 SI 特定的注解来将此类网关与我们的流连接起来(样板代码)。

使用 Spring Integration 的 Java DSL,我们可以更进一步,消除更多样板代码,同时获得使用函数式 Spring Cloud Stream 的好处。我们需要的是相同的 gateway 方法,但在 DSL 样式中。Oleg 博文中 uppercase 示例在 Spring Integration 中将如下所示

@SpringBootApplication
public class SampleApplication  {

    @Bean
    public IntegrationFlow uppercaseFlow() {
        return IntegrationFlows.from(Function.class,
                             gateway -> gateway.beanName("uppercase"))
                   .<String, String>transform(String::toUpperCase)
                   .get();
    }
}

使用 Spring Integration 实现大写转换用例非常简单,但想象一下,我们需要执行一些复杂的逻辑,例如 splitscatter-gather(并行调用外部服务),然后 aggregate,进行一些审计,最后才将结果从我们的函数返回到输出目标。所有这些以及更多内容都可以使用 Spring Integration、其 EIP 支持、Java DSL 抽象以及前面提到的函数包装器来实现。

java.util.function.Consumerjava.util.function.Supplier 接口可以以类似的方式使用,并在网关代理周围根据其约定实现相应的逻辑。

您可以在 Spring Integration 的 参考手册 中找到有关函数支持的更多信息。

响应式流呢?

我们之前展示的所有内容都是关于命令式函数的,这些函数会针对每个事件触发。响应式函数仅通过将整个事件流作为 Flux 传递到函数中来触发一次。Spring Integration 中的响应式流支持可帮助您编写响应式 Spring Integration 流,这些流可以作为 Spring Cloud Stream 中的函数公开。

以下示例展示了如何在响应式 Spring Integration 调用周围构建响应式函数包装器

public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
                           ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows.from(FluxFunction.class,
                        gateway -> gateway.beanName("uppercase"))
            .handle(RSockets.outboundGateway("/uppercase")
                    .command(RSocketOutboundGateway.Command.requestStreamOrChannel)
                    .expectedResponseType(String.class)
                    .clientRSocketConnector(clientRSocketConnector))
            .get();
}

虽然通过 RSocket 实现 uppercase 仍然很傻,但此示例的目的是让您了解如何使用 Spring Integration 解决更复杂的用例。

在这里,我们获得一个传递到函数中的 Flux 并将其传播到 RSocket 请求者,用于 request channel 交互模型。结果 Flux 通过 Spring Integration 中的 replyChannel 标头内部传递回函数返回。

另一个响应式示例可能是将数据从推送模型传输到拉取模型。换句话说,将事件流表示为 Supplier

@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
    return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
                    Publisher<Message<byte[]>> httpRequestPublisher) {
    return () -> Flux.from(httpRequestPublisher);
}

这样,传入的 HTTP 请求就会进入一个源 Flux,以便输出绑定器目标可以将其拉取到下游,并遵守背压和其他响应式流要求。

有关 Spring Integration 中响应式流支持的更多信息,请参阅 参考手册

总结

Spring Integration 仍然是 Spring Cloud Stream 微服务开发中不可或缺的一部分。它的函数式支持允许将属于企业集成模式类别的复杂用例公开为 Java 函数,从而在 Spring Cloud Stream 中提供一致的执行模型。事实上,通过使用此基础,Spring Cloud Stream 应用启动器 最终将被函数实现所取代。

请随时提供您的反馈!

附注:对于那些急于使用 Kotlin 的人,我想分享一个最近启动的 Spring Integration Kotlin DSL 项目。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部