领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我们很高兴宣布 Reactor Kafka 1.0.0 的首个里程碑版本发布。
Reactor Kafka 是一个基于 Project Reactor 的 Apache Kafka 响应式 API。Reactor Kafka API 支持使用函数式 API 以非阻塞背压和极低开销的方式将消息发布到 Kafka 主题以及从 Kafka 主题消费消息。这使得使用 Reactor 的应用程序能够将 Kafka 用作消息总线或流平台,并与其他系统集成以提供端到端的响应式管道。
Reactor Kafka 的价值主张在于在包含多个外部交互的应用中高效利用资源,其中 Kafka 是外部系统之一。端到端的响应式管道受益于非阻塞背压和高效的线程使用,从而能够高效处理大量并发请求。Project Reactor 提供的优化使得响应式应用的开发开销极低,容量规划可预测,从而交付低延迟、高吞吐的管道。
要开始使用并运行示例响应式 Kafka 生产者和消费者,请遵循参考指南中“入门”部分的说明。
Reactor Kafka API 基于 Apache Kafka 生产者/消费者 API,包含两个主要类:
用于将消息发布到 Kafka 主题的 Sender
用于从 Kafka 主题消费消息的 Receiver
这些响应式接口提供了底层 Kafka Producer
和 Consumer
的全部功能。
Sender<Integer, String> sender =
Sender.create(SenderOptions.create(producerProps)); (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = (2)
Flux.range(1, 10)
.map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false) (3)
.doOnNext(r -> log.debug("Message #{} result: {}",
r.correlationMetadata(), r.recordMetadata())) (4)
.subscribe(); (5)
创建 Sender
要发送到 Kafka 的出站消息 Flux
响应式发送
记录每次发送的结果
订阅以启动消息流向 Kafka
ReceiverOptions<Integer, String> receiverOptions = (1)
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Receiver.create(receiverOptions) (2)
.receive() (3)
.subscribe(r -> {
log.info("Received message {} ", r.record()); (4)
r.offset().acknowledge(); (5)
});
创建 ReceiverOptions
并配置对 Kafka 主题的订阅
创建 Receiver
响应式接收
记录每条接收到的消息
处理消息后确认,以便提交偏移量
Reactor Kafka 源代码和示例可在 github 上获取。
更多信息和额外资源,请参阅Reactor Kafka 参考指南和Javadocs。