揭秘使用 Apache Kafka 分区的 Spring Cloud Stream 生产者

工程 | Soby Chacko | 2021 年 2 月 3 日 | ...

在本篇博文中,我们将深入探讨如何编写使用 Apache Kafka 的 Spring Cloud Stream 生产者,以及它如何处理 Kafka 中的原生分区。

Spring Cloud Stream 具有一个与中间件无关的分区概念。在可能的情况下,Spring Cloud Stream 会利用中间件的原生分区功能(如果中间件具有此类功能,例如 Apache Kafka)。本博文探讨了 Spring Cloud Stream 开发人员在编写发布数据到 Kafka 的生产者应用程序时如何处理分区。在后续文章中,我们将探讨消费者如何在基于 Kafka 的 Spring Cloud Stream 应用程序中处理分区。

分区是 Apache Kafka 中扩展和并行的基本单元。使用正确的分区策略可以让您的应用程序以最小的延迟处理海量数据。Kafka 生产者可以并行写入不同的分区,这通常意味着它可以实现更高的吞吐量。虽然分区具有这些明显的优势,但还需要仔细考虑其他各种因素。在分区本身内,吞吐量可能会受到诸如批处理大小、使用的压缩算法、确认类型、复制因子等因素的进一步限制。此外,分区越多,打开的文件句柄就越多(因为分区映射到代理上的目录,并且每个分区内的日志段都需要一个索引文件和一个数据文件)。网上有很多关于如何为 Kafka 应用程序确定正确分区数量的资源,在部署基于 Kafka 的企业生产者应用程序之前,您可能需要熟悉这些资源。

Spring Cloud Stream Kafka 绑定器

Spring Cloud Stream Kafka 绑定器有一个主题供应器,它处理各种应用程序级别的主题需求。除其他事项外,创建和修改分区数量是供应器能够执行的操作。供应器本身不会执行这些操作,而是从 Kafka 集群调用正确的管理员 API。

通常,在编写 Spring Cloud Stream Kafka 应用程序时,可能会出现两种与主题创建相关的场景。大多数企业会锁定对 Kafka 集群的访问,并且只有管理员才能进行诸如创建主题、添加分区等操作更改。在这种情况下,应用程序无法直接创建或修改主题。另一种情况是,企业在授予对 Kafka 集群的访问权限方面非常宽松,应用程序可以自由创建或修改主题。让我们进一步考虑其中的一些内容。

场景 1:应用程序对 Kafka 集群具有完全的管理员权限

在这种情况下,应用程序对 Kafka 集群具有完全的管理员访问权限。您正在编写一个 Spring Cloud Stream 生产者,它将消息发布到 Kafka 主题。为了便于讨论,我们假设此主题不存在,并且您的应用程序将创建它。您还希望确保该主题已配置为具有一定数量的分区。

有几种方法可以告诉 Spring Cloud Stream 您希望主题配置多少个分区。每个方法都有其优缺点。让我们来看看它们。

  • 使用绑定器范围的属性指定分区数。使用此方法,您创建的任何主题都将具有相同的分区数。如果您的应用程序创建了多个主题,并且它们都希望具有相同数量的分区,那么这是创建分区的理想方法。这种方法的缺点是,除非覆盖,否则每个绑定都不能配置。您在绑定器级别使用的属性如下。

spring.cloud.stream.kafka.binder.min-partition-count

  • 另一种选择是在绑定级别指定分区数。使用这种方法,您可以在同一个应用程序中配置多个主题,并为它们配置不同的分区数。以下是属性

spring.cloud.stream.bindings.<binding-name>.producer.partition-count

鉴于前面的全局属性强制执行最小值(它可能更大),这两个值中较大的一个将对特定绑定生效。

  • 如果未使用上述任何选项,则主题将使用基于代理 num.partitions 属性(默认值:1)的分区数创建。

场景 2:Kafka 集群被锁定,并且不允许应用程序执行任何管理员操作。

在这种情况下,您作为应用程序开发人员的选择非常有限。由于 Kafka 集群被锁定,应用程序将无法创建或更改现有主题。如果主题事先未创建,则您的应用程序将在启动期间抛出异常并失败。为了避免这种情况,您必须确保主题已创建并具有正确数量的分区,并使用绑定器属性禁用自动主题供应(将 spring.cloud.stream.kafka.binder.auto-create-topics 设置为 false)。

场景 3:应用程序对 Kafka 集群具有完全的管理员权限,并且主题已存在,但您希望在下一次应用程序启动时增加分区。

这是可能的。假设您的主题已配置为具有 64 个分区,现在由于某些更高的容量需求,您希望将其加倍到 128。您可以使用场景 1 中讨论的任何属性(spring.cloud.stream.kafka.binder.min-partition-countspring.cloud.stream.bindings.<binding-name>.producer.partition-count)让绑定器知道这一点。

在这种情况下,绑定器检测到主题已存在。如果主题当前的分区大小小于请求的大小,则绑定器会检查属性 spring.cloud.stream.kafka.binder.autoAddPartitions。默认情况下,此属性设置为 false。因此,如果应用程序需要增加分区,则必须将其显式设置为 true。如果将其设置为 true,则供应器将请求 Kafka 管理员 API 增加分区数。如果未将其设置为 true,并且新请求的分区数大于现有分区数,则在生产者的情况下,绑定器将抱怨它无法容忍代理上的较低分区数,并抛出供应异常。如果发生这种情况,您必须手动增加分区或将 autoAddPartitions 属性设置为 true

这里需要注意的一点是,绑定器不允许您通过 Spring Cloud Stream 减少 Kafka 主题分区的数量。

请记住,增加或减少分区(使用任何机制)可能会破坏分区内的严格排序(如果这是需要考虑的因素),具体取决于您的分区策略(请参见下文)。

选择分区

现在我们已经了解了主题是如何分区的,我们需要讨论如何为特定记录选择分区。

有三种机制可以选择分区

原生 Kafka 分区选择

要使用原生分区,请在绑定器级别配置自定义分区器,使用 spring.cloud.stream.kafka.binder.producer-properties.partitioner.class 属性,或在绑定级别使用 spring.cloud.stream.kafka.bindings.<binding>.producer.configuration.partitioner.class 属性。

直接设置分区头

当使用默认的 Kafka 分区器时,应用程序可以直接将 KafkaHeaders.PARTITION_ID 头设置为所需的分区。

Spring Cloud Stream 分区选择

当使用 Spring Cloud Stream 分区时,让 Kafka 分区器使用其默认分区器,它只会简单地使用绑定器在生产者记录中设置的分区。在以下部分,我们将看到 Spring Cloud Stream 提供的此支持的详细信息。

Spring Cloud Stream 生产者如何确定分配哪个分区?

生产者如何使用 Spring Cloud Stream 将记录分配到正确的分区?在 Spring Cloud Stream 中有哪些可用于执行此操作的控件?这篇博文的其余部分将重点介绍这些问题。

确定分区键

Spring Cloud Stream 提供了两种机制供应用程序确定分区键。

1. 分区键表达式

一种简单的方法是将分区键作为 SpEL 表达式属性提供。这是一个示例。

spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression: headers['partitionKey']

然后,您的应用程序在发布消息时,可以添加一个名为 partitonKey 的头。Spring Cloud Stream 将在评估上述表达式以分配分区键时使用此头的值。这是一个示例生产者代码

@Bean
public Supplier<Message<?>> generate() {
  return () -> {
     String value = “random payload”;
    	return MessageBuilder.withPayload(value)
           .setHeader("partitionKey", value.length() % 4)
           .build();
  };
}
2. 分区键提取器策略

Spring Cloud Stream 提供了一个名为 PartitionKeyExtractorStrategy 的 API,它有一个需要实现的单一方法 - Object extractKey(Message<?> message)

您可以实现此接口并将其配置为 bean。然后提供一个属性 spring.cloud.stream.bindings.<binding-name>.producer.parition-key-extractor-name

然后提供 bean 名称。

如果您只有一个这样的 bean,那么您可以忽略将其作为属性提供。Spring Cloud Stream 将简单地选择此 bean 作为分区提取器策略。

使用分区键提取器策略设置键是默认机制。如果未给出提取器策略,Spring Cloud Stream 将只查找分区键表达式。

请记住,我们在这里讨论的分区键可能与记录最终落到的最终分区不同。为此,我们需要使用一个正在使用此键的分区选择器。

选择实际分区

我们选择了一个分区键,现在它如何选择 Kafka 主题上的实际分区?

好的,现在我们让 Spring Cloud Stream 决定使用哪个分区键。但是,如何根据此键实际选择分区呢?与分区键选择选项类似,Spring Cloud Stream 提供了两种不同的机制来使用给定键选择分区。

1. 使用分区选择器策略

再次强调,这是一个函数式接口,只有一个方法 - int selectPartition(Object key, int partitionCount)

您可以实现此方法并将其作为 bean 提供。如果您只有一个这样的 bean,则不需要任何其他属性。如果有多个,则可以使用属性 spring.cloud.stream.bindings.<binding-name>.producer.parition-selector-name 为每个绑定定义它。

2. 分区选择器表达式

如果您不想实现分区选择器策略,您还可以提供一个针对键进行评估的 SpEL 表达式。

如果没有提供这些选项中的任何一个,则 Spring Cloud Stream 将使用默认的分区选择器策略,该策略基于获取键的哈希码,然后对主题上的总分区数执行模运算。除非您有复杂的需求,否则此默认策略在大多数情况下都能正常工作。

绑定器为什么提供两种不同的抽象?

您可能想知道为什么我们有这两种不同的抽象。首先是分区键,然后是分区选择器。分区键可以是任何东西 - 例如,它可以是整数、字符串(可能是任意长度的文本)或其他某种类型。分区选择器将根据分区键表达式选择一个键。选择器还确保所选分区绑定在可用分区数量内。默认实现通过对分区键的哈希码和总分区数进行模除来实现。因此,当您有如下所示的分区用例时,必须在生产者上指定 partittionCount 属性。总而言之,PartitionKey 是 PartitionSelector 用于选择实际分区的数据片段。

让我们来看一个具体的例子。假设您正在编写一个处理信用卡交易的应用程序。此应用程序使用信用卡号作为分区键 - 一个包含 x 位数字的长随机数。假设根据信用卡的前 4 位数字,您希望将该交易发送到主题中的特定分区。您该怎么做呢?首先,您通过解析卡号以提取前 4 位数字来设置您的 partitionKeyExpression(或提供分区键提取器策略)。然后,您需要提供一个分区选择器策略实现,其中,根据键和分区数,您选择键。如果您不提供此策略或针对键的分区键选择器表达式,则默认分区选择器策略将为您选择一个。假设您的前 4 位数字是 1234,并且主题上有 10 个分区。假设散列也计算为 1234。那么,这将落在分区 1234 % 10 = 4 中。如果您出于某种原因希望此交易进入分区 8,则必须在分区选择器策略类或表达式中显式实现这一点。

以下是这两个不同层如何组合在一起的流程图表示。

kafka producer partitions blog

分区键和消息键之间的混淆

有时,思考分区键和实际的消息键(通过实际的 Kafka 主题发送到 Kafka 记录中用作键)会让人感到困惑。这是通过不同的机制完成的。上述分区键和选择器只是确保选择了一个分区键,并根据该分区键,在 Kafka 主题上选择一个实际的分区。但是,在生产时,如何将键与记录一起发送?在这里,您也可以从两个选项中选择。一种是简单地将标头附加到传出消息中。这是一个示例。

@Bean Supplier<Message<String>> process() {
   return () -> MessageBuilder.withPayload("foo")
     .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()) .build(); }

您也可以在 Kafka 绑定器上使用消息键 SpEL 表达式,如下所示。

spring.cloud.stream.kafka.binder.messageKeyExpression: headers['messageKey']

然后在传出消息上附加此标头。

需要牢记的一些注意事项

如果您不提供分区键表达式或分区键提取器 Bean,则 Spring Cloud Stream 将完全退出为您做出任何分区决策的业务。在这种情况下,如果主题有多个分区,则将触发 Kafka 的默认分区机制。默认情况下,Kafka 使用 DefaultPartitioner,如果消息具有键(请参见上文),则使用此键的散列来计算分区。如果消息没有键,则将使用轮询策略分配。从 Kafka 客户端 2.4 开始,有一些额外的复杂性需要牢记。如果记录不包含分区信息(本博文的主题讨论)或如果记录缺少键,则从 Kafka 2.4 开始,它将使用粘性分区而不是轮询策略。简而言之,粘性分区用于通过将记录粘贴到分区或一组分区来最大程度地减少延迟。有关粘性分区的更多信息,请参阅 KIP-480 https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner

结论

在本博文中,我们讨论了 Spring Cloud Stream 在编写基于生产者的应用程序时如何帮助处理 Kafka 分区。我们看到了 Spring Cloud Stream 提供给应用程序开发人员配置分区各种细微差别的多种方法。我们了解了分区键、分区选择器和消息键之间的区别。我们讨论了如何将消息键添加到 Kafka 记录中。最后,我们了解了 Spring Cloud Stream 生产者如何完全退出分区业务并让 Kafka 直接处理它。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部