领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多Spring Cloud Stream 最近添加了一个功能,用于将函数定义组合到现有的 Spring Cloud Stream 应用程序中。在本博文中,我们将了解 Spring Cloud Data Flow 如何利用此功能在流式管道中组合函数。
在 Spring Cloud Data Flow 中,流式数据管道由 Spring Cloud Stream 应用程序组成。开发人员可以选择现成的流式应用程序,这些应用程序涵盖了许多常见的用例。开发人员还可以扩展这些现成的应用程序,或者通过使用 Spring Cloud Stream 框架创建自定义应用程序。
Spring Cloud Stream 2.1.0 GA 的发布包含了Spring Cloud Function 基于编程模型的集成,该模型允许业务逻辑表示为java.util.Function
、java.util.Consumer
和 java.util.Supplier
,分别表示Processor
、Sink
和 Source
的角色。鉴于这种灵活性,Spring Cloud Stream 框架现在支持一种简单而强大的函数组合方法。在此上下文中,组合可能是源和处理器组合成单个应用程序:“新的源”。或者,它可能是处理器 + 接收器组合成单个应用程序:“新的接收器”。这种灵活性为流应用程序开发人员带来了有趣的新机遇。
让我们考虑如何创建一个管道以使用三个应用程序执行简单的转换,然后看看如何使用两个使用函数组合的应用程序将其实现为管道。
对于第一个流,
我们将使用现成的http-source
、transform-processor
和 log-sink
应用程序。
第一步,启动 Spring Cloud Data Flow local
服务器
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar
现在,启动 Spring Cloud Data Flow shell
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar
现在,让我们注册使用 RabbitMQ 绑定器的 HTTP 源、转换器处理器和日志接收器应用程序
dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar
现在我们可以创建一个没有函数组合的简单流
dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"
然后我们可以部署流
dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://127.0.0.1:9000"
POST (text/plain) https://127.0.0.1:9000 friend
202 ACCEPTED
您可以在log
应用程序中看到以下日志消息
[sformer.hello-1] log-sink : Hello FRIEND
在此流中,我们将 http(源)、转换器(处理器)和日志(接收器)应用程序作为独立应用程序部署在目标平台(在本例中为local
)。对于某些用例,对于简单的有效负载转换逻辑,我们可能希望将Processor
应用程序与Source
或Sink
应用程序组合。例如,在源输出数据中屏蔽某些特定用户特定字段的转换方案不一定需要部署为单独的独立应用程序。相反,它可以在源或接收器应用程序中进行组合。
为了将处理器函数组合到源或接收器应用程序中,我们使用 Spring Cloud Stream 的函数组合支持。
Spring Cloud Stream 中的函数组合支持基于 Spring Cloud Function 的能力,允许将java.util.Supplier
、java.util.Consumer
和 java.util.Function
作为 Spring @Bean
定义进行注册。这些函数@Bean
定义可在运行时用于组合。
Spring Cloud Stream 引入了一个新属性,称为spring.cloud.stream.function.definition
,它对应于 Spring Cloud Function 中的函数定义 DSL。设置此属性时,所需的函数 bean 会在运行时自动链接。
函数组合按以下方式进行
当 Spring Cloud Stream 应用程序的类型为Source
时,组合函数将在源output
之后应用。
当 Spring Cloud Stream 应用程序的类型为Sink
时,组合函数将在接收器input
之前应用。
这使得能够将函数(在 Spring Cloud Function DSL 中定义)组合到现有的 Spring Cloud Stream 应用程序中,并随后由 Spring Cloud Data Flow 在流式数据管道中进行编排。
让我们创建一个并部署一个流,该流将先前示例中的转换器表达式组合到Source
应用程序本身中。转换器逻辑是通过使用两个java.util.Function
实现来完成的。
我们将创建一个新的源应用程序,我们将其称为http-transformer
,它扩展了现成的 http 源应用程序。新源应用程序的源代码可以在此处找到。
http-transformer
应用程序包含如下定义的upper
和 concat
函数 bean
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
public static void main(String[] args) {
SpringApplication.run(HttpSourceRabbitApplication.class, args);
}
}
克隆 github存储库后,您可以使用 maven 构建应用程序
cd function-composition/http-transformer ./mvnw clean package
现在使用 Data Flow Shell 注册http-transformer
应用程序。
注意
对于下面的应用程序注册
--uri
选项,请将工件的目录名和路径替换为您系统中适当的值。
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
现在让我们创建流
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"
部署流时,我们传递spring.cloud.stream.function.definition
属性以定义组合函数 DSL(如 Spring Cloud Function 中定义)。在本例中,它是
dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
上述部署将upper
和 concat
函数 bean 组合到http
源应用程序中。
然后我们可以将有效负载发送到http
应用程序
dataflow:>http post --data "friend" --target "https://127.0.0.1:9001"
> POST (text/plain) https://127.0.0.1:9001 friend
> 202 ACCEPTED
然后您可以在log
应用程序中看到输出,如下所示:
[helloComposed-1] log-sink : Hello FRIEND
注意
请注意,函数组合支持不适用于现成的 Spring Cloud Stream Processor
应用程序,因为在函数需要在现有处理器的应用程序逻辑之前还是之后应用存在歧义。
但是,您可以创建自己的处理器应用程序,这些应用程序使用标准 java.util.Function API 进行函数组合,如下例所示
@Configuration
public static class FunctionProcessorConfiguration {
@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
}
然后您需要使用以下属性进行部署:spring.cloud.stream.function.definition=upperAndConcat
另一个有趣的功能是 Spring Cloud Function 支持 Kotlin 函数的函数组合。这让我们可以将任何 Kotlin 函数 bean 添加到Source
或Sink
应用程序的可组合函数中。
为了查看它的工作原理,让我们使用我们示例 github存储库中的http-transformer-kotlin-processor
应用程序。
Kotlin 函数 bean 被配置为处理器。这里,Kotlin 函数 bean 是如下定义的transform
函数
@Bean
open fun transform(): (String) -> String {
return { "How are you ".plus(it) }
}
此外,此项目将spring-cloud-function-kotlin
作为依赖项,以应用 Kotlin 函数的函数配置支持,定义如下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
cd function-composition/http-transformer-kotlin ./mvnw clean package
注意
对于下面的应用程序注册
--uri
选项,请将工件的目录名和路径替换为您系统中适当的值。
dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar
要使用此应用程序作为Source
创建一个流
dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"
与我们在http-transformer
示例中所做的一样,我们可以使用spring.cloud.stream.function.definition
属性指定任何有效的组合函数 DSL 来构建函数组合。在本例中,让我们将通过 Java 配置注册的函数 bean 与来自 Kotlin 处理器配置的函数 bean 组合起来。
dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"
这里,函数名transform
对应于 Kotlin 函数。
注意:我们可以执行 Kotlin 函数和 Java 函数之间的组合,因为 Kotlin 函数在内部被转换为java.util.Function
。
dataflow:>http post --data "friend" --target "https://127.0.0.1:9002"
> POST (text/plain) https://127.0.0.1:9002 friend
> 202 ACCEPTED
并且,您可以在log
应用程序中看到输出,如下所示:
[omposedKotlin-1] log-sink : Hello How are you FRIEND
在此示例中,http-transformer
还包含函数的源代码。但是,您可以通过在单独的工件中定义函数 bean 来使应用程序更加模块化。然后,您可以通过仅向项目添加 maven 依赖项并设置spring.cloud.stream.function.definition
属性来构建应用程序。这样,您可以将大部分业务逻辑编码为函数,并且可以根据需要将其与源或接收器组合。