Spring Cloud Stream 应用的 Java 函数介绍 - 第 1 部分

工程 | David Turanski | 2020年7月20日 | ...

Spring Cloud Stream 应用的 Java 函数介绍 - 第 1 部分

上周我们发布了 Spring Cloud Stream 应用的 Java 函数介绍 - 第 0 部分
宣布 Spring Cloud Stream 应用 2020.0.0-M2 版本发布。
在这里,我们将探讨 函数组合,这是第 0 部分中介绍的面向函数的架构启用的一项更强大的功能。如果您还没有时间阅读 第 0 部分,现在是好时机!

函数组合

函数组合在数学和计算机科学中拥有坚实的理论基础。
实际上,它是一种将一系列函数连接起来以创建更复杂函数的方法。

让我们来看一个使用 Java 函数的简单示例。我们有两个函数,reverseupper
每个函数都接受一个 String 作为输入并产生一个 String 作为输出。我们可以使用内置的 andThen 方法将它们组合起来。组合函数本身就是一个 Function<String, String>
如果运行此代码,它将打印 ESREVER

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString(); Function<String, String> upper = String::toUpperCase; Function<String, String> reverseUpper = reverse.andThen(upper); System.out.println(reverseUpper.apply("reverse"));

提示

除了 andThen 之外,java.util.Function 还包括 compose,它首先应用参数 (b),然后将 a 应用于结果。
因此,a.compose(b).apply(s) 等效于 a.apply(b.apply(s))

Spring Cloud Function 中的函数组合

Spring Cloud Function 包含一些很棒的功能,可以将 函数组合提升到另一个层次。

声明式组合

如果我们将上面示例中的函数定义为 Spring bean,

@Bean Function<String, String> reverse() { return s -> new StringBuilder(s).reverse().toString(); }

@Bean Function<String, String> upper() { return String::toUpperCase;

}

我们可以使用 spring.cloud.function.definition 属性 spring.cloud.function.definition=upper|reverse 来组合这些函数

这里 | 是一个组合运算符,它会生成一个自动配置的 bean 来实现组合函数,以及相关的资源,以便您可以无缝地调用组合函数。

使用 Supplier 和 Consumer 进行组合

Spring Cloud Function 将原生 Java 函数组合扩展到支持与 Supplier 和 Consumer 的组合。

这遵循隐式为真的概念

  • 一个函数与一个 Consumer 组合是一个 Consumer

  • 一个 Supplier 与一个 Function 组合是一个 Supplier

  • 一个 Supplier 与一个 Consumer 组合是一个有效的处理模型(没有输入或输出,这种组合形式不映射到函数接口,但类似于 Runnable

正如我们将看到的,Spring Cloud Stream 应用有效地利用了这些概念。

类型转换

使用函数组合时,我们必须考虑兼容的参数类型。
使用原生 Java 组合,我们可以将 Function<Integer,String> 与 Function<String, Integer> 组合成 Function<Integer, Integer>

Function<Integer, String> intToStr = String::valueOf; Function<String, Integer> doubleit = i -> Integer.parseInt(i) * 2; Function<Integer, Integer> composite = intToStr.andThen(doubleit); composite.apply(10);

运行 Spring 应用时,Spring Cloud Function 使用 Spring 的标准类型转换支持根据需要强制转换函数参数。
鉴于以下 Function bean 定义,函数定义 intToStr|doubleit 按预期工作,将 String 转换为 Integer。

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, String> intToStr() { return String::valueOf;

}

除了转换基本类型外,Spring 函数还可以转换 Message 和 POJO、JSON 字符串和 POJO 等。
例如,以下函数可以按任何顺序组合

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, Message> convertIntMessage() { return i -> MessageBuilder.withPayload(String.valueOf(i)).build();

}

Spring Cloud Stream 中的函数组合

Spring Cloud Stream 3.x 基于 Spring Cloud Function 完全支持 函数式编程模型。Spring Cloud Stream 的基本前提是它使函数能够在分布式环境中执行。绑定器将打包在 Spring Boot 应用中的函数的输入和输出绑定到已配置的消息代理目标,以便一个函数产生的输出作为另一个远程运行函数的输入使用。我们可以将数据流管道视为功能组件的分布式组合。

为了说明这一点,一个典型的 Spring Cloud Stream 管道,例如

source | processor1 | processor2 | processor3 | sink

在逻辑上等效于

supplier | function1 | function2 | function3 | sink

这个想法导致了一些有趣的架构选择,因为我们可以使用函数组合将这些组件中的部分或全部组合到一个应用程序中。

例如,我们可以将三个处理器的序列实现为单个应用程序,我们称之为 composed-processor,打包 function1function2function3,并由 spring.cloud.function.definition=function1|function2|function3 组合。现在管道可以部署为

source | composed-processor | sink

更简单的是,我们可以创建一个 composed-source 在源中执行所有处理

composed-source | sink

与以往一样,这里没有正确的答案。总是有权衡需要考虑

  • 函数组合导致更少的部署。这降低了成本、延迟、运营复杂性等等。

  • 单个部署是松散耦合的,可以独立扩展。

  • 消息代理提供保证投递。当一个简单的无状态应用程序宕机并重新启动时,它可以从中断的地方继续,处理上一步的待处理结果。

  • 执行复杂处理的单个应用程序更难以推理,并将中间处理结果保存在内存中,或者可能保存在临时数据存储中。当有状态应用程序发生故障时,会导致状态不一致,从而使恢复更加困难。

如果这些权衡看起来很熟悉,那是因为它们与任何微服务与单体架构的争论几乎相同。最终,选择最适合您的方案。

预打包源应用程序的函数组合

在某些情况下,函数组合是很简单的。从一开始,我们就提供了预打包的处理器来执行简单的转换,或者使用SpEL进行过滤。旧版架构在使用预打包的源或接收器时需要一个单独的处理器。用户常见的抱怨是:“为什么我只需要部署一个单独的应用程序来评估 SpEL 表达式?”为了解决这个问题,我们在早期版本中最初引入了对函数组合的支持形式。要将此功能与预打包的应用程序一起使用,需要分叉它们来修改代码或构建依赖项以提供函数。

当前版本为所有预打包的源提供了开箱即用的函数组合。具体来说,现在可以将源与预打包的函数组合以在本地执行以下任何操作:

  • 执行 SpEL 转换

  • 丰富消息头

  • 过滤事件

  • 生成任务启动请求

例如,我们可以将time源与一个消息头丰富器和过滤器组合起来,使用配置属性并将其作为独立的 Spring Boot 应用程序运行:

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=even --spring.cloud.function.definition=timeSupplier|headerEnricherFunction|filterFunction --header.enricher.headers=seconds=T(java.lang.Integer).valueOf(payload .substring(payload.length() - 2)) --filter.function.expression=headers[seconds]%2==0

这将每隔一秒(当秒数为偶数时)将时间(例如07/16/20 16:43:48)发布到配置的目的地even

这里我们使用的是 RabbitMQ 的预打包时间源,将输出绑定到名为even的主题交换机。如果交换机不存在,绑定器将创建它。函数定义扩展了供应商以提取秒数,将其转换为整数并将其存储在seconds消息头中,然后根据消息头值进行过滤。只有偶数值才能通过过滤器。

任务启动请求

2018 年,我们介绍了一个使用 Spring Cloud Data Flow 和 Spring Batch 运行文件导入的参考架构。为此,我们分叉了sftp源作为sftp-dataflow,专门实现一个生成任务启动请求的预打包源。任务启动请求是一个简单的值对象,呈现为 JSON,并由tasklauncher-sink使用。接收器充当 Data Flow 的客户端,根据请求启动批处理应用程序。我们最初选择 sftp 是因为它是最常用的文件处理协议。但是,我们意识到相同的模式可以应用于任何源。现在我们可以通过函数组合来做到这一点。除了标准的sftp 源之外,我们还可以从ftpfiles3等触发任务启动。甚至时间源也可以用于定期启动任务。

这个有点人为的例子会生成任务启动请求:

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=time-test --spring.cloud.stream.function.definition=timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction --spel.function.expression=payload.length() --header.enricher.headers=task-id=payload*2 --task.launch.request.task-name-expression="'task-'+headers['task-id']

有效负载(作为 JSON)是{"args":[],"deploymentProps":{},"name":"task-34"}

使用用户编写的代码进行函数组合

实际上,当用户开发 Spring Cloud Stream 管道时,他们可能会从我们预打包的Spring Cloud Stream 应用程序中选择源和接收器。处理器通常是用户编写的代码,实现特定的业务逻辑。如果您正在编写处理器,或者想要扩展源或接收器,任何函数都可供您使用。由于我们将函数作为单独的工件发布,因此您可以简单地将它们包含在您的依赖项中。您可以使用上面显示的声明式组合,也可以将它们注入到您的代码中并以编程方式调用它们。当然,您也可以轻松地集成您自己的函数。

如何贡献新的函数或应用程序?

如果您在现有的函数和应用程序目录中找不到您要查找的内容,请考虑贡献。这样,整个开源社区都将从中受益。在随后的文章中,我们将逐步介绍开发函数和流应用程序的实际示例。

我们鼓励社区参与这个项目。除了代码贡献外,我们也非常感谢文档改进和创建问题。

敬请期待……

这篇博客是系列文章中的第二篇,将涵盖许多相关主题。在接下来的几周内,我们将深入探讨更多内容并关注特定主题。我们将带您了解此存储库中包含的所有组件以及相关的流程。

获取 Spring 新闻通讯

关注 Spring 新闻通讯

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部