Spring Integration Kafka 支持 2.0.0.M1 现已发布

发布 | Artem Bilan | 2016 年 4 月 11 日 | ...

我很高兴地宣布 spring-integration-kafka (Spring Integration Kafka 支持) 版本 2.0 的第一个里程碑版本现已发布。

Spring Integration Kafka 扩展项目为 Apache Kafka 提供了 inboundoutbound 通道适配器。

从 2.0 版本开始,该项目基于新的 spring-kafka 项目进行了完全重写,该项目使用 Kafka 0.9.x.x 提供的纯 Java ProducerConsumer 客户端。

org.springframework.integration:spring-integration-kafka:2.0.0.M1 组件可在 Milestone 存储库中找到。

主要特性

Kafka Consumer 通道适配器

由于有了 spring-kafka 项目的 MessageListenerContainer 基础,KafkaMessageDrivenChannelAdapter 的定义现在非常简单

@Bean
public MessageProducer kafkaProducer(
                   AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter = 
                              new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}

使用 XML 配置,我们应该只声明单个组件

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

Kafka Producer 通道适配器

有了 spring-kafka 项目的 KafkaTemplate 基础,KafkaProducerMessageHandler 也同样简单

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
                            KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler = 
                         new KafkaProducerMessageHandler<>(template);
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
                            PARSER.parseExpression("headers.myPartition"));
    return handler;
}

XML 配置也得到了简化

<int-kafka:outbound-channel-adapter 
                kafka-template="template" 
                channel="inputToKafka"
                topic="foo"/>

Java DSL 变更

1.2 版本开始,Spring Integration Java DSL 引入了 Kafka09 工厂来覆盖此新 2.0 版本中上述通道适配器的功能。例如,生产部分可能如下所示

.handle(Kafka09.outboundChannelAdapter(producerFactory())
             .defaultTopic("foo")
             .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))

最后,不要错过 Spring for Apache Kafka 的发布公告!

下一步

结合下一个 Spring for Apache Kafka,我们可能会考虑为 Kafka Streams 实现一些适配器。

由于该项目的代码库变得非常简单,并且看起来 Apache Kafka API 将会 保持稳定,我们打算在时机成熟时将该项目纳入 Spring Integration Code 5.0 中。

与此同时,我们期待您的反馈,如果一切顺利,计划在未来几周内发布 2.0.0.RELEASE

项目页面 | 帮助

获取 Spring 新闻资讯

通过 Spring 新闻资讯保持联系

订阅

更进一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部