使用Spring Cloud Stream和Apache Kafka Streams进行流处理。第5部分 - 应用程序自定义

工程 | Soby Chacko | 2019年12月6日 | ...

第1部分 - 编程模型 第2部分 - 编程模型续 第3部分 - 数据反序列化和序列化 第4部分 - 错误处理

在这篇博客文章中,我们将继续讨论Spring Cloud Stream对Kafka Streams的支持。我们将详细阐述自定义Kafka Streams应用程序的方法。

自定义StreamsBuilderFactoryBean

Kafka Streams绑定器使用StreamsBuilderFactoryBean(由Spring for Apache Kafka项目提供)来构建StreamsBuilder对象,该对象是Kafka Streams应用程序的基础。此工厂bean是一个Spring生命周期bean。通常,出于各种原因,必须在启动此工厂bean之前对其进行自定义。如关于错误处理的上一篇博客文章中所述,如果要注册生产者异常处理程序,则需要自定义StreamsBuilderFactoryBean。假设您有此生产者异常处理程序

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

您可以选择直接使用配置注册它(使用default.production.exception.handler)。

但是,在使用绑定器时,更优雅的方法是将其注册为StreamsBuilderFactoryBean自定义程序的一部分,如下所示

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            IgnoreRecordTooLargeHandler.class);
    };
}

请注意,如果应用程序中有多个处理器,您可以根据应用程序ID控制哪个处理器获得自定义。例如,您可以通过这种方式进行检查

return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {

这是另一个设置状态侦听器的示例

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

自定义KafkaStreams对象。

KafkaStreams对象是任何Kafka Streams应用程序的核心。StreamsBuilderFactoryBean负责创建拓扑,然后创建KafkaStreams对象。在启动KafkaStreams对象之前,StreamsBuilderFactoryBean提供了自定义此KafkaStreams对象的机会。例如,如果要设置未捕获异常的应用程序范围处理程序,可以执行以下操作

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

请注意,我们从StreamsBuilderFactoryBean的自定义程序开始。但是,在其中,我们使用单独的KafkaStreamsCustomizer

总结

在这篇博客文章中,我们了解了Spring Cloud Stream中的Kafka Streams绑定器如何让您自定义底层的StreamsBuilderFactoryBeanKafkaStreams对象。

感谢您的阅读!接下来,在本系列的最后一篇博客文章中,我们将了解绑定器如何让您处理状态存储并启用针对它们的交互式查询。

获取Spring新闻

通过Spring新闻保持联系

订阅

领先一步

VMware提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部