先行一步
VMware 提供培训和认证,助您快速提升。
了解更多Spring Cloud Stream 最近新增了一项功能,可以将函数定义组合到现有的 Spring Cloud Stream 应用程序中。在本篇博客中,我们将了解 Spring Cloud Data Flow 如何利用此功能在流式管道中组合函数。
在 Spring Cloud Data Flow 中,流式数据管道由 Spring Cloud Stream 应用程序组成。开发人员可以选择并使用开箱即用的流应用程序,这些应用程序涵盖了许多常见用例。开发人员还可以扩展这些开箱即用的应用程序,或使用 Spring Cloud Stream 框架创建自定义应用程序。
Spring Cloud Stream 2.1.0 GA 版本的发布包含了基于Spring Cloud Function的编程模型的集成,该模型允许将业务逻辑表示为 java.util.Function
、java.util.Consumer
和 java.util.Supplier
,它们分别代表 Processor
、Sink
和 Source
的角色。鉴于这种灵活性,Spring Cloud Stream 框架现在支持一种简单但强大的函数组合方法。在这种上下文中,组合可以是将源和处理器组合成一个应用程序:一个“新源”。或者,它可以是将处理器 + 接收器组合成一个应用程序:“一个新接收器”。这种灵活性为流应用程序开发人员开辟了有趣的新机会。
让我们看看如何使用三个应用程序创建一个管道来执行简单的转换,然后再看看如何使用两个利用函数组合的应用程序来实现同样的管道。
对于第一个流,
我们将使用开箱即用的 http-source
、transform-processor
和 log-sink
应用程序。
第一步,启动 Spring Cloud Data Flow local
服务器
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar
现在,启动 Spring Cloud Data Flow shell
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar
现在让我们注册使用 RabbitMQ 绑定的 HTTP 源、转换器处理器和日志接收器应用程序
dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar
现在我们可以创建一个没有函数组合的简单流
dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"
然后我们可以部署这个流
dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "http://localhost:9000"
POST (text/plain) http://localhost:9000 friend
202 ACCEPTED
您可以在 log
应用程序中看到以下日志消息
[sformer.hello-1] log-sink : Hello FRIEND
在这个流中,我们将 http (source)、transformer (processor) 和 log (sink) 应用程序作为独立应用程序部署在目标平台(本例中是 local
)上。对于某些用例,例如简单的负载转换逻辑,我们可能希望将 Processor
应用程序与 Source
或 Sink
应用程序结合使用。例如,在源输出数据中屏蔽某些特定用户字段的转换场景不一定需要作为单独的独立应用程序部署。相反,它可以在 Source 或 Sink 应用程序中进行组合。
要将 Processor 函数组合到 Source 或 Sink 应用程序中,我们使用 Spring Cloud Stream 的函数组合支持。
Spring Cloud Stream 中的函数组合支持基于 Spring Cloud Function 的能力,即允许将 java.util.Supplier
、java.util.Consumer
和 java.util.Function
注册为 Spring @Bean
定义。这些函数 @Bean
定义在运行时可用于组合。
Spring Cloud Stream 引入了一个名为 spring.cloud.stream.function.definition
的新属性,该属性对应于 Spring Cloud Function 中的函数定义 DSL。设置此属性后,所需的函数 bean 会在运行时自动链接。
函数组合的发生方式如下:
当 Spring Cloud Stream 应用程序类型为 Source
时,组合函数应用于源的 output
之后。
当 Spring Cloud Stream 应用程序类型为 Sink
时,组合函数应用于接收器的 input
之前。
这使得可以将函数(在 Spring Cloud Function DSL 中定义)组合到现有的 Spring Cloud Stream 应用程序中,然后由 Spring Cloud Data Flow 在流式数据管道中进行编排。
让我们创建并部署一个流,将前面示例中的转换器表达式组合到 Source
应用程序本身中。转换逻辑通过使用两个 java.util.Function
实现来完成。
我们将创建一个新的源应用程序,称之为 http-transformer
,它扩展了开箱即用的 http source 应用程序。新源应用程序的源代码可以在这里找到。
http-transformer
应用程序包含 upper
和 concat
函数 bean,定义如下:
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
public static void main(String[] args) {
SpringApplication.run(HttpSourceRabbitApplication.class, args);
}
}
克隆 github 仓库后,您可以使用 maven 构建应用程序
cd function-composition/http-transformer ./mvnw clean package
现在使用 Data Flow Shell 注册 http-transformer
应用程序。
注意
对于下面的应用程序注册,在
--uri
选项中,将 artifact 的目录名和路径替换为您系统上的相应值。
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
现在让我们创建流
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"
部署流时,我们传递 spring.cloud.stream.function.definition
属性来定义组合函数 DSL(如 Spring Cloud Function 中定义)。在本例中,它是
dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
上述部署将 upper
和 concat
函数 bean 组合到 http
source 应用程序中。
然后我们可以将负载发送到 http
应用程序
dataflow:>http post --data "friend" --target "http://localhost:9001"
> POST (text/plain) http://localhost:9001 friend
> 202 ACCEPTED
然后在 log
应用程序中看到输出,如下所示:
[helloComposed-1] log-sink : Hello FRIEND
注意
请注意,函数组合支持不适用于开箱即用的 Spring Cloud Stream Processor
应用程序,因为在函数应该应用于现有处理器应用程序逻辑之前还是之后存在歧义。
但是,您可以使用标准的 java.util.Function API 创建利用函数组合的自己的处理器应用程序,如下例所示:
@Configuration
public static class FunctionProcessorConfiguration {
@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
}
然后您需要使用以下属性进行部署:spring.cloud.stream.function.definition=upperAndConcat
另一个有趣的功能是 Spring Cloud Function 支持 Kotlin 函数的函数组合。这允许我们将任何 Kotlin 函数 bean 添加到 Source
或 Sink
应用程序的可组合函数中。
为了展示其工作原理,让我们使用我们示例 github 仓库中的 http-transformer-kotlin-processor
应用程序。
Kotlin 函数 bean 配置为处理器。在这里,Kotlin 函数 bean 是下面定义的 transform
函数
@Bean
open fun transform(): (String) -> String {
return { "How are you ".plus(it) }
}
此外,该项目包含 spring-cloud-function-kotlin
依赖项,用于为 Kotlin 函数应用函数配置支持,定义如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
cd function-composition/http-transformer-kotlin ./mvnw clean package
注意
对于下面的应用程序注册,在
--uri
选项中,将 artifact 的目录名和路径替换为您系统上的相应值。
dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar
使用此应用程序作为 Source
创建流
dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"
正如我们在 http-transformer
示例中所做的那样,我们可以使用 spring.cloud.stream.function.definition
属性来指定任何有效的组合函数 DSL 来构建函数组合。在本例中,我们将通过 Java 配置注册的函数 bean 与来自 Kotlin 处理器配置的函数 bean 结合起来。
dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"
这里,函数名 transform
对应于 Kotlin 函数。
注意:我们可以进行 Kotlin 函数和 Java 函数之间的组合,因为 Kotlin 函数在内部被转换为 java.util.Function
。
dataflow:>http post --data "friend" --target "http://localhost:9002"
> POST (text/plain) http://localhost:9002 friend
> 202 ACCEPTED
并且,您可以在 log
应用程序中看到输出,如下所示:
[omposedKotlin-1] log-sink : Hello How are you FRIEND
在此示例中,http-transformer
也包含了函数的源代码。但是,您可以通过在单独的 artifact 中定义函数 bean 来使应用程序更加模块化。然后,您只需向项目中添加一个 maven 依赖项并设置 spring.cloud.stream.function.definition
属性即可构建应用程序。通过这种方式,您可以将大部分业务逻辑编写为函数,并在需要时将其与 Source 或 Sink 组合使用。