Spring 集成 Kafka 支持 2.0.0.M1 现已可用

版本 | Artem Bilan | 2016年4月11日 | ...

我很高兴地宣布,spring-integration-kafka(Spring 集成 Kafka 支持)2.0版本的第一个里程碑版本现已可用。

Spring 集成 Kafka扩展项目为 Apache Kafka 提供了inboundoutbound通道适配器。

从 2.0 版本开始,该项目是基于新的spring-kafka项目进行的完整重写,该项目使用 Kafka 0.9.x.x提供的纯 Java ProducerConsumer客户端。

工件org.springframework.integration:spring-integration-kafka:2.0.0.M1可在里程碑存储库中找到。

主要功能

Kafka Consumer 通道适配器

拥有spring-kafka项目中的MessageListenerContainer基础,KafkaMessageDrivenChannelAdapter的定义现在非常简单。

@Bean
public MessageProducer kafkaProducer(
                   AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter = 
                              new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}

使用 XML 配置,我们也只需要声明单个组件。

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

Kafka Producer 通道适配器

有了spring-kafka项目中的KafkaTemplate基础,KafkaProducerMessageHandler也很简单。

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
                            KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler = 
                         new KafkaProducerMessageHandler<>(template);
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
                            PARSER.parseExpression("headers.myPartition"));
    return handler;
}

XML 配置也得到了简化。

<int-kafka:outbound-channel-adapter 
                kafka-template="template" 
                channel="inputToKafka"
                topic="foo"/>

Java DSL 更改

1.2版本开始,Spring Integration Java DSL 引入了Kafka09工厂来涵盖此新的2.0版本中上述通道适配器的功能。例如,生产部分可能如下所示:

.handle(Kafka09.outboundChannelAdapter(producerFactory())
             .defaultTopic("foo")
             .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))

最后,也不要错过适用于 Apache Kafka 的 Spring公告!

后续步骤

与下一个适用于 Apache Kafka 的 Spring 一起,我们还可以考虑为Kafka Streams实现一些适配器。

由于该项目的代码库变得非常简洁,并且 Apache Kafka API 即将稳定,我们打算在时机成熟时将其吸收到 Spring Integration 代码 5.0 中。

与此同时,我们期待您的反馈,如果一切顺利,计划在接下来的几周内发布2.0.0.RELEASE版本!

项目页面 | 帮助

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部