领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布适用于 Apache Kafka 的 Spring 2.1.0.RELEASE 现已推出。
此外,维护版本 1.3.2.RELEASE 和 2.0.2.RELEASE 也已发布,其中包含重要的错误修复。另请参阅以下有关 spring-integration-kafka
3.0.0.RELEASE 的信息。建议所有用户进行升级。
2.1 版本的主要目的是将 kafka-clients
库升级到 1.0.0,但我们也包含了一些改进
有时,当无法处理消息时,您可能希望停止容器,以便可以更正条件并重新传递消息。框架现在为记录侦听器提供了 ContainerStoppingErrorHandler
,为批处理侦听器提供了 ContainerStoppingBatchErrorHandler
。
KafkaAdmin
现在支持在检测到 NewTopic
bean 的分区数大于主题上当前存在的分区数时增加分区。
StringJsonMessageConverter
和 JsonSerializer/JsonDeserializer
现在在 Headers
中传递和使用类型信息。这允许在同一主题上轻松发送/接收多种类型
@SpringBootApplication public class Kafka21Application {
public static void main(String[] args) {
SpringApplication.run(Kafka21Application.class, args)
.close();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
return args -> {
template.send(MessageBuilder.withPayload(42)
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
template.send(MessageBuilder.withPayload("43")
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
Thread.sleep(5_000);
};
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Component
@KafkaListener(id = "multi", topics = "blog")
public static class Listener {
@KafkaHandler
public void intListener(Integer in) {
System.out.println("Got an int: " + in);
}
@KafkaHandler
public void stringListener(String in) {
System.out.println("Got a string: " + in);
}
}
}
获取一个 int:42 获取一个字符串:43
第一次运行此应用程序时,您可能需要此属性……
spring.kafka.consumer.auto-offset-reset=earliest
……以防模板在容器完全启动之前发送消息。
此外,可以使用 Kafka 属性为生产者/消费者配置 JsonSerializer
和 JsonDeserializer
。
重要
根据 CVE-2017-4995,默认情况下,只会反序列化 java.util
和 java.lang
中的类;要反序列化(信任)其他包,请在反序列化程序上或在消息转换器的自定义 DefaultJackson2TypeMapper
中使用 addTrustedPackages
方法。对于 JsonDeserializer
,可以在 Kafka 消费者配置中的属性 JsonDeserializer.TRUSTED_PACKAGES
中提供这些包。
有关完整信息,请参阅 新增功能。
最后,spring-integration-kafka
3.0.0.RELEASE 也已发布;它基于适用于 Apache Kafka 的 Spring 2.1、Spring Integration 5.0 和 Spring Framework 5.0;它需要 Java 8,并具有以下新功能
spring-messaging
标头映射到 Kafka Headers
或从 Kafka Headers
映射。有关 spring-kafka
、spring-integration-kafka
和 kafka-clients
版本兼容性的完整矩阵,请参阅 项目页面。