使用 Spring Cloud Stream 和 Apache Kafka Streams 进行流处理。第 4 部分 - 错误处理

工程 | Soby Chacko | 2019 年 12 月 5 日 | ...

第 1 部分 - 编程模型 第 2 部分 - 编程模型(续) 第 3 部分 - 数据反序列化和序列化

在本系列文章中,我们将继续探讨 Kafka Streams 的 Spring Cloud Stream 绑定器,在这篇博文中,我们将了解 Kafka Streams 绑定器中可用的各种错误处理策略。

Kafka Streams 中的错误处理主要集中在输入端反序列化期间和输出端生产期间发生的错误。

处理反序列化异常

Kafka Streams 允许您注册反序列化异常处理程序。默认行为是,当发生反序列化异常时,它会记录该错误并使应用程序失败 (LogAndFailExceptionHandler)。它还允许您记录并跳过记录并继续应用程序 (LogAndContinueExceptionHandler)。通常,您将相应的类作为配置的一部分提供。通过使用绑定器,您可以将这些异常处理程序设置在绑定器级别(这将适用于整个应用程序)或绑定级别(这为您提供了更细粒度的控制)。

以下是在绑定器级别设置反序列化异常处理程序的方法

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

如果您只有一个具有单个输入的处理器,那么在绑定器上设置反序列化异常处理程序是一种简单的方法,如上所示。如果您有多个处理器或输入,并且想要分别控制对它们的错误处理,则需要为每个输入绑定设置。以下是一个示例

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=logAndContinue

请注意,处理程序实际上是在输入绑定 process-in-0 上设置的。如果您还有其他此类输入绑定,则必须显式设置。

Kafka Streams 和 DLQ(死信队列)

除了 Kafka Streams 提供的两个异常处理程序外,绑定器还提供第三种选项:自定义处理程序,允许您将反序列化错误中的记录发送到特殊的 DLQ。为了激活此功能,您必须在绑定器或绑定级别选择加入,如上所述。

以下是如何操作

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq

请记住,当在绑定器上使用此设置时,它会在全局级别激活 DLQ,并且它将应用于所有通过其绑定输入的主题。如果您不希望发生这种情况,则必须为每个输入绑定启用它。

默认情况下,DLQ 名称命名为 error.<input-topic-name>.<application-id for kafka streams>

您可以将 <input-topic-name> 替换为实际的主题名称。请注意,这**不是**绑定名称,而是实际的主题名称。

如果输入主题是 topic-1,并且 Kafka Streams 应用程序 ID 是 my-application,则默认 DLQ 名称将为 error.topic-1.my-application

更改绑定器生成的默认 DLQ 名称

您可以重置默认 DLQ 名称,如下所示

spring.cloud.stream.bindings.process-in-0.consumer.dlqName=input-1-dlq(将 process-in-0 替换为实际的绑定名称)

如果绑定器供应程序对代理具有所需的权限,它将创建所有必要的 DLQ 主题。如果不是这种情况,则必须在应用程序启动之前手动创建这些主题。

DLQ 主题和分区

默认情况下,绑定器假设 DLQ 主题已配置与输入主题相同数量的分区。如果事实并非如此(即,如果 DLQ 主题配置了不同数量的分区),则必须通过使用 DlqPartitionFunction 实现来告诉绑定器要将记录发送到的分区,如下所示

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}

应用程序中只能存在一个这样的 Bean。因此,在多个处理器或输入具有单独的 DLQ 主题的情况下,您必须使用组(在使用绑定器时与应用程序 ID 相同)过滤记录。

处理生产者错误

到目前为止,我们讨论的所有异常处理程序都只处理围绕数据反序列化的错误。Kafka Streams 还提供了一种处理输出端生产者错误的功能。截至 3.0 版,绑定器没有提供支持此功能的一流机制。但是,这并不意味着您不能使用生产者异常处理程序。您可以使用绑定器依赖于 适用于 Apache Kafka 项目的 Spring 的各种自定义程序来执行此操作。这些自定义程序将成为我们本系列下一篇博文中的主题。

Kafka Streams 绑定器健康指标和指标

Kafka Streams 绑定器允许监视底层流线程的健康状况,并通过 Spring Boot 执行器端点公开健康指标指标。您可以在 此处 找到更多详细信息。除了健康指标外,绑定器还通过 Micrometer 指标注册表公开 Kafka Streams 指标。在此注册表中提供了通过 KafkaStreams 对象提供的全部基本指标。您可以在 此处 找到有关此内容的更多信息。

总结

在这篇博文中,我们了解了 Kafka Streams 用于启用处理反序列化异常的各种策略。最重要的是,Kafka Streams 绑定器还提供了一个处理程序,允许您将容易出错的有效负载发送到 DLQ 主题。我们看到绑定器提供了处理这些 DLQ 主题的细粒度控制。

感谢您阅读至此!在下一篇博文中,我们将了解绑定器如何启用进一步的自定义。

获取 Spring 时事通讯

通过 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部