更进一步
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布 spring-integration-kafka
(Spring Integration Kafka 支持) 版本 2.0
的第一个里程碑版本现已发布。
Spring Integration Kafka 扩展项目为 Apache Kafka 提供了 inbound
和 outbound
通道适配器。
从 2.0 版本开始,该项目基于新的 spring-kafka
项目进行了完全重写,该项目使用 Kafka 0.9.x.x
提供的纯 Java Producer
和 Consumer
客户端。
org.springframework.integration:spring-integration-kafka:2.0.0.M1
组件可在 Milestone 存储库中找到。
由于有了 spring-kafka
项目的 MessageListenerContainer
基础,KafkaMessageDrivenChannelAdapter
的定义现在非常简单
@Bean
public MessageProducer kafkaProducer(
AbstractMessageListenerContainer<Integer, String> container) {
KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setMessageConverter(new StringJsonMessageConverter());
adapter.setOutputChannel(fromKafkaChannel());
adapter.setErrorChannel(myErrorChannel());
return adapter;
}
使用 XML 配置,我们应该只声明单个组件
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
error-channel="errorChannel" />
有了 spring-kafka
项目的 KafkaTemplate
基础,KafkaProducerMessageHandler
也同样简单
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
KafkaTemplate<Integer, String> template) {
KafkaProducerMessageHandler<Integer, String> handler =
new KafkaProducerMessageHandler<>(template);
handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
handler.setPartitionIdExpression(
PARSER.parseExpression("headers.myPartition"));
return handler;
}
XML 配置也得到了简化
<int-kafka:outbound-channel-adapter
kafka-template="template"
channel="inputToKafka"
topic="foo"/>
从 1.2
版本开始,Spring Integration Java DSL 引入了 Kafka09
工厂来覆盖此新 2.0
版本中上述通道适配器的功能。例如,生产部分可能如下所示
.handle(Kafka09.outboundChannelAdapter(producerFactory())
.defaultTopic("foo")
.partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
最后,不要错过 Spring for Apache Kafka 的发布公告!
结合下一个 Spring for Apache Kafka,我们可能会考虑为 Kafka Streams 实现一些适配器。
由于该项目的代码库变得非常简单,并且看起来 Apache Kafka API 将会 保持稳定,我们打算在时机成熟时将该项目纳入 Spring Integration Code 5.0 中。
与此同时,我们期待您的反馈,如果一切顺利,计划在未来几周内发布 2.0.0.RELEASE
!