Spring Integration Kafka 扩展 1.0.GA 版本已发布

发布 | Artem Bilan | 2015 年 2 月 9 日 | ...

亲爱的 Spring 社区,

我们很高兴地宣布 Spring Integration Kafka 1.0 GA 扩展现已发布,它为 Apache Kafka 提供了熟悉的 Spring Integration 端点。 像往常一样,使用 Release Repository 以及 Maven 或 Gradle

compile "org.springframework.integration:spring-integration-kafka:1.0.0.RELEASE"

或下载 分发归档,体验一下。

首先感谢所有为该项目做出贡献的人——特别感谢该项目的创始人 Soby Chacko,他实现了基础设施,以及基于高级消费者的消息源和生产者,以及 Marius Bogoevici,他学习了用于消息驱动消费者的简单消费者 API 的复杂性。

概述

毫不奇怪,这个项目完全基于 Apache Kafka(0.8.1.1 版本)和 Spring Integration 基础(4.0.5.RELEASE 版本)。 我们提供了一些抽象,例如 ConfigurationConnectionFactoryKafkaMessageListenerContainerKafkaConsumerContext/KafkaProducerContextKafkaMessage 等,以遵循 Spring 的解耦和易用性原则。 基于这些抽象,我们提供了高级 API,例如 KafkaMessageDrivenChannelAdapterKafkaHighLevelConsumerMessageSourceKafkaProducerMessageHandler,这些是 Spring Integration 方面的适配器。 还提供了 XML 配置支持。

KafkaHighLevelConsumerMessageSource

Kafka 高级消费者 通过 <int-kafka:inbound-channel-adapter><int-kafka:consumer-context> 呈现,以使用 KafkaStream 从 Kafka 主题 poll 消息。 它的主要优点是使用简单,并且如果消息源的多个实例并行运行,则能够在消费者之间平衡分区。

典型的配置可能如下所示

<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
                                    channel="inputFromKafka">
       <int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="4000"
                            zookeeper-connect="zookeeperConnect">
     <int-kafka:consumer-configurations>
           <int-kafka:consumer-configuration group-id="default"
                                  value-decoder="valueDecoder"
                                  key-decoder="valueDecoder"
                                  max-messages="5000">
                  <int-kafka:topic id="test1" streams="4"/>
                  <int-kafka:topic id="test2" streams="4"/>
           </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

正如您所看到的,除了 <int-kafka:consumer-context> 之外,还需要引用 zookeeperConnect。 这是一个简单的 bean,它表示与 Zookeeper 集群的连接

<int-kafka:zookeeper-connect id="zookeeperConnect" 
                          zk-connect="host1,host2,host3:2182"
                          zk-connection-timeout="6000"
                          zk-session-timeout="6000"
                          zk-sync-time="2000"/>

KafkaHighLevelConsumerMessageSource 生成带有 Map<String, Map<Integer, List<Object>>> payloadMessage,它类似于“每个主题的按分区划分的 Kafka 消息”。

KafkaMessageDrivenChannelAdapter

Kafka 简单消费者 通过 <int-kafka:message-driven-adapter> 呈现,并且基于众所周知的 ListenerContainer 抽象 - KafkaMessageListenerContainer(类似于 Spring AMQP SimpleMessageListenerContainer 或 Spring JMS DefaultMessageListenerContainer

@Bean
public Configuration zkConfiguration() {
   return new ZookeeperConfiguration(new ZookeeperConnect());
}

@Bean
public ConnectionFactory kafkaConnectionFactory() {
   return new DefaultConnectionFactory(zkConfiguration());
}

@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
   KafkaMessageDrivenChannelAdapter adapter = 
            new KafkaMessageDrivenChannelAdapter(
   		new KafkaMessageListenerContainer(kafkaConnectionFactory(),    
                                       "topic1", "topic2"));
   adapter.setOutputChannel(inputChannel());
   return adapter;
}

该组件的主要优点是可以更好地控制组件侦听的分区(这些分区是可配置的),以及起始偏移量(例如,在需要重放主题的情况下)。 此外,还有更丰富的偏移量管理和错误处理策略。

监听任务的结果是单个 Message,其 payload 基于 Kafka 消息和带有来自 KafkaHeaders 的键的其他 headers。 排序在分区内保留。

可以为 Kafka 消息以及 Kafka 消息键使用 kafka.serializer.Decoder 配置这两种适配器。 Spring Integration Kafka 开箱即用地提供了 Avro Encoder/Decoder 实现。

此外,Spring Integration Kafka 引入了 OffsetManager 抽象来处理 Kafka 主题偏移量,这在使用 High Level Consumer 时不可用。 提供了 MetadataStoreOffsetManagerKafkaTopicOffsetManagerOffsetManager 必须注入到 KafkaMessageListenerContainer。 默认情况下,使用 MetadataStoreOffsetManager,它由 Spring Integration Core 中的 SimpleMetadataStore 支持。

KafkaProducerMessageHandler

Kafka 生产者 通过 <int-kafka:outbound-channel-adapter><int-kafka:producer-context> 对呈现。 后者利用目标 Kafka Producer 的配置,该配置由 MessageHeaders 中的 KafkaHeaders.TOPIC 选择,或者由 <int-kafka:outbound-channel-adapter> 上的 topic-expression 选择,并与 <int-kafka:producer-configuration> 子元素上配置的 topic 选项匹配

<int-kafka:producer-context id="kafkaProducerContext">
	<int-kafka:producer-configurations>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					key-class-type="java.lang.String"
					value-class-type="java.lang.String"
					topic="test1"
					value-encoder="kafkaEncoder"
					key-encoder="kafkaEncoder"
					compression-codec="default"/>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					topic="test2"
					compression-codec="default"/>
	</int-kafka:producer-configurations>
</int-kafka:producer-context>

正如您所看到的,有足够的选项来调整目标 Producer,并且每个 Producer 都可以由特定的 broker-list 支持。 如果只存在一个 <int-kafka:producer-configuration>,您可以将消息发送到任何 topic,例如,基于 Message 上下文标头。

Spring XD 使用这些适配器作为 Kafka 源和接收器。 此外,它还提供了 KafkaMessageBus。 这些功能将很快在 Spring XD 1.1 RELEASE 中提供。

此外,在我们完成此版本时,Apache Kafka 0.8.2 也收到了最终版本。 我们很高兴祝贺该团队,我们将在不久的将来合并新功能 - 这仅仅是在 Spring 应用程序中提供对 Kafka 的一流支持的开始!

有关更多信息,请参见项目主页

与往常一样,我们期待您的评论和反馈(StackOverflow (spring-integration 标签), Spring JIRA, GitHub),并且我们非常欢迎贡献

获取 Spring 新闻资讯

订阅 Spring 新闻资讯,保持联系

订阅

遥遥领先

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区中所有即将举行的活动。

查看所有