领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多第 1 部分 - 编程模型 第 2 部分 - 编程模型(续) 第 3 部分 - 数据反序列化和序列化
在本系列文章中,我们将继续探讨 Kafka Streams 的 Spring Cloud Stream 绑定器,在这篇博文中,我们将了解 Kafka Streams 绑定器中可用的各种错误处理策略。
Kafka Streams 中的错误处理主要集中在输入端反序列化期间和输出端生产期间发生的错误。
Kafka Streams 允许您注册反序列化异常处理程序。默认行为是,当发生反序列化异常时,它会记录该错误并使应用程序失败 (LogAndFailExceptionHandler
)。它还允许您记录并跳过记录并继续应用程序 (LogAndContinueExceptionHandler
)。通常,您将相应的类作为配置的一部分提供。通过使用绑定器,您可以将这些异常处理程序设置在绑定器级别(这将适用于整个应用程序)或绑定级别(这为您提供了更细粒度的控制)。
以下是在绑定器级别设置反序列化异常处理程序的方法
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue
如果您只有一个具有单个输入的处理器,那么在绑定器上设置反序列化异常处理程序是一种简单的方法,如上所示。如果您有多个处理器或输入,并且想要分别控制对它们的错误处理,则需要为每个输入绑定设置。以下是一个示例
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=logAndContinue
请注意,处理程序实际上是在输入绑定 process-in-0
上设置的。如果您还有其他此类输入绑定,则必须显式设置。
除了 Kafka Streams 提供的两个异常处理程序外,绑定器还提供第三种选项:自定义处理程序,允许您将反序列化错误中的记录发送到特殊的 DLQ。为了激活此功能,您必须在绑定器或绑定级别选择加入,如上所述。
以下是如何操作
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq
请记住,当在绑定器上使用此设置时,它会在全局级别激活 DLQ,并且它将应用于所有通过其绑定输入的主题。如果您不希望发生这种情况,则必须为每个输入绑定启用它。
默认情况下,DLQ 名称命名为 error.<input-topic-name>.<application-id for kafka streams>
。
您可以将 <input-topic-name>
替换为实际的主题名称。请注意,这**不是**绑定名称,而是实际的主题名称。
如果输入主题是 topic-1,并且 Kafka Streams 应用程序 ID 是 my-application,则默认 DLQ 名称将为 error.topic-1.my-application
。
您可以重置默认 DLQ 名称,如下所示
spring.cloud.stream.bindings.process-in-0.consumer.dlqName=input-1-dlq
(将 process-in-0
替换为实际的绑定名称)
如果绑定器供应程序对代理具有所需的权限,它将创建所有必要的 DLQ 主题。如果不是这种情况,则必须在应用程序启动之前手动创建这些主题。
默认情况下,绑定器假设 DLQ 主题已配置与输入主题相同数量的分区。如果事实并非如此(即,如果 DLQ 主题配置了不同数量的分区),则必须通过使用 DlqPartitionFunction 实现来告诉绑定器要将记录发送到的分区,如下所示
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
应用程序中只能存在一个这样的 Bean。因此,在多个处理器或输入具有单独的 DLQ 主题的情况下,您必须使用组(在使用绑定器时与应用程序 ID 相同)过滤记录。
到目前为止,我们讨论的所有异常处理程序都只处理围绕数据反序列化的错误。Kafka Streams 还提供了一种处理输出端生产者错误的功能。截至 3.0 版,绑定器没有提供支持此功能的一流机制。但是,这并不意味着您不能使用生产者异常处理程序。您可以使用绑定器依赖于 适用于 Apache Kafka 项目的 Spring 的各种自定义程序来执行此操作。这些自定义程序将成为我们本系列下一篇博文中的主题。
Kafka Streams 绑定器允许监视底层流线程的健康状况,并通过 Spring Boot 执行器端点公开健康指标指标。您可以在 此处 找到更多详细信息。除了健康指标外,绑定器还通过 Micrometer 指标注册表公开 Kafka Streams 指标。在此注册表中提供了通过 KafkaStreams 对象提供的全部基本指标。您可以在 此处 找到有关此内容的更多信息。
在这篇博文中,我们了解了 Kafka Streams 用于启用处理反序列化异常的各种策略。最重要的是,Kafka Streams 绑定器还提供了一个处理程序,允许您将容易出错的有效负载发送到 DLQ 主题。我们看到绑定器提供了处理这些 DLQ 主题的细粒度控制。
感谢您阅读至此!在下一篇博文中,我们将了解绑定器如何启用进一步的自定义。