领先一步
VMware 提供培训和认证,帮助您快速提升技能。
了解更多我们很高兴地宣布 Reactor Kafka 1.0.0 的第一个里程碑版本发布。
Reactor Kafka 是一个基于 Project Reactor 的 Apache Kafka 响应式 API。Reactor Kafka API 允许使用具有非阻塞背压和极低开销的功能性 API 来发布消息到 Kafka 主题以及从 Kafka 主题消费消息。这使得使用 Reactor 的应用程序能够使用 Kafka 作为消息总线或流媒体平台,并与其他系统集成以提供端到端的响应式管道。
Reactor Kafka 的价值在于有效利用资源,尤其是在与多个外部系统交互(其中 Kafka 是其中一个)的应用程序中。端到端的响应式管道受益于非阻塞背压和高效的线程利用率,能够高效处理大量并发请求。Project Reactor 提供的优化功能使得开发具有极低开销和可预测容量规划的响应式应用程序成为可能,从而交付低延迟、高吞吐量的管道。
要开始运行示例响应式 Kafka 生产者和消费者,请按照参考指南的入门部分中的说明操作。
Reactor Kafka API 基于 Apache Kafka 生产者/消费者 API,主要包含两个类:
Sender
用于向 Kafka 主题发布消息
Receiver
用于从 Kafka 主题消费消息
这些响应式接口提供了底层 Kafka Producer
和 Consumer
的全部功能。
Sender<Integer, String> sender =
Sender.create(SenderOptions.create(producerProps)); (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = (2)
Flux.range(1, 10)
.map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false) (3)
.doOnNext(r -> log.debug("Message #{} result: {}",
r.correlationMetadata(), r.recordMetadata())) (4)
.subscribe(); (5)
创建一个 Sender
要发送到 Kafka 的出站消息的 Flux
响应式发送
记录每次发送的结果
订阅以启动消息到 Kafka 的流程
ReceiverOptions<Integer, String> receiverOptions = (1)
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Receiver.create(receiverOptions) (2)
.receive() (3)
.subscribe(r -> {
log.info("Received message {} ", r.record()); (4)
r.offset().acknowledge(); (5)
});
创建 ReceiverOptions
并配置对 Kafka 主题的订阅
创建 Receiver
响应式接收
记录每条传入的消息
处理消息后确认,以便可以提交偏移量
Reactor Kafka 源代码和示例可在 github 上找到。
更多信息和额外资源,请参见 Reactor Kafka 参考指南 和 Javadoc 文档。