案例研究:聚合器函数和处理器

工程 | Artem Bilan | 2020年10月26日 | ...

本文是博客系列文章的一部分,该系列探讨了基于 Java 函数重新设计的 Spring Cloud Stream 应用。在本篇文章中,我们将深入研究聚合器函数及其与拆分器函数的关系。我们将了解如何自定义默认行为。我们还将探讨为聚合器配置共享消息存储的重要性。

以下是本博客系列的所有先前部分。

聚合器函数

聚合器函数是 Spring Integration 中 [AggregatingMessageHandler](https://docs.springframework.org.cn/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的基础,继承了其大部分功能,并将常用的聚合器选项暴露为配置属性。有关更多信息,请参阅 AggregatorFunctionProperties(或下一节)。聚合器函数是完全响应式的,被定义为 Function<Flux<Message<?>>, Flux<Message<?>>。这是因为聚合器逻辑不需要立即生成回复。相反,它将当前消息存储在消息存储中,与其他消息分组在一起,以便收集或归约为某个结果,直到满足释放结果所需的条件。这样一来,将入站消息作为流(Flux)处理,并通过聚合器将它们组合成出站流(也是一个 Flux),感觉非常自然。因此,我们只需要对聚合器函数的结果进行 subscribe() 即可启动流程。事实上,当我们在 Spring Cloud Stream 应用中使用此类响应式函数时,这正是自动发生的:框架会为我们构建一个来自输入目标的 Flux 消息流,并在输出目标上处理生成的 Flux

用法

聚合器通常与 Splitter(拆分器) 结合使用,拆分器将单个入站消息转换为多个出站消息,包括一些序列详细信息头。经过一些单独的项目处理(转换、丰富等)后,我们添加一个聚合器将这些项目重新组合成单个消息。上述的序列详细信息头被用作默认的关联和释放策略,用于将消息存储在组中,并决定何时以及如何组合并生成单个消息。使用函数组合来构建这种处理逻辑感觉很自然,我们稍后会讨论。但现在,让我们(为简单起见)假设我们有一些想要组合成单个消息的数据!

首先,我们需要在 Spring Boot 项目中为聚合器函数添加依赖项

<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>aggregator-function</artifactId>
</dependency>

就这么简单!聚合器函数 bean 会被自动配置,以便我们可以将其自动注入到代码中使用。

@Autowired
Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;
...
Flux<Message<?>> input =
        Flux.just(MessageBuilder.withPayload("2")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build(),
         MessageBuilder.withPayload("1")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build());

Flux<Message<?>> output = this.aggregatorFunction.apply(input);

output.as(StepVerifier::create)
            .assertNext((message) ->
                    assertThat(message)
                            .extracting(Message::getPayload)
                            .isInstanceOf(List.class)
                            .asList()
                            .hasSize(2)
                            .contains("1", "2"))
            .thenCancel()
            .verify();

在这段代码片段中,我们演示了如何将两个具有预定义序列详细信息的简单消息组合成一个 List<String>。所有繁重的工作都在 aggregatorFunction 及其默认的关联和释放策略中完成。这也包括默认的组组合器选项,它从释放组中的消息构建一个 payload 列表。

更复杂的用例和配置选项将在下一节中介绍。

持久化状态存储

我们在应用中处理和操作的数据和信息确实是应用最重要的部分。关于何时将数据保存在内存中而不是某些外部存储中,我们需要三思。在大多数情况下,我们会使用某些数据库作为状态存储和/或消息中间件,以防止生产者和消费者之间的数据丢失。作为额外的好处,这使得集群中的不同实例能够访问共享存储,以实现流畅的分布式计算。

聚合器函数并非必须使用持久化状态存储才能工作,但在生产环境中,它是必要的,以避免数据丢失并确保故障转移。

配置

聚合器函数的配置选项(AggregatorFunctionProperties)非常直观,并且与 [AggregatingMessageHandler](https://docs.springframework.org.cn/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的高级选项完全一致。它们如下:

  • correlation - 一个 SpEL 表达式,用于从入站消息中确定关联键(组 ID)。如果提供此表达式,它将为底层的 AggregatingMessageHandler 构建一个 ExpressionEvaluatingCorrelationStrategy。默认情况下(未提供时),AggregatingMessageHandler 使用 HeaderAttributeCorrelationStrategy,它基于 IntegrationMessageHeaderAccessor.CORRELATION_ID - 这个头可以由上游的拆分器、PublishSubscribeChannel 或 recipient-list router 填充。

  • release - 一个 SpEL 表达式,用于确定存储的消息组是否应该被释放并作为输出消息发出。如果提供此表达式,它将为底层的 AggregatingMessageHandler 构建一个 ExpressionEvaluatingReleaseStrategy。默认情况下(未提供时),AggregatingMessageHandler 使用 SimpleSequenceSizeReleaseStrategy,它基于存储的组大小和 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE - 这个头可以由上游的拆分器、PublishSubscribeChannel 或 recipient-list router 填充。

  • aggregation - 一个 SpEL 表达式,用于从已释放的消息组构建输出结果。如果提供此表达式,它将对底层的 AggregatingMessageHandlerExpressionEvaluatingMessageGroupProcessor 起作用。默认情况下(未提供时),AggregatingMessageHandler 使用 DefaultAggregatingMessageGroupProcessor,它只是将组中消息的 payload 组合成一个 List 并合并它们的头。

  • groupTimeout - 一个 SpEL 表达式,用于安排一个后台任务,当没有更多消息到达组时使其过期。有关此选项的更多信息,请参阅 Spring Integration

  • messageStoreType - 来自 AggregatorFunctionProperties.MessageStoreType 常量类的一个值,用于指示使用哪种 MessageGroupStore 实现来存储消息,直到组被释放。支持的 MessageGroupStore 包括:ConfigurableMongoDbMessageStoreRedisMessageStoreGemfireMessageStoreJdbcMessageStore 和默认的 SimpleMessageStore,它将消息存储在内存中。这是最重要的选项,应根据目标环境和可用的持久化存储来选择。当聚合器函数作为集群实例部署时(例如,在 Spring Cloud Data Flow 中作为 aggregator-processor 的一部分使用时),它的价值更大,因为通过共享状态,你可以在一个实例上将消息发送到聚合器,但它们可以在另一个实例上被释放。这样,在应用崩溃时就不会丢失消息。MessageGroupStore 实现的依赖项被打包到最终的函数 uber jar 中,并根据此选项进行自动配置。唯一的区别在于 JDBC,我们需要根据目标环境要求提供合适的驱动程序。有关 MessageGroupStore 抽象的更多信息,请参阅 Spring Integration System Management 以及之前关于如何提供 JDBC 驱动程序的博客文章。这些持久化存储的所有配置选项与 Spring Boot 为我们提供的自动配置方式相同。

  • messageStoreEntity - 此选项仅特定于某些 MessageGroupStore 实现:它指的是 Gemfire/Geode 的客户端区域;JDBC 的表前缀;MongoDB 的集合名称。对于其他实现,此选项将被忽略。

有关这些组件的更多信息,请参阅 Spring Integration 以及 Stream Applications 项目中相应的函数实现(如果有)。

因此,如果我们要运行一个聚合器函数(作为独立应用、Spring Cloud Stream 的 processor(处理器) 或 Spring Cloud Data Flow 流定义的一部分),并使用一些自定义属性并连接共享的 MongoDB 存储,我们可以这样声明:

java -jar aggregator-processor-kafka-3.0.0-SNAPSHOT.jar --aggregator.correlation=T(Thread).currentThread().id --aggregator.release=!messages.?[payload == 'bar'].empty --aggregator.aggregation=#this.?[payload == 'foo'].![payload] --aggregator.messageStoreType=mongodb --aggregator.message-store-entity=aggregatorTest --spring.data.mongodb.uri=mongodb://localhost/test

其中这些属性的值如下:

  • aggregator.correlation - 消费者线程 ID,用作消息分组的键;

  • aggregator.release - 一个针对消息组的 SpEL 表达式,仅当 bar payload 到达时才释放该组;

  • aggregator.aggregation - 一个 SpEL 表达式,用于选择和投影消息组集合,其中只有带有 foo payload 的消息被组合到最终结果中;

  • aggregator.messageStoreType - 使用 MongoDb MessageGroupStore 实现;

  • aggregator.message-store-entity - MongoDb 数据库中的集合名称;

  • spring.data.mongodb.uri - MongoDb 数据库连接。

即使我们在自定义 Spring Boot 应用中将此函数与其他函数组合,相同的配置属性集也保持不变。有关更多信息,请参阅下一节。

组合

聚合器函数本身在生产解决方案中可能没有意义。当然,在大多数情况下,它会与其他上游和下游函数结合使用。如前所述,通常会将聚合器与 splitter(拆分器) 预处理组合。可以通过 Java API 使用 Function.andThen()Function.compose() 以编程方式组合这些函数,但是,由于这些方法对类型非常严格,我们需要进行一些中间转换以满足函数的输入和输出。有了 Spring Cloud Function 库的支持,我们可以绕过编写各种转换的繁重编程工作,同时保留所需的序列详细信息消息头。我们宁愿依赖框架中的类型推断和开箱即用的转换能力,并尽可能保持组合的简单性。

假设我们有以下输入 JSON:

{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ]
  }
}

我们的任务是将书名以逗号分隔的单个字符串形式提供。

我们只需要将三个开箱即用的函数组合成一个 Spring Cloud Function(或 Stream)应用。这些函数的依赖项是:splitter-functionspel-function 和我们的 aggregator-function。此类应用的配置属性可能如下所示:

spring.cloud.function.definition=splitterFunction|spelFunction|aggregatorFunction splitter.expression=#jsonPath(payload,'$.store.book') spel.function.expression=title aggregator.aggregation=T(org.springframework.util.StringUtils).collectionToCommaDelimitedString(#this.![payload])

我们可以使用类似的流定义和 Spring Cloud Data Flow 配置。唯一的区别是,函数之间的消息将通过 binder 传输,使用预构建的处理器应用。实际上,你可以在诸如 Mongo DB source(MongoDB 源) 之类的场景中使用这种组合。使用 Spring Cloud Data Flow 时还需要记住一点,聚合器函数是类型无关的,并接收带有 byte[] payload 的消息。如果你计划对 payload 执行一些复杂的逻辑,如上述表达式所示,你可能需要将其与上游的某个函数组合,将 byte[] payload 转换为域对象或其他兼容类型,如 HashMap。如果 payload 是 JSON 表示形式,则始终可以使用我们上面为拆分器表达式展示的 #jsonPath() SpEL 函数来访问它。

有关此系列先前博客文章中关于函数组合的更多信息,请参阅其中一篇。

结论

本文详细介绍了聚合器 Function 以及它如何在 Spring Cloud Stream Aggregator Processor 中使用。我们还探讨了如何使用此函数的配置属性。然后,我们深入研究了在独立应用中使用聚合器的几种变体,并在此过程中探索了各种功能。最后,我们了解了如何在聚合器中轻松切换 MessageGroupStore 实现,以实现消息间的持久化状态。

敬请关注

本系列将继续。在接下来的几周内,我们将探讨更多的函数和应用。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

提升自我

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一次简单订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部