领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我很高兴地宣布 spring-integration-kafka (Spring Integration Kafka Support) 2.0 版本的第一个里程碑现已发布。
Spring Integration Kafka 扩展项目为 Apache Kafka 提供了 inbound 和 outbound 通道适配器。
从 2.0 版本开始,该项目是基于新的 spring-kafka 项目进行的完全重写,该项目使用了 Kafka 0.9.x.x 提供的纯 Java Producer 和 Consumer 客户端。
构件 org.springframework.integration:spring-integration-kafka:2.0.0.M1 可在 Milestone 仓库中找到。
得益于 spring-kafka 项目的 MessageListenerContainer 基础,KafkaMessageDrivenChannelAdapter 的定义现在非常简单。
@Bean
public MessageProducer kafkaProducer(
AbstractMessageListenerContainer<Integer, String> container) {
KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setMessageConverter(new StringJsonMessageConverter());
adapter.setOutputChannel(fromKafkaChannel());
adapter.setErrorChannel(myErrorChannel());
return adapter;
}
使用 XML 配置时,我们也只需要声明一个组件。
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
error-channel="errorChannel" />
得益于 spring-kafka 项目的 KafkaTemplate 基础,KafkaProducerMessageHandler 也同样简单。
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
KafkaTemplate<Integer, String> template) {
KafkaProducerMessageHandler<Integer, String> handler =
new KafkaProducerMessageHandler<>(template);
handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
handler.setPartitionIdExpression(
PARSER.parseExpression("headers.myPartition"));
return handler;
}
XML 配置也得到了简化。
<int-kafka:outbound-channel-adapter
kafka-template="template"
channel="inputToKafka"
topic="foo"/>
从 1.2 版本开始,Spring Integration Java DSL 引入了 Kafka09 工厂,以涵盖此 2.0 版本上述通道适配器的功能。例如,生产部分可能如下所示:
.handle(Kafka09.outboundChannelAdapter(producerFactory())
.defaultTopic("foo")
.partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
最后,也不要错过 Spring for Apache Kafka 的公告!
与下一个 Spring for Apache Kafka 版本一起,我们可能会考虑为 Kafka Streams 实现一些适配器。
由于该项目的代码库变得相当直接,并且 Apache Kafka API 看似即将稳定,因此我们打算在时机成熟时将此项目合并到 Spring Integration 5.0 中。
同时,我们期待您的反馈,如果一切顺利,我们计划在未来几周内发布 2.0.0.RELEASE!