Spring Integration Kafka Support 2.0.0.M1 现已发布

发布 | Artem Bilan | 2016年4月11日 | ...

我很高兴地宣布 spring-integration-kafka (Spring Integration Kafka Support) 2.0 版本的第一个里程碑现已发布。

Spring Integration Kafka 扩展项目为 Apache Kafka 提供了 inboundoutbound 通道适配器。

从 2.0 版本开始,该项目是基于新的 spring-kafka 项目进行的完全重写,该项目使用了 Kafka 0.9.x.x 提供的纯 Java ProducerConsumer 客户端。

构件 org.springframework.integration:spring-integration-kafka:2.0.0.M1 可在 Milestone 仓库中找到。

主要功能

Kafka Consumer Channel Adapter

得益于 spring-kafka 项目的 MessageListenerContainer 基础,KafkaMessageDrivenChannelAdapter 的定义现在非常简单。

@Bean
public MessageProducer kafkaProducer(
                   AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter = 
                              new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}

使用 XML 配置时,我们也只需要声明一个组件。

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

Kafka Producer Channel Adapter

得益于 spring-kafka 项目的 KafkaTemplate 基础,KafkaProducerMessageHandler 也同样简单。

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
                            KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler = 
                         new KafkaProducerMessageHandler<>(template);
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
                            PARSER.parseExpression("headers.myPartition"));
    return handler;
}

XML 配置也得到了简化。

<int-kafka:outbound-channel-adapter 
                kafka-template="template" 
                channel="inputToKafka"
                topic="foo"/>

Java DSL 变更

1.2 版本开始,Spring Integration Java DSL 引入了 Kafka09 工厂,以涵盖此 2.0 版本上述通道适配器的功能。例如,生产部分可能如下所示:

.handle(Kafka09.outboundChannelAdapter(producerFactory())
             .defaultTopic("foo")
             .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))

最后,也不要错过 Spring for Apache Kafka 的公告!

下一步

与下一个 Spring for Apache Kafka 版本一起,我们可能会考虑为 Kafka Streams 实现一些适配器。

由于该项目的代码库变得相当直接,并且 Apache Kafka API 看似即将稳定,因此我们打算在时机成熟时将此项目合并到 Spring Integration 5.0 中。

同时,我们期待您的反馈,如果一切顺利,我们计划在未来几周内发布 2.0.0.RELEASE

项目主页 | 帮助

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有