适用于 Apache Kafka 的 Spring 2.1.0.RELEASE(以及 1.3.2 和 2.0.2)发布

版本发布 | Gary Russell | 2017 年 12 月 1 日 | ...

我很高兴地宣布适用于 Apache Kafka 的 Spring 2.1.0.RELEASE 现已推出。

此外,维护版本 1.3.2.RELEASE 和 2.0.2.RELEASE 也已发布,其中包含重要的错误修复。另请参阅以下有关 spring-integration-kafka 3.0.0.RELEASE 的信息。建议所有用户进行升级。

2.1 版本的主要目的是将 kafka-clients 库升级到 1.0.0,但我们也包含了一些改进

  • 有时,当无法处理消息时,您可能希望停止容器,以便可以更正条件并重新传递消息。框架现在为记录侦听器提供了 ContainerStoppingErrorHandler,为批处理侦听器提供了 ContainerStoppingBatchErrorHandler

  • KafkaAdmin 现在支持在检测到 NewTopic bean 的分区数大于主题上当前存在的分区数时增加分区。

  • StringJsonMessageConverterJsonSerializer/JsonDeserializer 现在在 Headers 中传递和使用类型信息。这允许在同一主题上轻松发送/接收多种类型

    @SpringBootApplication public class Kafka21Application {

    public static void main(String[] args) {
        SpringApplication.run(Kafka21Application.class, args)
            .close();
    }
    
    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> {
            template.send(MessageBuilder.withPayload(42)
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            template.send(MessageBuilder.withPayload("43")
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            Thread.sleep(5_000);
        };
    }
    
    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }
    
    @Component
    @KafkaListener(id = "multi", topics = "blog")
    public static class Listener {
    
        @KafkaHandler
        public void intListener(Integer in) {
            System.out.println("Got an int: " + in);
        }
    
        @KafkaHandler
        public void stringListener(String in) {
            System.out.println("Got a string: " + in);
        }
    
    }
    

    }

    获取一个 int:42 获取一个字符串:43

第一次运行此应用程序时,您可能需要此属性……

spring.kafka.consumer.auto-offset-reset=earliest

……以防模板在容器完全启动之前发送消息。

此外,可以使用 Kafka 属性为生产者/消费者配置 JsonSerializerJsonDeserializer

重要

根据 CVE-2017-4995,默认情况下,只会反序列化 java.utiljava.lang 中的类;要反序列化(信任)其他包,请在反序列化程序上或在消息转换器的自定义 DefaultJackson2TypeMapper 中使用 addTrustedPackages 方法。对于 JsonDeserializer,可以在 Kafka 消费者配置中的属性 JsonDeserializer.TRUSTED_PACKAGES 中提供这些包。

有关完整信息,请参阅 新增功能

最后,spring-integration-kafka 3.0.0.RELEASE 也已发布;它基于适用于 Apache Kafka 的 Spring 2.1、Spring Integration 5.0 和 Spring Framework 5.0;它需要 Java 8,并具有以下新功能

  • spring-messaging 标头映射到 Kafka Headers 或从 Kafka Headers 映射。

有关 spring-kafkaspring-integration-kafkakafka-clients 版本兼容性的完整矩阵,请参阅 项目页面

项目页面 | 问题 | 贡献 | 帮助 | 聊天

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部