领先一步
VMware提供培训和认证,以快速提升您的进度。
了解更多尊敬的Spring社区成员:
我们欣喜地宣布,Spring Integration Kafka 1.0 GA扩展现已发布,它为Apache Kafka提供了熟悉的Spring Integration端点。像往常一样,可以使用发布仓库(通过Maven或Gradle)
compile "org.springframework.integration:spring-integration-kafka:1.0.0.RELEASE"
或者下载发行版压缩包来体验它。
首先要感谢所有为该项目做出贡献的人——特别感谢项目的创始人Soby Chacko,他实现了基础架构,以及基于高级消费者的消息源和生产者;还要感谢Marius Bogoevici,他学习了用于消息驱动型消费者的简单消费者API的复杂之处。
概述
毫不奇怪,这个项目完全基于Apache Kafka(0.8.1.1版本)和Spring Integration基础(4.0.5.RELEASE版本)。我们提供了一些抽象,例如Configuration
、ConnectionFactory
、KafkaMessageListenerContainer
、KafkaConsumerContext
/KafkaProducerContext
、KafkaMessage
等,以遵循Spring的解耦和易用性原则。基于这些抽象,我们提供了高级API,例如KafkaMessageDrivenChannelAdapter
、KafkaHighLevelConsumerMessageSource
和KafkaProducerMessageHandler
,它们是Spring Integration中的适配器。还提供了XML配置支持。
KafkaHighLevelConsumerMessageSource
Kafka高级消费者通过<int-kafka:inbound-channel-adapter>
和<int-kafka:consumer-context>
来表示,使用KafkaStream
从Kafka主题中轮询消息。它的主要优点是易于使用,并且能够在多个消息源实例并行运行时在消费者之间平衡分区。
典型的配置可能如下所示:
<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
channel="inputFromKafka">
<int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="4000"
zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="default"
value-decoder="valueDecoder"
key-decoder="valueDecoder"
max-messages="5000">
<int-kafka:topic id="test1" streams="4"/>
<int-kafka:topic id="test2" streams="4"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
如您所见,<int-kafka:consumer-context>
还需要一个对zookeeperConnect
的引用。这是一个简单的bean,它表示与Zookeeper集群的连接。
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="host1,host2,host3:2182"
zk-connection-timeout="6000"
zk-session-timeout="6000"
zk-sync-time="2000"/>
KafkaHighLevelConsumerMessageSource
生成具有Map<String, Map<Integer, List<Object>>>
有效负载的Message
,其中它类似于“每个主题的按分区划分的Kafka消息”。
KafkaMessageDrivenChannelAdapter
Kafka简单消费者通过<int-kafka:message-driven-adapter>
来表示,并且基于众所周知的ListenerContainer
抽象——KafkaMessageListenerContainer
(类似于Spring AMQP的SimpleMessageListenerContainer
或Spring JMS的DefaultMessageListenerContainer
)。
@Bean
public Configuration zkConfiguration() {
return new ZookeeperConfiguration(new ZookeeperConnect());
}
@Bean
public ConnectionFactory kafkaConnectionFactory() {
return new DefaultConnectionFactory(zkConfiguration());
}
@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter adapter =
new KafkaMessageDrivenChannelAdapter(
new KafkaMessageListenerContainer(kafkaConnectionFactory(),
"topic1", "topic2"));
adapter.setOutputChannel(inputChannel());
return adapter;
}
此组件的主要优点是能够更好地控制单个组件侦听的分区(可配置),以及起始偏移量(例如,在重放主题时)。此外,还可以使用更丰富的偏移量管理和错误处理策略。
侦听任务的结果是一个单一的Message
,其有效负载基于Kafka消息,以及具有来自KafkaHeaders
的键的附加headers
。在一个分区内保持顺序。
这两个适配器都可以配置kafka.serializer.Decoder
来处理Kafka消息以及Kafka消息键。Spring Integration Kafka提供了现成的AvroEncoder/Decoder
实现。
此外,Spring Integration Kafka引入了OffsetManager
抽象来处理Kafka主题偏移量,而这在高级消费者
中是不可用的。提供了MetadataStoreOffsetManager
和KafkaTopicOffsetManager
。必须将OffsetManager
注入到KafkaMessageListenerContainer
中。默认情况下,使用由Spring Integration Core中的SimpleMetadataStore
支持的MetadataStoreOffsetManager
。
KafkaProducerMessageHandler
Kafka生产者由<int-kafka:outbound-channel-adapter>
和<int-kafka:producer-context>
对表示。后者利用目标KafkaProducer
的配置,该配置由MessageHeaders
中的KafkaHeaders.TOPIC
或<int-kafka:outbound-channel-adapter>
上的topic-expression
选择,并与<int-kafka:producer-configuration>
子元素上配置的topic
选项匹配。
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="test1"
value-encoder="kafkaEncoder"
key-encoder="kafkaEncoder"
compression-codec="default"/>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="test2"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
如您所见,有足够的选项来调整目标Producer
,并且每个Producer
都可以由特定的broker-list
支持。如果只有一个<int-kafka:producer-configuration>
存在,则可以向任何topic
发送消息,例如基于Message
上下文头。
Spring XD将这些适配器用作Kafka源和接收器。此外,它还提供了KafkaMessageBus
。这些功能将很快在Spring XD 1.1 RELEASE中可用。
此外,在我们完成此版本发布时,Apache Kafka 0.8.2也发布了最终版本。我们很高兴祝贺该团队,我们将在不久的将来整合新功能——这仅仅是为Spring应用程序提供Kafka顶级支持的开始!
有关更多信息,请参阅项目主页。
与往常一样,我们期待您的意见和反馈(StackOverflow(spring-integration
标签)、Spring JIRA、GitHub),我们非常欢迎贡献!