遥遥领先
VMware 提供培训和认证,以加速您的进度。
了解更多我很高兴地宣布 Spring for Apache Kafka 2.1.0.RELEASE 发布。
此外,维护版本 1.3.2.RELEASE 和 2.0.2.RELEASE 也已发布,其中包含重要的错误修复。 另请参见下文有关 spring-integration-kafka 3.0.0.RELEASE 的信息。 建议所有用户升级。
2.1 版本的主要目的是将 kafka-clients 库升级到 1.0.0,但我们包含了一些改进
有时,当无法处理消息时,您可能希望停止容器,以便可以更正情况并重新传递消息。 该框架现在为记录侦听器提供 ContainerStoppingErrorHandler,为批处理侦听器提供 ContainerStoppingBatchErrorHandler。
当检测到 NewTopic bean 的分区数大于主题上当前存在的分区数时,KafkaAdmin 现在支持增加分区。
StringJsonMessageConverter 和 JsonSerializer/JsonDeserializer 现在在 Headers 中传递和使用类型信息。 这允许在同一主题上轻松发送/接收多种类型
@SpringBootApplication public class Kafka21Application {
public static void main(String[] args) {
SpringApplication.run(Kafka21Application.class, args)
.close();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
return args -> {
template.send(MessageBuilder.withPayload(42)
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
template.send(MessageBuilder.withPayload("43")
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
Thread.sleep(5_000);
};
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Component
@KafkaListener(id = "multi", topics = "blog")
public static class Listener {
@KafkaHandler
public void intListener(Integer in) {
System.out.println("Got an int: " + in);
}
@KafkaHandler
public void stringListener(String in) {
System.out.println("Got a string: " + in);
}
}
}
获得一个整数:42 获得一个字符串:43
首次运行此应用程序时,您可能需要此属性……
spring.kafka.consumer.auto-offset-reset=earliest
……以防模板在容器完全启动之前发送消息。
此外,可以使用 Kafka 属性为生产者/消费者配置 JsonSerializer 和 JsonDeserializer。
重要提示
根据 CVE-2017-4995,默认情况下只会反序列化 java.util 和 java.lang 中的类; 要反序列化(信任)其他包,请在反序列化器上或为消息转换器在自定义的 DefaultJackson2TypeMapper 中使用 addTrustedPackages 方法。 对于 JsonDeserializer,可以在 Kafka 消费者配置的属性 JsonDeserializer.TRUSTED_PACKAGES 中提供包。
请参阅新增功能获取完整信息。
最后,spring-integration-kafka 3.0.0.RELEASE 也已发布;它基于 Spring for Apache Kafka 2.1、Spring Integration 5.0 和 Spring Framework 5.0;它需要 Java 8 并且具有以下新功能
spring-messaging 标头映射到/从 Kafka Headers。有关 spring-kafka、spring-integration-kafka 和 kafka-clients 版本兼容性的完整矩阵,请参阅项目页面。