Spring Cloud Stream 应用的 Java 函数介绍 - 第 1 部分

工程 | David Turanski | 2020 年 7 月 20 日 | ...

Spring Cloud Stream 应用的 Java 函数介绍 - 第 1 部分

上周我们发布了 Spring Cloud Stream 应用的 Java 函数介绍 - 第 0 部分
以宣布 Spring Cloud Stream applications 2020.0.0-M2 的发布。
在这里,我们探讨 函数组合,这是第 0 部分介绍的面向函数架构带来的更强大的特性之一。如果您还没有机会阅读 第 0 部分,现在是很好的时机!

函数组合

函数组合在数学和计算机科学中拥有坚实的理论基础。
实际上,它是将一系列函数连接起来创建更复杂函数的一种方式。

我们来看一个使用 Java 函数的简单例子。我们有两个函数,reverseupper
每个函数都接受一个 String 作为输入,并产生一个 String 作为输出。我们可以使用内置的 andThen 方法来组合它们。组合后的函数本身是一个 Function<String, String>
如果您运行这段代码,它将打印 ESREVER

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString(); Function<String, String> upper = String::toUpperCase; Function<String, String> reverseUpper = reverse.andThen(upper); System.out.println(reverseUpper.apply("reverse"));

提示

除了 andThen 之外,java.util.Function 还包括 compose 方法,它首先应用参数 (b),然后将 a 应用到结果上。
因此,a.compose(b).apply(s) 等同于 a.apply(b.apply(s))

Spring Cloud Function 中的函数组合

Spring Cloud Function 包含一些出色的功能,可以将函数组合提升到新的水平。

声明式组合

如果我们将上面例子中的函数定义为 Spring bean,

@Bean Function<String, String> reverse() { return s -> new StringBuilder(s).reverse().toString(); }

@Bean Function<String, String> upper() { return String::toUpperCase;

}

我们可以使用 spring.cloud.function.definition 属性来组合这些函数,例如 spring.cloud.function.definition=upper|reverse

这里 | 是一个组合运算符,它会生成一个自动配置的 bean 来实现组合函数,并提供相关资源,让您可以无缝地调用组合函数。

与 Supplier 和 Consumer 的组合

Spring Cloud Function 扩展了原生的 Java 函数组合,支持与 Supplier 和 Consumer 的组合。

这源于一些隐含为真的概念

  • Function 与 Consumer 组合后是一个 Consumer

  • Supplier 与 Function 组合后是一个 Supplier

  • Supplier 与 Consumer 组合是一种有效的处理模型(没有输入或输出,这种形式的组合不映射到函数式接口,但类似于 Runnable

正如我们将看到的,Spring Cloud Stream Applications 有效地运用了这些概念。

类型转换

在使用函数组合时,我们必须考虑兼容的参数类型。
使用原生的 Java 组合,我们可以将 Function<Integer,String> 与 Function<String, Integer> 组合成一个 Function<Integer, Integer>

Function<Integer, String> intToStr = String::valueOf; Function<String, Integer> doubleit = i -> Integer.parseInt(i) * 2; Function<Integer, Integer> composite = intToStr.andThen(doubleit); composite.apply(10);

运行 Spring 应用程序时,Spring Cloud Function 使用 Spring 标准的类型转换支持,根据需要强制转换函数参数。
给定以下 Function bean 定义,函数定义 intToStr|doubleit 按预期工作,将 String 转换为 Integer。

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, String> intToStr() { return String::valueOf;

}

除了转换基本类型之外,Spring 函数还可以在 Message 和 POJO、JSON String 和 POJO 之间进行转换,等等。
例如,以下函数可以按任意顺序组合

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, Message> convertIntMessage() { return i -> MessageBuilder.withPayload(String.valueOf(i)).build();

}

Spring Cloud Stream 中的函数组合

Spring Cloud Stream 3.x 构建在 Spring Cloud Function 之上,以完全支持函数式编程模型。Spring Cloud Stream 的基本前提是它使得函数能够在分布式环境中执行。绑定器将打包在 Spring Boot 应用程序中的函数的输入和输出绑定到配置的消息代理目的地,这样,一个函数产生的输出就可以作为另一个远程运行函数的输入被消费。我们可以将数据流管道视为函数组件的分布式组合。

为了说明这一点,一个典型的 Spring Cloud Stream 管道,例如

source | processor1 | processor2 | processor3 | sink

在逻辑上等同于

supplier | function1 | function2 | function3 | sink

这个想法带来了一些有趣的架构选择,因为我们可以使用函数组合将部分或全部这些组件组合到一个应用程序中。

例如,我们可以将三个处理器序列实现为一个单一应用程序,我们称之为 composed-processor,它打包了 function1function2function3,并通过 spring.cloud.function.definition=function1|function2|function3 进行组合。现在管道可以部署为

source | composed-processor | sink

更简单的是,我们可以创建一个 composed-source,在 source 中完成所有处理

composed-source | sink

一如既往,这里没有唯一的正确答案。总是有权衡需要考虑

  • 函数组合减少了部署数量。这降低了成本、延迟、操作复杂性等等。

  • 单独部署是松耦合的,可以独立扩展。

  • 消息代理提供有保证的交付。当一个简单的无状态应用程序宕机并重新启动时,它可以从中断的地方继续,处理上一个处理步骤的待处理结果。

  • 执行复杂处理的单一应用程序更难理解,并且将中间处理结果保存在内存中,或者可能保存在临时数据存储中。当有状态应用程序失败时,可能导致状态不一致,从而使恢复更加困难。

如果这些权衡看起来很熟悉,那是因为它们与微服务与单体应用之间的争论大致相同。最终,做最适合您的选择。

与预打包 Source 应用的函数组合

在某些情况下,函数组合是显而易见的。从一开始,我们就提供了预打包的处理器来执行简单的转换或使用 SpEL 进行过滤。在使用预打包的 source 或 sink 时,传统架构需要一个单独的处理器。用户普遍抱怨“为什么我只需要评估一个 SpEL 表达式,却需要部署一个单独的应用程序?”为了解决这个问题,我们在早期版本中初步引入了一种对 函数组合 的支持形式。要将此功能与预打包的应用程序一起使用,需要 fork 它们以修改代码或构建依赖项来提供函数。

当前版本为所有预打包的 source 提供了开箱即用的函数组合。具体来说,现在可以将 source 与 预打包函数 组合,在本地执行以下任何操作

  • 执行 SpEL 转换

  • 丰富消息头

  • 过滤事件

  • 产生任务启动请求

例如,我们可以将 time source 与消息头丰富器和过滤器通过配置属性进行组合,并将其作为独立的 Spring Boot 应用程序运行

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=even --spring.cloud.function.definition=timeSupplier|headerEnricherFunction|filterFunction --header.enricher.headers=seconds=T(java.lang.Integer).valueOf(payload .substring(payload.length() - 2)) --filter.function.expression=headers[seconds]%2==0

这将每隔一秒(当秒数为偶数时)发布时间,例如 `07/16/20 16:43:48,到配置的目的地 even

这里我们使用一个预打包的 RabbitMQ time source,将输出绑定到一个名为 even 的 topic exchange。如果 exchange 不存在,绑定器将创建它。函数定义扩展了 supplier,以提取秒数,将其转换为整数并存储在 seconds 消息头中,然后根据消息头的值进行过滤。只有偶数值通过过滤器。

任务启动请求

2018 年,我们引入了一种参考架构,用于使用 Spring Cloud Data Flow 和 Spring Batch 运行文件摄取。为此,我们将 sftp source fork 为 sftp-dataflow,专门用于实现一个生成任务启动请求的预打包 source。任务启动请求是一个简单的值对象,渲染为 JSON,并由 tasklauncher-sink 消费。sink 作为 Data Flow 的客户端,根据请求启动批处理应用程序。我们最初选择 sftp 是因为它是在文件处理中最常用的协议。然而,我们意识到同样的模式可以应用于任何 source。现在我们可以通过函数组合来实现这一点。除了标准的 sftp source 之外,我们还可以从 ftpfiles3 等触发任务启动。甚至 time source 也可以用于定期启动任务。

这个有些刻意的例子生成任务启动请求

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=time-test --spring.cloud.stream.function.definition=timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction --spel.function.expression=payload.length() --header.enricher.headers=task-id=payload*2 --task.launch.request.task-name-expression="'task-'+headers['task-id']

JSON 格式的有效载荷是 {"args":[],"deploymentProps":{},"name":"task-34"}

与用户编写代码的函数组合

实际上,当用户开发 Spring Cloud Stream 管道时,很可能会从我们预打包的 Spring Cloud Stream Applications 中选择 source 和 sink。处理器通常是用户编写的代码,实现特定的业务逻辑。如果您正在编写处理器,或者想扩展 source 或 sink,任何 函数 都可以供您使用。由于我们将函数作为独立构件发布,您可以轻松地将它们包含在您的依赖项中。您既可以使用上面所示的声明式组合,也可以将它们注入到您的代码中并通过编程方式调用它们。当然,您也可以轻松地集成自己的函数。

如何贡献新的函数或应用程序?

如果您在现有函数和应用程序目录中找不到您需要的内容,请考虑贡献。这样,整个开源社区都会受益。在后续的文章中,我们将通过一个实际例子来介绍如何开发函数和 stream 应用程序。

我们鼓励社区参与到这个项目中。除了代码贡献,我们非常感谢文档改进和创建 issue。

敬请期待…​

这篇博客是系列文章中的第二篇,将涵盖许多相关主题。未来几周将有更多深入探讨和重点主题的文章。我们将带您全面了解此仓库中包含的组件及其周围的过程。

订阅 Spring 电子报

随时关注 Spring 电子报

订阅

先人一步

VMware 提供培训和认证,助您加速发展。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,一次简单订阅即可获得。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部