使用 Spring Cloud Stream 和 Apache Kafka Streams 进行流处理。第 5 部分 - 应用程序定制

工程 | Soby Chacko | 2019 年 12 月 06 日 | ...

第 1 部分 - 编程模型 第 2 部分 - 编程模型(续) 第 3 部分 - 数据反序列化和序列化 第 4 部分 - 错误处理

在这篇博文中,我们将继续讨论 Spring Cloud Stream 中对 Kafka Streams 的支持。我们将详细说明您可以自定义 Kafka Streams 应用程序的方式。

自定义 StreamsBuilderFactoryBean

Kafka Streams binder 使用 StreamsBuilderFactoryBean,该工厂由 Spring for Apache Kafka 项目提供,用于构建 StreamsBuilder 对象,该对象是 Kafka Streams 应用程序的基础。此工厂 bean 是一个 Spring 生命周期 bean。通常,出于各种原因,必须在启动之前自定义此工厂 bean。正如在关于错误处理的上一篇博客文章中所述,如果您想注册生产异常处理程序,则需要自定义 StreamsBuilderFactoryBean。假设您有以下生产者异常处理程序

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

如果您选择使用配置(使用 default.production.exception.handler),您可以直接注册它。

但是,使用 binder 时,更优雅的方法是将此注册为 StreamsBuilderFactoryBean 定制器的一部分,如下所示

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            IgnoreRecordTooLargeHandler.class);
    };
}

请注意,如果应用程序中有多个处理器,您可以根据应用程序 ID 控制哪个处理器获得自定义。 例如,您可以这样检查它

return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {

这是设置状态侦听器的另一个示例

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

自定义 KafkaStreams 对象。

KafkaStreams 对象是任何 Kafka Streams 应用程序的核心。 StreamsBuilderFactoryBean 负责创建拓扑,然后创建 KafkaStreams 对象。 在启动 KafkaStreams 对象之前,StreamsBuilderFactoryBean 提供了一个自定义此 KafkaStreams 对象的机会。 例如,如果您想为未捕获的异常设置应用程序范围的处理程序,您可以执行以下操作

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

请注意,我们从 StreamsBuilderFactoryBean 的定制器开始。 但是,在其中,我们使用单独的 KafkaStreamsCustomizer

总结

在这篇博文中,我们了解了 Spring Cloud Stream 中的 Kafka Streams binder 如何让您自定义底层的 StreamsBuilderFactoryBeanKafkaStreams 对象。

感谢您阅读到这里! 接下来,在本系列的最后一篇博文中,我们将介绍 binder 如何让您处理状态存储以及启用针对它们的交互式查询。

获取 Spring 新闻资讯

与 Spring 新闻资讯保持联系

订阅

抢占先机

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部