揭秘使用 Apache Kafka 分区的 Spring Cloud Stream 生产者

技术 | Soby Chacko | February 03, 2021 | ...

在这篇博客中,我们将深入探讨如何编写使用 Apache Kafka 的 Spring Cloud Stream 生产者,以及它如何处理 Kafka 中的原生分区。

Spring Cloud Stream 有一个与中间件无关的分区概念。在可能的情况下,如果中间件具有原生分区能力(例如 Apache Kafka),Spring Cloud Stream 会利用这种能力。这篇博客着眼于 Spring Cloud Stream 开发者在编写发布数据到 Kafka 的生产者应用时如何处理分区。在后续文章中,我们将探讨消费者在基于 Kafka 的 Spring Cloud Stream 应用中如何处理分区。

分区是 Apache Kafka 中扩展和并行化的基本单位。使用正确的分区策略可以让你的应用以最小的延迟处理海量数据。Kafka 生产者可以并行写入不同的分区,这通常意味着它可以实现更高的吞吐量。虽然分区具有这些显而易见的优点,但还需要仔细考虑其他各种因素。在分区内部,吞吐量可能受到批量大小、使用的压缩算法、确认类型、复制因子等因素的进一步限制。此外,拥有更多分区意味着更多的打开文件句柄(因为分区映射到 broker 上的一个目录,并且分区内的每个日志段都需要一个索引文件和一个数据文件)。网上有很多关于如何为 Kafka 应用确定合适分区数量的资源,你在部署基于 Kafka 的企业级生产者应用之前可能需要熟悉它们。

Spring Cloud Stream Kafka Binder 的 Provisioner

Spring Cloud Stream Kafka binder 有一个主题 Provisioner,用于处理各种应用级主题需求。Provisioner 的能力之一是创建和修改分区数量。Provisioner 本身不执行这些操作,而是调用 Kafka 集群中正确的管理 API。

在编写 Spring Cloud Stream Kafka 应用时,通常会遇到两种处理主题创建的场景。大多数企业会限制对 Kafka 集群的访问,只有管理员才能进行创建主题、添加分区等操作性更改。在这种场景下,应用无法直接创建或修改主题。另一种场景是,企业对 Kafka 集群的访问权限比较宽松,应用可以自由创建或修改主题。让我们进一步考虑其中的一些事项。

场景 1:应用在 Kafka 集群上拥有完整的管理权限

在这种场景下,应用拥有对 Kafka 集群的完整管理访问权限。你正在编写一个 Spring Cloud Stream 生产者,将消息发布到 Kafka 主题。为了讨论方便,我们假设这个主题不存在,并且你的应用会创建它。你还希望确保主题配置了特定数量的分区。

有几种方式可以告诉 Spring Cloud Stream,你希望主题配置多少个分区。每种方式都有优缺点。让我们来看看它们。

  • 使用 binder 范围的属性指定分区计数。通过这种方式,你创建的任何主题都将具有相同的分区计数。如果你的应用创建多个主题,并且它们都希望具有相同数量的分区,这是一种创建分区的理想方式。这种方法的缺点是,除非覆盖,否则不能针对每个绑定进行配置。你在 binder 级别使用的属性如下。

spring.cloud.stream.kafka.binder.min-partition-count

  • 另一种选择是在绑定级别指定分区计数。使用这种方法,你可以在同一个应用中为多个主题配置不同的分区计数。属性如下

spring.cloud.stream.bindings.<binding-name>.producer.partition-count

考虑到之前的全局属性强制执行最小值(可能更大),两者中较大的一个将对特定绑定生效。

  • 如果以上两种选项都没有使用,则主题将根据 broker 的 num.partitions 属性(默认值为 1)创建相应数量的分区。

场景 2:Kafka 集群被锁定,应用不允许执行任何管理操作。

在这种场景下,作为应用开发者的选择非常有限。由于 Kafka 集群被锁定,应用将无法创建或更改现有主题。如果主题没有预先创建,你的应用将在启动时抛出异常并失败。为了避免这种情况,你必须确保主题以正确数量的分区创建,并使用 binder 属性禁用自动主题供应(将 spring.cloud.stream.kafka.binder.auto-create-topics 设置为 false)。

场景 3:应用在 Kafka 集群上拥有完整的管理权限,主题已存在,但你希望在下次应用启动时增加分区数量。

这是可能的。假设你的主题已经配置了 64 个分区,现在由于更高的容量需求,你希望将其翻倍到 128 个。你可以通过使用场景 1 中讨论的任一属性来告知 binder。(spring.cloud.stream.kafka.binder.min-partition-countspring.cloud.stream.bindings.<binding-name>.producer.partition-count

在这种情况下,binder 会检测到主题已经存在。如果主题的当前分区数量少于请求的数量,则 binder 会检查属性 spring.cloud.stream.kafka.binder.autoAddPartitions。默认情况下,此属性设置为 false。因此,如果应用需要增加分区,必须将其显式设置为 true。如果设置为 true,provisioner 将请求 Kafka 管理 API 增加分区数量。如果未设置为 true,并且新请求的分区数量高于现有分区数量,则对于生产者而言,binder 将会抱怨无法容忍 broker 上较少的分区数量,并抛出供应异常。如果发生这种情况,你必须手动增加分区,或者将 autoAddPartitions 属性设置为 true

特别需要注意的一点是,binder 不允许你通过 Spring Cloud Stream 减少 Kafka 主题的分区数量。

请记住,增加或减少分区(使用任何机制)可能会破坏分区内的严格顺序(如果这是需要考虑的因素),具体取决于你的分区策略(见下文)。

选择分区

现在我们了解了主题是如何分区的,接下来需要讨论如何为特定记录选择分区。

有三种机制来选择分区

原生 Kafka 分区选择

要使用原生分区,可以在 binder 级别使用属性 spring.cloud.stream.kafka.binder.producer-properties.partitioner.class 或在 binding 级别使用属性 spring.cloud.stream.kafka.bindings.<binding>.producer.configuration.partitioner.class 配置自定义 Partitioner。

直接设置分区头

使用默认的 Kafka Partitioner 时,应用可以直接将 KafkaHeaders.PARTITION_ID 头设置为所需的分区。

Spring Cloud Stream 分区选择

使用 Spring Cloud Stream 分区时,让 Kafka partitioner 使用其默认分区器,它将简单地使用 binder 在生产者记录中设置的分区。在后续章节中,我们将看到 Spring Cloud Stream 提供的此支持的详细信息。

Spring Cloud Stream 生产者如何确定分配哪个分区?

生产者如何使用 Spring Cloud Stream 将记录分配到正确的分区?Spring Cloud Stream 中有哪些可用于执行此操作的控件?本博客的其余部分将重点讨论这些问题。

决定分区键

Spring Cloud Stream 提供了两种机制,供应用决定分区键。

1. 分区键表达式

一个简单的方法是将分区键作为 SpEL 表达式属性提供。示例如下。

spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression: headers['partitionKey']

然后你的应用在发布消息时,可以添加一个名为 partitonKey 的头。Spring Cloud Stream 在评估上述表达式时将使用此头的值来分配分区键。这里是一个生产者代码示例

@Bean
public Supplier<Message<?>> generate() {
  return () -> {
     String value = “random payload”;
    	return MessageBuilder.withPayload(value)
           .setHeader("partitionKey", value.length() % 4)
           .build();
  };
}
2. 分区键提取策略

Spring Cloud Stream 提供了一个名为 PartitionKeyExtractorStrategy 的 API,它有一个需要实现的单一方法 - Object extractKey(Message<?> message)

你可以实现此接口并将其配置为 bean。然后提供属性 spring.cloud.stream.bindings.<binding-name>.producer.parition-key-extractor-name

然后提供 bean 名称。

如果你只有一个这样的 bean,则可以忽略将其作为属性提供。Spring Cloud Stream 将简单地选择此 bean 作为分区提取策略。

使用分区键提取策略设置键是默认机制。只有在未提供提取策略时,Spring Cloud Stream 才会查找分区键表达式。

请注意,我们这里讨论的分区键可能与记录最终落地的分区不一定相同。为此,我们需要使用一个利用此键的分区选择器。

选择实际分区

我们已经选择了分区键,现在它如何选择 Kafka 主题上的实际分区?

好的,现在我们让 Spring Cloud Stream 决定使用哪个分区键。但是,如何根据此键实际选择分区呢?与分区键选择选项类似,Spring Cloud Stream 提供了两种不同的机制来选择给定键的分区。

1. 使用分区选择策略`

同样,这是一个函数式接口,只有一个方法需要实现 - int selectPartition(Object key, int partitionCount)

你可以实现此方法并将其作为 bean 提供。如果你只有一个这样的 bean,则无需任何额外的属性。如果有多个,则可以使用属性 spring.cloud.stream.bindings.<binding-name>.producer.parition-selector-name 按绑定进行定义。

2. 分区选择表达式

如果你不想实现分区选择策略,也可以提供一个针对键进行评估的 SpEL 表达式。

如果以上两种选项都没有提供,则 Spring Cloud Stream 将使用默认的分区选择策略,该策略基于获取键的哈希码,然后与主题的总分区计数进行取模运算。除非你有复杂的需求,否则此默认策略在大多数情况下都有效。

为什么 binder 提供了两种不同的抽象?

你可能想知道为什么我们有两种不同的抽象。首先是分区键,然后是分区选择器。分区键可以是任何东西——例如,它可以是一个整数、一个字符串(可能是一个任意长度的文本)或其他类型。分区选择器将基于分区键表达式选择一个键。选择器还会确保选定的分区位于可用分区数量之内。默认实现通过对分区键的哈希码与主题总分区数量进行取模运算来实现这一点。因此,当你遇到这样的分区用例时,必须在生产者上指定 partittionCount 属性。总而言之,PartitionKey 是 PartitionSelector 用于选择实际分区的一块数据。

让我们举一个具体的例子。假设你正在编写一个处理信用卡交易的应用。该应用使用信用卡号作为分区键——一个包含 x 位数字的长随机数。想象一下,你想根据信用卡的前 4 位数字将该交易发送到主题中的特定分区。你如何实现呢?首先,你通过解析卡号提取前 4 位数字(或提供一个分区键提取策略)来设置你的 partitionKeyExpression。然后,你需要提供一个分区选择策略实现,其中根据键和分区数量来选择键。如果你不提供此实现或针对键的分区键选择表达式,则默认的分区选择策略将为你选择一个。假设你的前 4 位数字是 1234,主题中有 10 个分区。假设哈希值计算也为 1234。那么,它将落在分区 1234 % 10 = 4 中。如果你出于某种原因希望此交易到达分区 8,那么你必须在分区选择策略类或表达式中明确实现这一点。

以下是表示这两个不同层如何协同工作的流程图。

kafka producer partitions blog

分区键与消息键之间的混淆

有时会混淆分区键与实际通过 Kafka 主题发送到 Kafka 记录中用作键的消息键。这是通过不同的机制完成的。上述分区键和选择器只是确保选择了一个分区键,并基于该分区键在 Kafka 主题上选择了一个实际分区。但是,在生产时如何随记录发送一个键呢?这里同样,你可以选择两种方法。一种是简单地在出站消息中附加一个头。示例如下。

@Bean Supplier<Message<String>> process() {
   return () -> MessageBuilder.withPayload("foo")
     .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()) .build(); }

你还可以在 Kafka binder 上使用消息键 SpEL 表达式,如下所示。

spring.cloud.stream.kafka.binder.messageKeyExpression: headers['messageKey']

然后在出站消息上附加此头。

需要注意的一些事项

如果你没有提供分区键表达式或分区键提取器 bean,那么 Spring Cloud Stream 将完全不为你做任何分区决策。在这种情况下,如果主题有多个分区,将触发 Kafka 的默认分区机制。默认情况下,Kafka 使用 DefaultPartitioner,如果消息包含键(见上文),则使用该键的哈希值计算分区。如果消息没有键,则将使用轮询策略分配分区。从 Kafka 客户端 2.4 及以后版本开始,还有一些额外的复杂性需要注意。如果记录不包含分区信息(本博客的主要讨论内容)或记录缺少键,那么从 Kafka 2.4 开始,它将使用粘性分区(sticky partitions)而不是轮询策略。简而言之,粘性分区用于通过将记录粘附到一个或一组分区来最小化延迟。有关粘性分区的更多信息,请参阅 KIP-480 https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner

结论

在这篇博客中,我们讨论了在使用 Spring Cloud Stream 编写生产者应用时,它如何帮助处理 Kafka 分区。我们看到了 Spring Cloud Stream 提供了多种方式,让应用开发者能够配置分区的各种细节。我们探讨了分区键、分区选择器和消息键之间的区别。我们讨论了如何将消息键添加到 Kafka 记录中。最后,我们研究了 Spring Cloud Stream 生产者如何完全不参与分区业务,并让 Kafka 直接处理分区。

订阅 Spring 电子报

订阅 Spring 电子报,保持联系

订阅

抢先一步

VMware 提供培训和认证,助力你快速发展。

了解更多

获取支持

Tanzu Spring 通过简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区的所有即将举行的活动。

查看全部