Reactor Kafka 1.0.0.M1 版本发布

发布 | Rajini Sivaram | 2016年12月15日 | ...

我们很高兴地宣布 Reactor Kafka 1.0.0 的第一个里程碑版本发布。

什么是 Reactor Kafka?

Reactor Kafka 是一个基于 Project ReactorApache Kafka 响应式 API。Reactor Kafka API 允许使用具有非阻塞背压和极低开销的功能性 API 来发布消息到 Kafka 主题以及从 Kafka 主题消费消息。这使得使用 Reactor 的应用程序能够使用 Kafka 作为消息总线或流媒体平台,并与其他系统集成以提供端到端的响应式管道。

Reactor Kafka 的价值在于有效利用资源,尤其是在与多个外部系统交互(其中 Kafka 是其中一个)的应用程序中。端到端的响应式管道受益于非阻塞背压和高效的线程利用率,能够高效处理大量并发请求。Project Reactor 提供的优化功能使得开发具有极低开销和可预测容量规划的响应式应用程序成为可能,从而交付低延迟、高吞吐量的管道。

入门指南

要开始运行示例响应式 Kafka 生产者和消费者,请按照参考指南的入门部分中的说明操作。

Reactor Kafka API

Reactor Kafka API 基于 Apache Kafka 生产者/消费者 API,主要包含两个类:

  • Sender 用于向 Kafka 主题发布消息

  • Receiver 用于从 Kafka 主题消费消息

这些响应式接口提供了底层 Kafka ProducerConsumer 的全部功能。

响应式发送器

Sender<Integer, String> sender =
    Sender.create(SenderOptions.create(producerProps));                 (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux =             (2)
    Flux.range(1, 10)
        .map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false)                                        (3)
      .doOnNext(r -> log.debug("Message #{} result: {}",
                         r.correlationMetadata(), r.recordMetadata()))  (4)
      .subscribe();                                                     (5)
  1. 创建一个 Sender

  2. 要发送到 Kafka 的出站消息的 Flux

  3. 响应式发送

  4. 记录每次发送的结果

  5. 订阅以启动消息到 Kafka 的流程

响应式接收器

ReceiverOptions<Integer, String> receiverOptions =                      (1)
    ReceiverOptions.<Integer, String>create(consumerProps)
                   .subscription(Collections.singleton(topic));
Receiver.create(receiverOptions)                                        (2)
        .receive()                                                      (3)
        .subscribe(r -> {
                log.info("Received message {} ", r.record());           (4)
                r.offset().acknowledge();                               (5)
            });
  1. 创建 ReceiverOptions 并配置对 Kafka 主题的订阅

  2. 创建 Receiver

  3. 响应式接收

  4. 记录每条传入的消息

  5. 处理消息后确认,以便可以提交偏移量

资源

Reactor Kafka 源代码和示例可在 github 上找到。

更多信息和额外资源,请参见 Reactor Kafka 参考指南Javadoc 文档

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,帮助您快速提升技能。

了解更多

获取支持

Tanzu Spring 提供一个简单的订阅,包含 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部