领先一步
VMware 提供培训和认证,助您加速进步。
了解更多本文是探索基于 Java Functions 的全新 Spring Cloud Stream 应用程序的博客系列的一部分。在本集中,我们将研究 Aggregator 函数及其与 Splitter 函数的关系。我们将看到如何自定义默认行为。我们还将探讨配置共享消息存储对聚合器的重要性。
以下是本博客系列的所有先前部分。
Aggregator 函数是 Spring Integration 中 [AggregatingMessageHandler](https://docs.springframework.org.cn/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的基础,继承了其大部分功能,并将常用的聚合器选项作为配置属性公开。有关更多信息,请参阅 AggregatorFunctionProperties(或下一节)。聚合器函数是完全响应式的,定义为 Function<Flux<Message<?>>, Flux<Message<?>>。这是因为聚合器逻辑不需要它立即产生回复。相反,它将当前消息存储在消息存储中,与其他消息分组以收集或归约到某个结果,直到满足释放结果的条件为止。这样,将入站消息视为流(Flux)并通过聚合器将其合并为输出流(也是 Flux)是很自然的。因此,我们只需要 subscribe() Aggregator 函数的结果即可启动流。事实上,当我们在 Spring Cloud Stream 应用程序中使用这种响应式函数时,这正是自动发生的:框架为我们构建了来自输入目标的邮件流,并在输出目标上处理生成的邮件流。
通常,聚合器与 Splitter 结合使用,Splitter 将单个入站消息转换为多个出站消息,包括一些序列详细信息头。在对各个项目进行一些处理(转换、丰富等)之后,我们添加一个聚合器以将这些项目重新组合成一条消息。上述序列详细信息头用作默认的相关性和释放策略,以将消息分组存储并决定何时以及如何组合和生成单个消息。通过函数组合来构建这种处理逻辑是很自然的,我们稍后将讨论这一点。但现在,让我们(为简单起见)想象一下,我们有一些数据想要合并成一条消息!
首先,我们需要在我们的 Spring Boot 项目中添加聚合器函数的依赖项
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>aggregator-function</artifactId>
</dependency>
就是这样!聚合器函数 bean 将被自动配置,足以让我们将函数自动装入代码并使用它
@Autowired
Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;
...
Flux<Message<?>> input =
Flux.just(MessageBuilder.withPayload("2")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build(),
MessageBuilder.withPayload("1")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build());
Flux<Message<?>> output = this.aggregatorFunction.apply(input);
output.as(StepVerifier::create)
.assertNext((message) ->
assertThat(message)
.extracting(Message::getPayload)
.isInstanceOf(List.class)
.asList()
.hasSize(2)
.contains("1", "2"))
.thenCancel()
.verify();
此代码片段演示了两个具有预定义序列详细信息的简单消息如何合并为单个 List<String>。所有繁重的工作都在 aggregatorFunction 及其默认的相关性和释放策略中完成。这还包括默认的组组合器选项,它从已释放组中的消息构建一个有效负载列表。
我们将在下一节中回顾更复杂的用例和配置选项。
我们在应用程序中处理和操作的数据和信息是应用程序最重要的部分。我们需要仔细考虑何时将数据保留在内存中,而不是保留在外部存储中。在大多数情况下,我们将使用某个数据库作为状态存储和/或消息中间件,以防止生产者和消费者之间丢失数据。作为额外的好处,这使得集群中的不同实例可以访问共享存储,以实现顺畅的分布式计算。
聚合器函数正常工作不需要持久状态存储,但在生产环境中是必需的,以避免数据丢失并确保故障转移。
聚合器函数的配置选项(AggregatorFunctionProperties)非常直接,并且与 [AggregatingMessageHandler](https://docs.springframework.org.cn/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的高级选项完全一致。它们如下:
correlation - 用于从入站消息确定相关键(组 ID)的 SpEL 表达式。Such an expression (if provided) builds an ExpressionEvaluatingCorrelationStrategy for the underlying AggregatingMessageHandler. By default (when not provided), the AggregatingMessageHandler uses a HeaderAttributeCorrelationStrategy which is based on the IntegrationMessageHeaderAccessor.CORRELATION_ID - a header which can be populated upstream by the splitter, or PublishSubscribeChannel or recipient-list router.
release - 用于确定是否应释放已存储的消息组并将其作为输出消息发出的 SpEL 表达式。Such an expression (if provided) builds an ExpressionEvaluatingReleaseStrategy for the underlying AggregatingMessageHandler. By default (when not provided), the AggregatingMessageHandler uses a SimpleSequenceSizeReleaseStrategy which is based on the stored group size and an IntegrationMessageHeaderAccessor.SEQUENCE_SIZE - a header which can be populated upstream by the splitter, or PublishSubscribeChannel or recipient-list router.
aggregation - 用于从已释放的消息组构建输出结果的 SpEL 表达式。This expression (if provided) contributes to the ExpressionEvaluatingMessageGroupProcessor for the underlying AggregatingMessageHandler. By default (when not provided), the AggregatingMessageHandler uses a DefaultAggregatingMessageGroupProcessor which just combines payloads of messages in group into the List and merge their headers.
groupTimeout - 用于安排后台任务以在没有更多消息到达组时使组过期的 SpEL 表达式。有关此选项的更多信息,请参阅 Spring Integration。
messageStoreType - 来自 AggregatorFunctionProperties.MessageStoreType 常量类的值,用于指示使用哪个 MessageGroupStore 实现来存储消息,直到组被释放。支持的 MessageGroupStore 包括:ConfigurableMongoDbMessageStore、RedisMessageStore、GemfireMessageStore、JdbcMessageStore 和 SimpleMessageStore(默认值,将消息存储在内存中)。这是最重要的选项,应根据目标环境和可用持久性存储进行选择。当聚合器函数作为集群实例部署时(例如,通过 Spring Cloud Data Flow 作为 aggregator-processor 的一部分使用),共享状态时,您可以在一个实例上将消息发送到聚合器,但可以在另一个实例上释放它们。这样,应用程序崩溃时就不会丢失消息。MessageGroupStore 实现的依赖项打包在最终的函数 uber jar 中,并根据这些选项进行自动配置。唯一不同的是 JDBC,我们必须根据目标环境的要求提供适当的驱动程序。有关 MessageGroupStore 抽象的更多信息,请参阅 Spring Integration System Management 和上一篇博客中如何 提供 JDBC 驱动程序。所有这些持久性存储的配置选项与 Spring Boot 为我们自动配置它们所提供的选项相同。
messageStoreEntity - 此选项仅适用于某些 MessageGroupStore 实现:它指的是 Gemfire/Geode 的客户端区域;JDBC 的表前缀;MongoDB 的集合名称。对于其他实现,它将被忽略。
有关这些组件的更多信息,请参阅 Spring Integration 和 Stream Applications 项目中的相应函数实现(如果有)。
因此,如果我们想运行一个聚合器函数(作为独立函数、Spring Cloud Stream 处理器 或作为 Spring Cloud Data Flow 流定义的一部分),并带有某些自定义属性,并且针对共享的 MongoDB 存储,我们可以这样声明:
java -jar aggregator-processor-kafka-3.0.0-SNAPSHOT.jar --aggregator.correlation=T(Thread).currentThread().id --aggregator.release=!messages.?[payload == 'bar'].empty --aggregator.aggregation=#this.?[payload == 'foo'].![payload] --aggregator.messageStoreType=mongodb --aggregator.message-store-entity=aggregatorTest --spring.data.mongodb.uri=mongodb:///test
其中这些属性的值是:
aggregator.correlation - 消费者线程 ID 作为消息分组的键;
aggregator.release - 一个针对消息组的 SpEL 表达式,只有当 bar 有效负载到达时才释放它;
aggregator.aggregation - 一个 SpEL 表达式,用于选择和投影消息组集合,其中只有有效负载为 foo 的消息才会被合并到最终结果中;
aggregator.messageStoreType - 使用 MongoDb MessageGroupStore 实现;
aggregator.message-store-entity - MongoDb 数据库中的集合名称;
spring.data.mongodb.uri - MongoDb 数据库连接。
即使我们将此函数与其他函数组合到自定义 Spring Boot 应用程序中,相同的配置属性集也保持不变。有关更多信息,请参阅下一节。
在生产解决方案中,Aggregator 函数本身可能没有意义。当然,在大多数情况下,它与其他上游和下游函数结合使用。如前所述,通常将聚合器与 splitter 进行预处理组合。可以使用 Java API 通过 Function.andThan() 和 Function.compose() 以编程方式组合这些函数,但由于这些方法对类型非常严格,我们需要进行一些中间转换才能满足函数的输入和输出。借助 Spring Cloud Function 库,我们可以绕过编写各种转换的繁重编程工作,同时保留所需的序列详细信息消息头。我们宁愿依赖框架中的类型推断和开箱即用的转换功能,使我们的组合尽可能简单。
假设我们有一个如下所示的输入 JSON:
{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{
"category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{
"category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
]
}
}
我们的任务是将书名提供为单个逗号分隔的字符串。
我们只需要将三个开箱即用的函数组合成一个 Spring Cloud Function(或 Stream)应用程序。这些函数的依赖项是:splitter-function、spel-function 和我们的 aggregator-function。此类应用程序的配置属性可能如下所示:
spring.cloud.function.definition=splitterFunction|spelFunction|aggregatorFunction splitter.expression=#jsonPath(payload,'$.store.book') spel.function.expression=title aggregator.aggregation=T(org.springframework.util.StringUtils).collectionToCommaDelimitedString(#this.![payload])
我们可以在 Spring Cloud Data Flow 中使用类似的流定义和配置。唯一的区别是函数之间的消息将在绑定器上传输,使用预先构建的处理器应用程序。您实际上可以在 Mongo DB source 之类的内容中使用这种组合。在使用 Spring Cloud Data Flow 时需要注意的另一件事是,Aggregator 函数是类型无关的,并且使用具有 byte[] 有效负载的消息。如果您打算对有效负载执行一些复杂的逻辑,如上面的表达式所示,您可能需要将此函数与一个上游函数组合,将 byte[] 有效负载转换为域对象或其他兼容类型,如 HashMap。如果有效负载是 JSON 表示形式,则始终可以使用我们在上面为 splitter 表达式显示的 #jsonPath() SpEL 函数进行访问。
有关 函数组合 的更多信息,请参阅本系列之前的博客文章之一。
这篇博客详细介绍了 Aggregator Function 的细节及其在 Spring Cloud Stream Aggregator Processor 中的使用。我们还研究了如何使用此函数的配置属性。然后,我们深入探讨了在独立应用程序中使用聚合器的几种变体,并在此过程中探索了各种功能。最后,我们看到了如何轻松地在消息之间切换持久状态聚合器中的 MessageGroupStore 实现。
本系列将继续。在接下来的几周内,我们将介绍更多函数和应用程序。