Spring Integration Kafka扩展1.0.GA版本现已发布

发布 | Artem Bilan | 2015年2月9日 | ...

尊敬的Spring社区成员:

我们欣喜地宣布,Spring Integration Kafka 1.0 GA扩展现已发布,它为Apache Kafka提供了熟悉的Spring Integration端点。像往常一样,可以使用发布仓库(通过Maven或Gradle)

compile "org.springframework.integration:spring-integration-kafka:1.0.0.RELEASE"

或者下载发行版压缩包来体验它。

首先要感谢所有为该项目做出贡献的人——特别感谢项目的创始人Soby Chacko,他实现了基础架构,以及基于高级消费者的消息源和生产者;还要感谢Marius Bogoevici,他学习了用于消息驱动型消费者的简单消费者API的复杂之处。

概述

毫不奇怪,这个项目完全基于Apache Kafka(0.8.1.1版本)和Spring Integration基础(4.0.5.RELEASE版本)。我们提供了一些抽象,例如ConfigurationConnectionFactoryKafkaMessageListenerContainerKafkaConsumerContext/KafkaProducerContextKafkaMessage等,以遵循Spring的解耦和易用性原则。基于这些抽象,我们提供了高级API,例如KafkaMessageDrivenChannelAdapterKafkaHighLevelConsumerMessageSourceKafkaProducerMessageHandler,它们是Spring Integration中的适配器。还提供了XML配置支持。

KafkaHighLevelConsumerMessageSource

Kafka高级消费者通过<int-kafka:inbound-channel-adapter><int-kafka:consumer-context>来表示,使用KafkaStream从Kafka主题中轮询消息。它的主要优点是易于使用,并且能够在多个消息源实例并行运行时在消费者之间平衡分区。

典型的配置可能如下所示:

<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
                                    channel="inputFromKafka">
       <int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="4000"
                            zookeeper-connect="zookeeperConnect">
     <int-kafka:consumer-configurations>
           <int-kafka:consumer-configuration group-id="default"
                                  value-decoder="valueDecoder"
                                  key-decoder="valueDecoder"
                                  max-messages="5000">
                  <int-kafka:topic id="test1" streams="4"/>
                  <int-kafka:topic id="test2" streams="4"/>
           </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

如您所见,<int-kafka:consumer-context>还需要一个对zookeeperConnect的引用。这是一个简单的bean,它表示与Zookeeper集群的连接。

<int-kafka:zookeeper-connect id="zookeeperConnect" 
                          zk-connect="host1,host2,host3:2182"
                          zk-connection-timeout="6000"
                          zk-session-timeout="6000"
                          zk-sync-time="2000"/>

KafkaHighLevelConsumerMessageSource生成具有Map<String, Map<Integer, List<Object>>>有效负载的Message,其中它类似于“每个主题的按分区划分的Kafka消息”。

KafkaMessageDrivenChannelAdapter

Kafka简单消费者通过<int-kafka:message-driven-adapter>来表示,并且基于众所周知的ListenerContainer抽象——KafkaMessageListenerContainer(类似于Spring AMQP的SimpleMessageListenerContainer或Spring JMS的DefaultMessageListenerContainer)。

@Bean
public Configuration zkConfiguration() {
   return new ZookeeperConfiguration(new ZookeeperConnect());
}

@Bean
public ConnectionFactory kafkaConnectionFactory() {
   return new DefaultConnectionFactory(zkConfiguration());
}

@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
   KafkaMessageDrivenChannelAdapter adapter = 
            new KafkaMessageDrivenChannelAdapter(
   		new KafkaMessageListenerContainer(kafkaConnectionFactory(),    
                                       "topic1", "topic2"));
   adapter.setOutputChannel(inputChannel());
   return adapter;
}

此组件的主要优点是能够更好地控制单个组件侦听的分区(可配置),以及起始偏移量(例如,在重放主题时)。此外,还可以使用更丰富的偏移量管理和错误处理策略。

侦听任务的结果是一个单一的Message,其有效负载基于Kafka消息,以及具有来自KafkaHeaders的键的附加headers。在一个分区内保持顺序。

这两个适配器都可以配置kafka.serializer.Decoder来处理Kafka消息以及Kafka消息键。Spring Integration Kafka提供了现成的AvroEncoder/Decoder实现。

此外,Spring Integration Kafka引入了OffsetManager抽象来处理Kafka主题偏移量,而这在高级消费者中是不可用的。提供了MetadataStoreOffsetManagerKafkaTopicOffsetManager。必须将OffsetManager注入到KafkaMessageListenerContainer中。默认情况下,使用由Spring Integration Core中的SimpleMetadataStore支持的MetadataStoreOffsetManager

KafkaProducerMessageHandler

Kafka生产者<int-kafka:outbound-channel-adapter><int-kafka:producer-context>对表示。后者利用目标KafkaProducer的配置,该配置由MessageHeaders中的KafkaHeaders.TOPIC<int-kafka:outbound-channel-adapter>上的topic-expression选择,并与<int-kafka:producer-configuration>子元素上配置的topic选项匹配。

<int-kafka:producer-context id="kafkaProducerContext">
	<int-kafka:producer-configurations>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					key-class-type="java.lang.String"
					value-class-type="java.lang.String"
					topic="test1"
					value-encoder="kafkaEncoder"
					key-encoder="kafkaEncoder"
					compression-codec="default"/>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					topic="test2"
					compression-codec="default"/>
	</int-kafka:producer-configurations>
</int-kafka:producer-context>

如您所见,有足够的选项来调整目标Producer,并且每个Producer都可以由特定的broker-list支持。如果只有一个<int-kafka:producer-configuration>存在,则可以向任何topic发送消息,例如基于Message上下文头。

Spring XD将这些适配器用作Kafka源和接收器。此外,它还提供了KafkaMessageBus。这些功能将很快在Spring XD 1.1 RELEASE中可用。

此外,在我们完成此版本发布时,Apache Kafka 0.8.2也发布了最终版本。我们很高兴祝贺该团队,我们将在不久的将来整合新功能——这仅仅是为Spring应用程序提供Kafka顶级支持的开始!

有关更多信息,请参阅项目主页

与往常一样,我们期待您的意见和反馈(StackOverflowspring-integration标签)、Spring JIRAGitHub),我们非常欢迎贡献

获取Spring时事通讯

通过Spring时事通讯保持联系

订阅

领先一步

VMware提供培训和认证,以快速提升您的进度。

了解更多

获得支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部