Spring Cloud Stream - 函数式和响应式

工程 | Oleg Zhurakousky | 2019年10月17日 | ...

上一篇博文中,我尝试为我们在Spring Cloud Stream (SCSt) 中转向函数式编程模型提供理由。它代码更少配置更少。但最重要的是,您的代码与 SCSt 的内部完全解耦和独立

在这篇博文中,我将深入探讨并总结我们函数式支持的核心功能,特别是围绕其响应式功能。

重要提示:您可以使用@StreamListener/@EnableBinding完成的所有操作,也可以在没有它们的情况下完成。换句话说,函数式支持现在与基于注释的支持在功能上兼容。

虽然下面描述的所有功能都是Spring Cloud Function (SCF)(SCSt 的一个依赖项)的功能,但必须注意某些细微差别才能理解函数在 SCSt 上下文中附加的含义。

Supplier、Function 和 Consumer

任何类型为SupplierFunctionConsumer的 Bean,或任何可以映射到SupplierFunctionConsumer的 Bean(例如 POJO 函数、Kotlin lambda 等),都被 SCSt 视为消息处理程序FunctionConsumer)或消息源Supplier)。根据所使用的函数式策略类型,使用以下命名约定<function-name>-<in/out>-<index>自动生成输入和输出绑定。

考虑以下示例

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

前面的函数被视为一个消息处理程序,它从uppercase-in-0消费并发送到uppercase-out-0绑定。其余现有的流属性可以像以前一样使用。例如--spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain

点击此处以获取更多详细信息和配置选项。

###命令式或响应式函数可以是命令式响应式。命令式函数在每个单独事件上触发,而响应式函数触发一次,将对整个事件流抽象(例如FluxMono)的引用传递给由Project Reactor提供。

考虑以下一对示例

命令式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

响应式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

除了为您提供一种不同的(单子)编程风格来处理事件(这很容易被视为偏好问题)之外,响应式编程还为某些用例增加了额外的价值,否则这些用例的实现将非常复杂。虽然讨论这些用例或响应式模式的细节超出了本文的范围,但状态管理用例(例如窗口化、聚合和分组)以及拆分流或合并多个流的用例仍然值得一提,我将在下一节中讨论这些用例。

关于响应式函数的绑定和命名规则,与上一节中解释的相同。

注意:虽然前面的示例使用Function作为示例,但相同的规则也适用于SupplierConsumer。用户指南中的Spring Cloud Function 支持部分以及Reactor 文档提供了更多详细信息。

###函数元数有时需要对数据流进行分类和组织。例如,考虑一个处理非组织数据的经典大数据用例,例如包含“订单”和“发票”,并且希望每个都进入一个单独的数据存储。这就是函数元数(具有多个输入和输出的函数)支持发挥作用的地方。

让我们看一个此类函数的示例(完整的实现细节可在此处找到),

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用其 Tuple 库。元组通过向我们传达基数类型信息来提供独特的优势。两者在 SCSt 的上下文中都非常重要。基数让我们知道需要创建多少个输入和输出绑定以及绑定到函数的相应输入和输出。了解类型信息可确保正确的类型转换。

此外,这就是绑定名称命名约定中“索引”部分发挥作用的地方,因为在此函数中,两个输出绑定名称为organise-out-0organise-out-1

重要提示:目前,函数元数仅支持以复杂事件处理为中心的响应式函数(Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>),其中对事件汇合的评估和计算通常需要查看事件流而不是单个事件。

有关最新详细信息,请阅读用户指南中的具有多个输入和输出参数的函数部分。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部