领先一步
VMware 提供培训和认证,以加快您的进度。
了解更多我很高兴地宣布,spring-integration-kafka
(Spring 集成 Kafka 支持)2.0版本的第一个里程碑版本现已可用。
Spring 集成 Kafka扩展项目为 Apache Kafka 提供了inbound
和outbound
通道适配器。
从 2.0 版本开始,该项目是基于新的spring-kafka
项目进行的完整重写,该项目使用 Kafka 0.9.x.x
提供的纯 Java Producer
和Consumer
客户端。
工件org.springframework.integration:spring-integration-kafka:2.0.0.M1
可在里程碑存储库中找到。
拥有spring-kafka
项目中的MessageListenerContainer
基础,KafkaMessageDrivenChannelAdapter
的定义现在非常简单。
@Bean
public MessageProducer kafkaProducer(
AbstractMessageListenerContainer<Integer, String> container) {
KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setMessageConverter(new StringJsonMessageConverter());
adapter.setOutputChannel(fromKafkaChannel());
adapter.setErrorChannel(myErrorChannel());
return adapter;
}
使用 XML 配置,我们也只需要声明单个组件。
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
error-channel="errorChannel" />
有了spring-kafka
项目中的KafkaTemplate
基础,KafkaProducerMessageHandler
也很简单。
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
KafkaTemplate<Integer, String> template) {
KafkaProducerMessageHandler<Integer, String> handler =
new KafkaProducerMessageHandler<>(template);
handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
handler.setPartitionIdExpression(
PARSER.parseExpression("headers.myPartition"));
return handler;
}
XML 配置也得到了简化。
<int-kafka:outbound-channel-adapter
kafka-template="template"
channel="inputToKafka"
topic="foo"/>
从1.2
版本开始,Spring Integration Java DSL 引入了Kafka09
工厂来涵盖此新的2.0
版本中上述通道适配器的功能。例如,生产部分可能如下所示:
.handle(Kafka09.outboundChannelAdapter(producerFactory())
.defaultTopic("foo")
.partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
最后,也不要错过适用于 Apache Kafka 的 Spring公告!
与下一个适用于 Apache Kafka 的 Spring 一起,我们还可以考虑为Kafka Streams实现一些适配器。
由于该项目的代码库变得非常简洁,并且 Apache Kafka API 即将稳定,我们打算在时机成熟时将其吸收到 Spring Integration 代码 5.0 中。
与此同时,我们期待您的反馈,如果一切顺利,计划在接下来的几周内发布2.0.0.RELEASE
版本!