Spring Cloud Stream - 函数式与响应式

工程 | Oleg Zhurakousky | 2019 年 10 月 17 日 | ...

上一篇文章中,我试图说明我们在Spring Cloud Stream (SCSt) 中转向函数式编程模型的理由。它带来了更少的代码更少的配置。然而,最重要的是,您的代码与 SCSt 的内部实现完全解耦且独立

在这篇文章中,我将更深入地探讨并总结我们的函数式支持的核心特性,特别是其响应式特性。

重要提示:使用 @StreamListener/@EnableBinding 可以完成的任何事情,在不使用它们的情况下也可以完成。换句话说,函数式支持现在与基于注解的支持功能兼容。

虽然下面描述的所有特性都是 Spring Cloud Function (SCF)(SCSt 的依赖项)的特性,但在理解函数在 SCSt 上下文中的附加含义时,需要注意一些细微之处。

Supplier, Function 和 Consumer

SCSt 将任何类型为 SupplierFunctionConsumer 的 bean,或任何可以映射到 SupplierFunctionConsumer 的 bean(例如 POJO 函数、Kotlin Lambda 等)视为消息处理程序FunctionConsumer)或消息源Supplier)。根据使用的函数式策略类型,输入和输出绑定会通过以下命名约定自动生成:<函数名>-<in/out>-<索引>

考虑以下示例

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

前面的函数被视为一个消息处理程序,它从 uppercase-in-0 绑定消费并发送到 uppercase-out-0 绑定。其余现有的流属性可以照常使用。例如 --spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain

点击这里了解更多详情和配置选项。

###命令式还是响应式 函数可以是命令式的或响应式的。命令式函数在每个单独的事件上触发,而响应式函数触发一次,并传递对由Project Reactor提供的整个事件流抽象(例如 FluxMono)的引用。

考虑以下两组示例

命令式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

响应式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

除了提供一种不同的(monadic)编程风格来处理事件(这很容易被视为个人偏好问题)之外,响应式编程对于某些用例增加了额外价值,否则实现这些用例会相当复杂。虽然本文不会详细讨论这些用例或响应式模式,但状态管理用例(例如窗口化、聚合和分组)以及流的拆分或多个流的合并用例仍然值得提及,我将在下一节中讨论这些内容。

关于响应式函数的绑定和命名规则,它们与前一节中解释的相同。

注意:虽然前面的示例使用了 Function,但相同的规则也适用于 SupplierConsumer。用户指南的Spring Cloud Function 支持部分以及Reactor 文档提供了更多详情。

###函数元数 有时需要对数据流进行分类和组织。例如,考虑一个经典的大数据用例,处理包含“订单”和“发票”等未组织数据,您希望将它们分别存储到不同的数据存储中。这时,函数元数(具有多个输入和输出的函数)的支持就派上用场了。

让我们看一个这类函数的示例(完整的实现细节可在此处找到),

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

鉴于 Project Reactor 是 SCF 的核心依赖,我们使用了它的 Tuple 库。Tuple 通过向我们传达基数类型信息,提供了独特的优势。这两点在 SCSt 的上下文中都极其重要。基数让我们知道需要创建多少输入和输出绑定,并将它们绑定到函数的相应输入和输出。了解类型信息确保了正确的类型转换。

此外,绑定名称命名约定的“索引”部分也在这里发挥作用,因为在这个函数中,两个输出绑定的名称是 organise-out-0organise-out-1

重要提示:目前,函数元数仅支持以复杂事件处理为中心的响应式函数(Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>),在此场景下,对事件汇合进行评估和计算通常需要查看事件流,而不是单个事件。

有关最新详情,请阅读用户指南中具有多个输入和输出参数的函数一节。

订阅 Spring 资讯

订阅 Spring 资讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅,提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部