领先一步
VMware提供培训和认证,以加速您的进步。
了解更多第1部分 - 编程模型 第2部分 - 编程模型续 第3部分 - 数据反序列化和序列化 第4部分 - 错误处理
在这篇博客文章中,我们将继续讨论Spring Cloud Stream对Kafka Streams的支持。我们将详细阐述自定义Kafka Streams应用程序的方法。
Kafka Streams绑定器使用StreamsBuilderFactoryBean(由Spring for Apache Kafka项目提供)来构建StreamsBuilder对象,该对象是Kafka Streams应用程序的基础。此工厂bean是一个Spring生命周期bean。通常,出于各种原因,必须在启动此工厂bean之前对其进行自定义。如关于错误处理的上一篇博客文章中所述,如果要注册生产者异常处理程序,则需要自定义StreamsBuilderFactoryBean
。假设您有此生产者异常处理程序
class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
} else {
return ProductionExceptionHandlerResponse.FAIL;
}
}
}
您可以选择直接使用配置注册它(使用default.production.exception.handler
)。
但是,在使用绑定器时,更优雅的方法是将其注册为StreamsBuilderFactoryBean
自定义程序的一部分,如下所示
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);
};
}
请注意,如果应用程序中有多个处理器,您可以根据应用程序ID控制哪个处理器获得自定义。例如,您可以通过这种方式进行检查
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
这是另一个设置状态侦听器的示例
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
KafkaStreams对象是任何Kafka Streams应用程序的核心。StreamsBuilderFactoryBean
负责创建拓扑,然后创建KafkaStreams
对象。在启动KafkaStreams
对象之前,StreamsBuilderFactoryBean提供了自定义此KafkaStreams
对象的机会。例如,如果要设置未捕获异常的应用程序范围处理程序,可以执行以下操作
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
请注意,我们从StreamsBuilderFactoryBean
的自定义程序开始。但是,在其中,我们使用单独的KafkaStreamsCustomizer
。
在这篇博客文章中,我们了解了Spring Cloud Stream中的Kafka Streams绑定器如何让您自定义底层的StreamsBuilderFactoryBean
和KafkaStreams
对象。
感谢您的阅读!接下来,在本系列的最后一篇博客文章中,我们将了解绑定器如何让您处理状态存储并启用针对它们的交互式查询。