领先一步
VMware 提供培训和认证,助您加速进步。
了解更多第一部分 - 编程模型 第二部分 - 编程模型(续) 第三部分 - 数据反序列化和序列化 第四部分 - 错误处理
在本篇博文中,我们将继续探讨 Spring Cloud Stream 对 Kafka Streams 的支持。我们将详细介绍如何自定义 Kafka Streams 应用程序。
Kafka Streams 绑定器使用 StreamsBuilderFactoryBean,该工厂由 Spring for Apache Kafka 项目提供,用于构建 Kafka Streams 应用程序的基础 StreamsBuilder 对象。这个工厂 Bean 是一个 Spring 生命周期 Bean。通常,在启动之前必须自定义此工厂 Bean,原因有很多。正如在关于错误处理的上一篇博客文章中所述,如果您想注册一个生产环境异常处理器,就需要自定义 StreamsBuilderFactoryBean。假设您有这个生产环境异常处理器:
class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
} else {
return ProductionExceptionHandlerResponse.FAIL;
}
}
}
如果您选择使用配置(使用 default.production.exception.handler),则可以直接注册它。
然而,当使用绑定器时,一个更优雅的方法是将此作为 StreamsBuilderFactoryBean 自定义器的一部分进行注册,如下所示:
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);
};
}
请注意,如果应用程序中有多个处理器,您可以根据应用程序 ID 控制哪个处理器获得自定义。例如,您可以通过这种方式进行检查:
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
这是设置状态监听器的另一个示例:
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
KafkaStreams 对象是任何 Kafka Streams 应用程序的核心。StreamsBuilderFactoryBean 负责创建拓扑,然后创建 KafkaStreams 对象。在启动 KafkaStreams 对象之前,StreamsBuilderFactoryBean 提供了一个自定义此 KafkaStreams 对象的机会。例如,如果您想为未捕获的异常设置一个应用程序范围的处理器,您可以这样做:
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
请注意,我们首先使用了 StreamsBuilderFactoryBean 的自定义器。但是,在其中,我们使用了单独的 KafkaStreamsCustomizer。
在这篇博客文章中,我们看到了 Spring Cloud Stream 中的 Kafka Streams 绑定器如何允许您自定义底层的 StreamsBuilderFactoryBean 和 KafkaStreams 对象。
感谢您阅读到这里!接下来,在本系列的最后一篇博客文章中,我们将探讨绑定器如何让您处理状态存储以及如何对它们启用交互式查询。