抢先一步
VMware 提供培训和认证,以加速您的进步。
了解更多代表 Spring Integration 和 Spring Cloud Stream 团队,我很高兴地宣布 spring-kafka
(适用于 Apache Kafka 的 Spring)1.0 版的发布候选版本现已可用。
工件 org.springframework.kafka:spring-kafka:1.0.0.RC1
和 org.springframework.kafka:spring-kafka-test:1.0.0.RC1
可在 里程碑 存储库中找到。
首先,非常感谢所有参与其中的人,感谢积极参与的社区成员,他们提供了功能请求和贡献。特别感谢 Martin Dam,他花费了大量时间帮助我们处理用于处理缓慢侦听器的 pause/resume
算法。
自 第二个里程碑 以来,我们进行了一些重构和清理工作,包括
可以将 ConsumerRebalanceListener
注入到 MessageListenerContainer
中;
可以使用 (De)Serializer
分别为 ConsumerFactory
和 ProducerFactory
中的 key
和 value
自定义 KafkaConsumer
和 KafkaProducer
;这是一种替代使用属性进行配置的方法。
提供了基于 Jackson 库的 JsonSerializer
和 JsonDeserializer
;
提供了 RecordFilterStrategy
和 FilteringMessageListenerAdapter
以允许跳过记录或处理重复交付;
提供了易于理解的 RetryingMessageListenerAdapter
;
当 MessageListenerContainer
进入空闲状态时,会在可配置的时间后发出 ListenerContainerIdleEvent
;
提供 TopicPartitionInitialOffset
以允许配置 MessageListenerContainer
以分配到主题中的特定分区,并在启动时可选地跳转到所需偏移量;
@KafkaListener
的 @TopicPartition
属性已使用 @PartitionOffset
属性进行增强,用于初始偏移量配置;
改进了消费者重新平衡事件。
此发布候选版本实现的主要功能是在目标侦听器处理记录速度缓慢时暂停/恢复 KafkaConsumer
。如果我们在 session.timeout.ms
内不轮询 KafkaConsumer
,Kafka 会假设我们的消费者有缺陷并启动分区重新平衡过程(在使用组管理进行分区分配时)。为了保持活动状态,我们必须轮询消费者。考虑 Kafka 在轮询时返回大量记录并且处理这些记录花费的时间超过会话超时的情况。为此,可以使用 enablePause
、pauseAfter
和 queueDepth
配置 MessageListenerContainer
- 并且 KafkaConsumer
将被暂停(继续轮询,但不会接收任何记录),直到侦听器完成其工作并恢复消费者。再次感谢 Martin Dam 为此功能提供的帮助和测试。
有关更多信息,请参阅 参考手册。
不要错过 发布候选版本,适用于 Spring Integration Kafka 2.0,它已重新设计为基于此适用于 Apache Kafka 的 Spring 基础。
我们预计将在大约一周内发布 GA 版本,因此请尝试使用候选版本并尽快报告任何问题。
Gary Russell 将在 Spring One 平台 上讨论 适用于 Apache Kafka 的 Spring,该平台将于今年 8 月 1 日至 4 日在拉斯维加斯举行。还有许多其他精彩的演讲,因此请 查看议程 并 获取门票(如果您尚未这样做)。