抢占先机
VMware 提供培训和认证,助您加速进步。
了解更多第一部分 - 编程模型 第二部分 - 编程模型(续) 第三部分 - 数据反序列化和序列化
本篇博文继续深入探讨 Spring Cloud Stream 的 Kafka Streams 绑定器,我们将重点介绍 Kafka Streams 绑定器中提供的各种错误处理策略。
Kafka Streams 中的错误处理主要集中在入站反序列化过程中发生的错误以及出站生产过程中发生的错误。
Kafka Streams 允许您注册反序列化异常处理器。默认行为是,当发生反序列化异常时,它会记录错误并使应用程序失败 (LogAndFailExceptionHandler
)。它也允许您记录并跳过该记录,然后继续应用程序 (LogAndContinueExceptionHandler
)。通常,您需要将相应的类作为配置的一部分提供。通过使用绑定器,您可以在绑定器级别设置这些异常处理器,这将适用于整个应用程序;或者在绑定级别设置,这能提供更细粒度的控制。
您可以在绑定器级别按如下方式设置反序列化异常处理器:
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue
如果您只有一个带有单个输入的处理器,如上所示,在绑定器上设置反序列化异常处理器是一种简单的方法。如果您有多个处理器或输入,并且希望分别控制它们的错误处理,则需要在每个输入绑定上进行设置。以下是执行此操作的示例:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=logAndContinue
请注意,处理器实际上是在输入绑定 process-in-0
上设置的。如果您有更多这样的输入绑定,则必须明确设置。
除了 Kafka Streams 提供的两种异常处理器外,绑定器还提供了第三种选择:一个自定义处理器,允许您将反序列化错误的记录发送到特殊的 DLQ。为了激活此功能,您必须如上所述在绑定器级别或绑定级别选择启用它。
操作方法如下:
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq
请记住,在绑定器级别使用此设置时,这会在全局级别激活 DLQ,并将通过所有输入绑定应用于所有输入主题。如果这不是您期望的行为,则必须为每个输入绑定单独启用它。
默认情况下,DLQ 名称格式为 error.<input-topic-name>.<application-id for kafka streams>
。
您可以将 <input-topic-name>
替换为实际主题名称。请注意,这不是绑定名称,而是实际主题名称。
如果输入主题是 topic-1,Kafka Streams 应用程序 ID 是 my-application,则默认的 DLQ 名称将是 error.topic-1.my-application
。
您可以按如下方式重置默认的 DLQ 名称:
spring.cloud.stream.bindings.process-in-0.consumer.dlqName=input-1-dlq
(将 process-in-0
替换为实际的绑定名称)
如果绑定器供应者在 broker 上具有所需的权限,它将创建所有必需的 DLQ 主题。如果不是这种情况,则必须在应用程序启动前手动创建这些主题。
默认情况下,绑定器假定 DLQ 主题配置的分区数量与输入主题相同。如果情况并非如此(即 DLQ 主题配置的分区数量不同),则必须通过实现 DlqPartitionFunction 来告知绑定器要将记录发送到哪个分区,如下所示:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
应用程序中只能存在一个这样的 Bean。因此,如果存在具有独立 DLQ 主题的多个处理器或输入,您必须使用组(在使用绑定器时与应用程序 ID 相同)来过滤记录。
我们目前讨论的所有异常处理器都只处理与数据反序列化相关的错误。Kafka Streams 还提供了处理出站生产者错误的功能。截至 3.0. Release 版本,绑定器没有提供一流的机制来支持这一点。但这并不意味着您无法使用生产者异常处理器。您可以利用绑定器依赖于 Spring for Apache Kafka 项目的各种定制器来实现此功能。这些定制器将是我们本系列下一篇博文的主题。
Kafka Streams 绑定器允许监控底层 streams 线程的健康状况,并通过 Spring Boot Actuator 端点暴露健康指标。您可以在这里找到更多详情。除了健康指标外,绑定器还通过 Micrometer meter-registry 暴露 Kafka Streams 指标。通过 KafkaStreams 对象可用的所有基本指标都可以在此注册表中找到。您可以在这里找到更多关于此的信息。
在这篇博文中,我们探讨了 Kafka Streams 用于处理反序列化异常的各种策略。除此之外,Kafka Streams 绑定器还提供了一个处理器,允许您将可能产生错误的有效负载发送到 DLQ 主题。我们了解到绑定器提供了对这些 DLQ 主题进行细粒度控制的功能。
感谢您阅读到这里!在下一篇博文中,我们将了解绑定器如何实现进一步的定制化。