Spring Cloud Data Flow 中的组合函数支持

工程 | Ilayaperumal Gopinathan | 2019年1月9日 | ...

Spring Cloud Stream 最近添加了一项功能,可以将函数定义组合到现有的 Spring Cloud Stream 应用程序中。在这篇博客中,我们将了解 Spring Cloud Data Flow 如何利用此功能在流式管道中组合函数。

有什么不同之处?

在 Spring Cloud Data Flow 中,流式数据管道由 Spring Cloud Stream 应用程序组成。开发人员可以选择现成的流式应用程序,这些应用程序涵盖了许多常见用例。开发人员还可以扩展这些现成的应用程序,或使用 Spring Cloud Stream 框架创建自定义应用程序。

随着Spring Cloud Stream 2.1.0 GA的发布,集成了Spring Cloud Function支持的编程模型,该模型允许业务逻辑表示为 java.util.Functionjava.util.Consumerjava.util.Supplier,分别代表 ProcessorSinkSource 的角色。鉴于这种灵活性,Spring Cloud Stream 框架现在支持一种简单但强大的函数组合方法。在这种情况下,组合可以是将源和处理器合并到一个应用程序中:一个“新源”。否则,它可以是将处理器+接收器合并到一个应用程序中:“新接收器”。这种灵活性为流应用程序开发人员打开了有趣的机遇。

让我们考虑一下如何使用三个应用程序创建一个用于执行简单转换的管道,然后看看如何使用两个使用函数组合的应用程序将其实现为管道。

三个应用程序的流式管道

对于第一个流,

我们将使用现成的 http-sourcetransform-processorlog-sink 应用程序。

首先,启动 Spring Cloud Data Flow local 服务器

java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar

现在,启动 Spring Cloud Data Flow shell

java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar

现在我们注册使用 RabbitMQ 绑定器的 HTTP 源、转换器处理器和日志接收器应用程序

dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar

现在我们可以创建一个简单的流,而无需进行函数组合

dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然后我们可以部署流

dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://:9000"
POST (text/plain) https://:9000 friend
202 ACCEPTED

您可以在 log 应用程序中看到以下日志消息

[sformer.hello-1] log-sink                                 : Hello FRIEND

在此流中,我们将 http (源)、转换器 (处理器) 和日志 (接收器) 应用程序作为独立应用程序部署在目标平台 (在本例中是 local) 上。对于某些用例,对于简单的有效负载转换逻辑,我们可能希望将 Processor 应用程序与 SourceSink 应用程序结合起来。例如,在源输出数据时掩盖某些特定用户字段的转换场景不一定需要部署为单独的独立应用程序。相反,它可以组合在源或接收器应用程序中。

要将处理器函数组合到源或接收器应用程序中,我们使用 Spring Cloud Stream 的函数组合支持。

Spring Cloud Stream 中的函数组合支持基于 Spring Cloud Function 允许将 java.util.Supplierjava.util.Consumerjava.util.Function 注册为 Spring @Bean 定义的能力。这些函数 @Bean 定义可在运行时进行组合。

Spring Cloud Stream 引入了一个名为 spring.cloud.stream.function.definition 的新属性,该属性对应于 Spring Cloud Function 中的函数定义 DSL。设置此属性后,所需的函数 Bean 将在运行时自动链接。

函数组合发生的方式如下

当 Spring Cloud Stream 应用程序的类型为 Source 时,组合函数将在源 output 之后应用。

当 Spring Cloud Stream 应用程序的类型为 Sink 时,组合函数将在接收器 input 之前应用。

这使得能够将函数 (在 Spring Cloud Function DSL 中定义) 组合到现有的 Spring Cloud Stream 应用程序中,并随后由 Spring Cloud Data Flow 在流式数据管道中进行编排。

将函数组合到流应用程序中

让我们创建并部署一个流,将前面示例中的转换器表达式组合到 Source 应用程序本身。转换器逻辑是通过使用两个 java.util.Function 实现来完成的。

我们将创建一个新的源应用程序,我们称之为 http-transformer,它扩展了现成的 http 源应用程序。新源应用程序的源可以在这里找到。

http-transformer 应用程序包含 upperconcat 函数 Bean,如下定义

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

	@Bean
	public Function<String, String> upper() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> concat() {
		return value -> "Hello "+ value;
	}


	public static void main(String[] args) {
		SpringApplication.run(HttpSourceRabbitApplication.class, args);
	}
}

克隆 github 仓库后,您可以使用 maven 构建应用程序

cd function-composition/http-transformer ./mvnw clean package

现在,使用 Data Flow Shell 注册 http-transformer 应用程序。

注意

对于下面的应用程序注册 --uri 选项,请将工件的目录名称和路径替换为适合您系统的名称和路径。

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar

现在我们来创建流

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"

在部署流时,我们传递 spring.cloud.stream.function.definition 属性来定义组合函数 DSL (在 Spring Cloud Function 中定义)。在这种情况下,它是

dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"

上面的部署将 upperconcat 函数 Bean 组合到 http 源应用程序中。

然后我们可以将有效负载发送到 http 应用程序

dataflow:>http post --data "friend" --target "https://:9001"
> POST (text/plain) https://:9001 friend
> 202 ACCEPTED

然后您可以在 log 应用程序中看到输出,如下所示

[helloComposed-1] log-sink                                 : Hello FRIEND

注意

请注意,函数组合支持不适用于现成的 Spring Cloud Stream Processor 应用程序,因为无法明确是应该在现有处理器应用程序逻辑之前还是之后应用函数。

但是,您可以创建自己的处理器应用程序,这些应用程序使用函数组合和标准的 java.util.Function API,如下例所示

@Configuration
public static class FunctionProcessorConfiguration {

@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

然后您需要使用以下属性进行部署: spring.cloud.stream.function.definition=upperAndConcat

Kotlin 支持

另一个有趣的功能是 Spring Cloud Function 支持 Kotlin 函数的函数组合。这使我们可以将任何 Kotlin 函数 Bean 添加到 SourceSink 应用程序的可组合函数中。

要查看此功能的工作原理,让我们使用我们示例 github 仓库中的 http-transformer-kotlin-processor 应用程序。

Kotlin 函数 Bean 被配置为处理器。这里,Kotlin 函数 Bean 是定义的 transform 函数如下

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,该项目还具有 spring-cloud-function-kotlin 作为依赖项,用于为 Kotlin 函数应用函数配置支持,定义如下

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

cd function-composition/http-transformer-kotlin ./mvnw clean package

注意

对于下面的应用程序注册 --uri 选项,请将工件的目录名称和路径替换为适合您系统的名称和路径。

dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar

要使用此应用程序创建流作为 Source

dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

与我们在 http-transformer 示例中所做的一样,我们可以使用 spring.cloud.stream.function.definition 属性来指定任何有效的组合函数 DSL 来构建函数组合。在这种情况下,它是

dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"

这里,函数名 transform 对应于 Kotlin 函数。

注意:我们可以执行 Kotlin 函数和 Java 函数之间的组合,因为 Kotlin 函数在内部会被转换为 java.util.Function

dataflow:>http post --data "friend" --target "https://:9002"
> POST (text/plain) https://:9002 friend
> 202 ACCEPTED

并且,您可以在 log 应用程序中看到输出,如下所示

[omposedKotlin-1] log-sink               : Hello How are you FRIEND

在此示例中,http-transformer 也包含函数的源代码。但是,您可以将应用程序定义为更模块化,将函数 Bean 定义在单独的工件中。然后,您可以通过仅向项目添加 maven 依赖项并通过设置 spring.cloud.stream.function.definition 属性来构建应用程序。这样,您就可以将大部分业务逻辑编码为函数,并在必要时将其与源或接收器进行组合。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有