案例研究:聚合函数和处理器

工程 | Artem Bilan | 2020年10月26日 | ...

本文是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用程序。在本期中,我们正在研究聚合函数及其与拆分函数的关系。我们将了解如何自定义默认行为。我们还将了解为聚合器配置共享消息存储的重要性。

以下是此博客系列的所有先前部分。

聚合函数

聚合函数是 Spring Integration 中的 [AggregatingMessageHandler](https://docs.springframework.org.cn/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的基础,继承了其大部分功能,并将常用聚合器选项作为配置属性公开。有关更多信息,请参阅 AggregatorFunctionProperties(或下一节)。聚合函数是完全响应式的,定义为 Function<Flux<Message<?>>, Flux<Message<?>>。这是因为聚合逻辑不需要它立即产生回复。相反,它将当前消息存储在消息存储中,与其他要收集或减少为某些结果的消息分组,直到满足释放结果所需的条件。这样,将传入消息作为流(Flux)并通过聚合器将其组合到输出流(也是Flux)中会感觉很自然。因此,我们只需要对聚合函数的结果进行 subscribe() 即可启动流程。实际上,当我们在 Spring Cloud Stream 应用程序中使用此类响应式函数时,框架会自动为我们执行此操作:框架会从输入目标构建消息的 Flux,并在输出目标上处理结果 Flux

用法

通常,聚合器与 拆分器 结合使用,拆分器将单个传入消息转换为多个传出消息,包括一些序列详细信息标头。在对一些单独的项目进行处理(转换、丰富等)后,我们添加一个聚合器以将这些项目组合回单个消息。提到的序列详细信息标头用作默认的相关性和释放策略,以将消息存储在组中并在何时以及如何组合和生成单个消息时做出决策。使用函数组合构建此类处理逻辑感觉很自然,我们将在后面讨论。但现在让我们假设(为简单起见)我们有一些数据要组合到单个消息中!

首先,我们需要在 Spring Boot 项目中添加聚合函数的依赖项

<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>aggregator-function</artifactId>
</dependency>

就是这样!聚合函数 bean 将自动配置到足以让我们将函数自动装配到我们的代码中并使用它。

@Autowired
Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;
...
Flux<Message<?>> input =
        Flux.just(MessageBuilder.withPayload("2")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build(),
         MessageBuilder.withPayload("1")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build());

Flux<Message<?>> output = this.aggregatorFunction.apply(input);

output.as(StepVerifier::create)
            .assertNext((message) ->
                    assertThat(message)
                            .extracting(Message::getPayload)
                            .isInstanceOf(List.class)
                            .asList()
                            .hasSize(2)
                            .contains("1", "2"))
            .thenCancel()
            .verify();

在此代码片段中,我们演示了如何将两条具有预定义序列详细信息的简单消息组合成单个 List<String>。所有繁重的工作都在 aggregatorFunction 及其默认的相关性和释放策略中完成。这也包括一个默认的组组合器选项,该选项从已释放组中的消息构建有效负载列表。

我们将在下一节中回顾更复杂的用例和配置选项。

持久性状态存储

我们在应用程序中处理和操作的数据和信息确实是应用程序最重要的部分。我们需要仔细考虑何时将数据保存在内存中而不是某些外部存储中。在大多数情况下,我们会使用某些数据库作为状态存储和/或消息传递中间件,以防止生产者和消费者之间的数据丢失。作为额外的好处,这使集群中的不同实例能够访问共享存储,以便进行流畅的分布式计算。

聚合函数不需要持久性状态存储即可工作,但在生产环境中需要持久性状态存储以避免数据丢失并确保故障转移。

配置

聚合函数的配置选项 (AggregatorFunctionProperties) 非常简单,并且完全与 [AggregatingMessageHandler](https://docs.springframework.org.cn/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的高级选项保持一致。它们是

  • correlation - 用于从传入消息确定相关键(组 ID)的 SpEL 表达式。此类表达式(如果提供)将为底层的 AggregatingMessageHandler 构建 ExpressionEvaluatingCorrelationStrategy。默认情况下(如果未提供),AggregatingMessageHandler 使用基于 IntegrationMessageHeaderAccessor.CORRELATION_IDHeaderAttributeCorrelationStrategy - 一个标头,可以由拆分器、PublishSubscribeChannel 或收件人列表路由器在上游填充。

  • release - 用于确定是否应释放存储的消息组并将其作为输出消息发出的 SpEL 表达式。此类表达式(如果提供)将为底层的 AggregatingMessageHandler 构建 ExpressionEvaluatingReleaseStrategy。默认情况下(如果未提供),AggregatingMessageHandler 使用基于存储组大小和 IntegrationMessageHeaderAccessor.SEQUENCE_SIZESimpleSequenceSizeReleaseStrategy - 一个标头,可以由拆分器、PublishSubscribeChannel 或收件人列表路由器在上游填充。

  • aggregation - 用于从已释放的消息组构建输出结果的 SpEL 表达式。此表达式(如果提供)将有助于底层 AggregatingMessageHandlerExpressionEvaluatingMessageGroupProcessor。默认情况下(如果未提供),AggregatingMessageHandler 使用 DefaultAggregatingMessageGroupProcessor,它仅将组中消息的有效负载组合到 List 中并合并其标头。

  • groupTimeout - 用于安排后台任务以在没有更多消息到达组时使其过期的 SpEL 表达式。有关此选项的更多信息,请参阅 Spring Integration

  • messageStoreType - 来自 AggregatorFunctionProperties.MessageStoreType 常量类的值,用于指示要使用哪个 MessageGroupStore 实现来存储消息,直到释放其组。支持的 MessageGroupStore 为:ConfigurableMongoDbMessageStoreRedisMessageStoreGemfireMessageStoreJdbcMessageStoreSimpleMessageStore(默认值),它将消息存储在内存中。这是最重要的选项,应根据目标环境和可用的持久性存储进行选择。当聚合函数作为集群实例部署时(例如,通过 Spring Cloud Data Flow 作为 aggregator-processor 的一部分使用时),它具有更大的价值,因此共享状态可能会在一个实例上向聚合器生成消息,但它们可能会在另一个实例上释放。这样,当应用程序崩溃时,您就不会丢失消息。MessageGroupStore 实现的依赖项打包到最终的函数 uber jar 中,并根据此选项自动配置。唯一的区别在于 JDBC,我们必须根据目标环境的要求提供合适的驱动程序。有关 MessageGroupStore 抽象的更多信息,请参阅 Spring Integration 系统管理,以及上一篇博客文章中如何 提供 JDBC 驱动程序。这些持久性存储的所有配置选项与 Spring Boot 为我们自动配置它们提供的选项完全相同。

  • messageStoreEntity - 此选项仅特定于某些 MessageGroupStore 实现:它引用 Gemfire/Geode 的客户端区域;JDBC 的表前缀;MongoDB 的集合名称。对于其余实现,它将被忽略。

有关这些组件的更多信息,请参阅 Spring IntegrationStream Applications 项目中(如果有)的相应函数实现。

因此,如果我们想运行聚合函数(作为独立函数、作为 Spring Cloud Stream 处理器 或作为 Spring Cloud Data Flow 流定义的一部分),并使用一些自定义属性和共享 MongoDB 存储,我们可以这样声明它

java -jar aggregator-processor-kafka-3.0.0-SNAPSHOT.jar --aggregator.correlation=T(Thread).currentThread().id --aggregator.release=!messages.?[payload == 'bar'].empty --aggregator.aggregation=#this.?[payload == 'foo'].![payload] --aggregator.messageStoreType=mongodb --aggregator.message-store-entity=aggregatorTest --spring.data.mongodb.uri=mongodb://127.0.0.1/test

其中这些属性的值为

  • aggregator.correlation - 作为消息分组键的消费者线程 ID;

  • aggregator.release - 用于释放消息组的 SpEL 表达式,仅当 bar 有效负载已到达时才释放;

  • aggregator.aggregation - 用于选择和投影消息组集合的 SpEL 表达式,其中仅将具有 foo 有效负载的消息组合到最终结果中;

  • aggregator.messageStoreType - 使用 MongoDB 的 MessageGroupStore 实现;

  • aggregator.message-store-entity - MongoDB 数据库中的集合名称;

  • spring.data.mongodb.uri - MongoDB 数据库连接。

即使我们在自定义 Spring Boot 应用程序中将此函数与其他函数组合,相同的配置属性设置也保持不变。有关更多信息,请参阅下一节。

组合

聚合器函数本身在生产解决方案中可能没有意义。当然,在大多数情况下,它与其他上游和下游函数结合使用。如前所述,聚合器通常与 分割器 预处理组合使用。可以使用 Java API 通过 Function.andThan()Function.compose() 以编程方式组合这些函数,但是,由于这些方法类型非常严格,我们需要执行一些中间转换以满足函数的输入和输出。使用 Spring Cloud Function 库,我们可以绕过编写各种转换的繁琐编程工作,同时保留所需的顺序详细信息消息头。我们更愿意依赖框架中的类型推断和开箱即用的转换功能,并使我们的组合尽可能简单。

假设我们有一个这样的输入 JSON

{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ]
  }
}

我们的任务是将图书名称作为单个逗号分隔的字符串提供。

我们只需要将三个开箱即用的函数组合到一个 Spring Cloud Function(或 Stream)应用程序中。这些函数的依赖项为:splitter-functionspel-function 和我们的 aggregator-function。此类应用程序的配置属性可能如下所示

spring.cloud.function.definition=splitterFunction|spelFunction|aggregatorFunction splitter.expression=#jsonPath(payload,'$.store.book') spel.function.expression=title aggregator.aggregation=T(org.springframework.util.StringUtils).collectionToCommaDelimitedString(#this.![payload])

我们可以使用类似的流定义和 Spring Cloud Data Flow 的配置。唯一的区别是函数之间发送/接收的消息将通过绑定器传输,使用预构建的处理器应用程序。您实际上可以在类似于 MongoDB 源 的内容中使用此组合。使用 Spring Cloud Data Flow 时,还需要记住的一点是,聚合器函数与类型无关,并使用具有 byte[] 负载的消息。如果您计划对负载执行一些复杂的逻辑,如上述表达式中所示,您可能需要将此函数与一个上游函数组合,以将 byte[] 负载转换为域对象或其他兼容类型,如 HashMap。如果负载是 JSON 表示形式,则始终可以使用我们上面为分割器表达式显示的 #jsonPath() SpEL 函数访问它。

有关此系列以前博客文章中 函数组合 的更多信息,请参阅。

结论

这篇博文介绍了聚合器 Function 的详细信息,以及它如何在 Spring Cloud Stream 聚合器处理器中使用。我们还了解了如何使用此函数的配置属性。然后,我们深入探讨了在独立应用程序中使用聚合器的几种变体,在此过程中探索了各种功能。最后,我们了解了如何在消息之间轻松切换聚合器中持久状态的 MessageGroupStore 实现。

敬请期待

本系列将继续。在接下来的几周里,我们将了解更多函数和应用程序。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部