Spring Cloud Data Flow 中的函数组合支持

工程 | Ilayaperumal Gopinathan | 2019 年 1 月 9 日 | ...

Spring Cloud Stream 最近添加了一个功能,用于将函数定义组合到现有的 Spring Cloud Stream 应用程序中。在本博文中,我们将了解 Spring Cloud Data Flow 如何利用此功能在流式管道中组合函数。

有什么不同?

在 Spring Cloud Data Flow 中,流式数据管道由 Spring Cloud Stream 应用程序组成。开发人员可以选择现成的流式应用程序,这些应用程序涵盖了许多常见的用例。开发人员还可以扩展这些现成的应用程序,或者通过使用 Spring Cloud Stream 框架创建自定义应用程序。

Spring Cloud Stream 2.1.0 GA 的发布包含了Spring Cloud Function 基于编程模型的集成,该模型允许业务逻辑表示为java.util.Functionjava.util.Consumerjava.util.Supplier,分别表示ProcessorSinkSource 的角色。鉴于这种灵活性,Spring Cloud Stream 框架现在支持一种简单而强大的函数组合方法。在此上下文中,组合可能是源和处理器组合成单个应用程序:“新的源”。或者,它可能是处理器 + 接收器组合成单个应用程序:“新的接收器”。这种灵活性为流应用程序开发人员带来了有趣的新机遇。

让我们考虑如何创建一个管道以使用三个应用程序执行简单的转换,然后看看如何使用两个使用函数组合的应用程序将其实现为管道。

使用三个应用程序的流式管道

对于第一个流,

我们将使用现成的http-sourcetransform-processorlog-sink 应用程序。

第一步,启动 Spring Cloud Data Flow local 服务器

java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar

现在,启动 Spring Cloud Data Flow shell

java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar

现在,让我们注册使用 RabbitMQ 绑定器的 HTTP 源、转换器处理器和日志接收器应用程序

dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar

现在我们可以创建一个没有函数组合的简单流

dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然后我们可以部署流

dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://127.0.0.1:9000"
POST (text/plain) https://127.0.0.1:9000 friend
202 ACCEPTED

您可以在log 应用程序中看到以下日志消息

[sformer.hello-1] log-sink                                 : Hello FRIEND

在此流中,我们将 http(源)、转换器(处理器)和日志(接收器)应用程序作为独立应用程序部署在目标平台(在本例中为local)。对于某些用例,对于简单的有效负载转换逻辑,我们可能希望将Processor 应用程序与SourceSink 应用程序组合。例如,在源输出数据中屏蔽某些特定用户特定字段的转换方案不一定需要部署为单独的独立应用程序。相反,它可以在源或接收器应用程序中进行组合。

为了将处理器函数组合到源或接收器应用程序中,我们使用 Spring Cloud Stream 的函数组合支持。

Spring Cloud Stream 中的函数组合支持基于 Spring Cloud Function 的能力,允许将java.util.Supplierjava.util.Consumerjava.util.Function 作为 Spring @Bean 定义进行注册。这些函数@Bean 定义可在运行时用于组合。

Spring Cloud Stream 引入了一个新属性,称为spring.cloud.stream.function.definition,它对应于 Spring Cloud Function 中的函数定义 DSL。设置此属性时,所需的函数 bean 会在运行时自动链接。

函数组合按以下方式进行

当 Spring Cloud Stream 应用程序的类型为Source 时,组合函数将在源output 之后应用。

当 Spring Cloud Stream 应用程序的类型为Sink 时,组合函数将在接收器input 之前应用。

这使得能够将函数(在 Spring Cloud Function DSL 中定义)组合到现有的 Spring Cloud Stream 应用程序中,并随后由 Spring Cloud Data Flow 在流式数据管道中进行编排。

将函数组合到流应用程序中

让我们创建一个并部署一个流,该流将先前示例中的转换器表达式组合到Source 应用程序本身中。转换器逻辑是通过使用两个java.util.Function 实现来完成的。

我们将创建一个新的源应用程序,我们将其称为http-transformer,它扩展了现成的 http 源应用程序。新源应用程序的源代码可以在此处找到。

http-transformer应用程序包含如下定义的upperconcat 函数 bean

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

	@Bean
	public Function<String, String> upper() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> concat() {
		return value -> "Hello "+ value;
	}


	public static void main(String[] args) {
		SpringApplication.run(HttpSourceRabbitApplication.class, args);
	}
}

克隆 github存储库后,您可以使用 maven 构建应用程序

cd function-composition/http-transformer ./mvnw clean package

现在使用 Data Flow Shell 注册http-transformer 应用程序。

注意

对于下面的应用程序注册--uri 选项,请将工件的目录名和路径替换为您系统中适当的值。

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar

现在让我们创建流

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"

部署流时,我们传递spring.cloud.stream.function.definition 属性以定义组合函数 DSL(如 Spring Cloud Function 中定义)。在本例中,它是

dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"

上述部署将upperconcat 函数 bean 组合到http 源应用程序中。

然后我们可以将有效负载发送到http 应用程序

dataflow:>http post --data "friend" --target "https://127.0.0.1:9001"
> POST (text/plain) https://127.0.0.1:9001 friend
> 202 ACCEPTED

然后您可以在log 应用程序中看到输出,如下所示:

[helloComposed-1] log-sink                                 : Hello FRIEND

注意

请注意,函数组合支持不适用于现成的 Spring Cloud Stream Processor 应用程序,因为在函数需要在现有处理器的应用程序逻辑之前还是之后应用存在歧义。

但是,您可以创建自己的处理器应用程序,这些应用程序使用标准 java.util.Function API 进行函数组合,如下例所示

@Configuration
public static class FunctionProcessorConfiguration {

@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

然后您需要使用以下属性进行部署:spring.cloud.stream.function.definition=upperAndConcat

Kotlin 支持

另一个有趣的功能是 Spring Cloud Function 支持 Kotlin 函数的函数组合。这让我们可以将任何 Kotlin 函数 bean 添加到SourceSink 应用程序的可组合函数中。

为了查看它的工作原理,让我们使用我们示例 github存储库中的http-transformer-kotlin-processor 应用程序。

Kotlin 函数 bean 被配置为处理器。这里,Kotlin 函数 bean 是如下定义的transform 函数

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,此项目将spring-cloud-function-kotlin 作为依赖项,以应用 Kotlin 函数的函数配置支持,定义如下

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

cd function-composition/http-transformer-kotlin ./mvnw clean package

注意

对于下面的应用程序注册--uri 选项,请将工件的目录名和路径替换为您系统中适当的值。

dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar

要使用此应用程序作为Source 创建一个流

dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

与我们在http-transformer 示例中所做的一样,我们可以使用spring.cloud.stream.function.definition 属性指定任何有效的组合函数 DSL 来构建函数组合。在本例中,让我们将通过 Java 配置注册的函数 bean 与来自 Kotlin 处理器配置的函数 bean 组合起来。

dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"

这里,函数名transform 对应于 Kotlin 函数。

注意:我们可以执行 Kotlin 函数和 Java 函数之间的组合,因为 Kotlin 函数在内部被转换为java.util.Function

dataflow:>http post --data "friend" --target "https://127.0.0.1:9002"
> POST (text/plain) https://127.0.0.1:9002 friend
> 202 ACCEPTED

并且,您可以在log 应用程序中看到输出,如下所示:

[omposedKotlin-1] log-sink               : Hello How are you FRIEND

在此示例中,http-transformer 还包含函数的源代码。但是,您可以通过在单独的工件中定义函数 bean 来使应用程序更加模块化。然后,您可以通过仅向项目添加 maven 依赖项并设置spring.cloud.stream.function.definition 属性来构建应用程序。这样,您可以将大部分业务逻辑编码为函数,并且可以根据需要将其与源或接收器组合。

获取 Spring 新闻

通过 Spring 新闻保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部