Spring Cloud Stream 2.0 - 轮询式消费者

工程 | Gary Russell | 2018年2月27日 | ...

这是 2.0.0.RELEASE 版本发布前系列博客中的第二篇。

序言

Spring Cloud Stream 2.0 引入了轮询式消费者,应用程序可以控制消息处理速率。

引言

Spring Cloud Stream 包含生产者和消费者的概念;在使用消息传递范式时,MessageChannel 会绑定到目标(例如 Kafka 主题、Rabbit Exchanges/Queues)。到目前为止,在消费者端,只要有空闲的消费者可用,就会传递消息。实际上,消息代理控制着传递速率;通常,下一条消息会在当前消息处理完毕后立即传递。

2.0 版本引入了轮询式消费者,应用程序可以控制消息的消费速率。Kafka 和 RabbitMQ 绑定器都支持轮询式消费者。

详情

使用轮询式消费者时,我们不绑定 MessageChannel 到目标,而是绑定一个 PollableMessageSource;例如,PolledProcessor 绑定可能配置如下:

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

消息源有一个方法

boolean poll(MessageHandler handler);

在处理程序 `handleRequest` 方法退出之前,消息不会被确认。

MessageHandler 是 spring-messaging 中的接口;您可以提供标准的 Spring Integration 消息处理程序之一,或者您自己的实现(通常是 lambda)。由于 `handleMessage` 方法接受一个 `Message<?>` 参数,因此没有类型信息,并且消息体不会被转换。

然而,如本系列第一篇博客 所讨论的,消息转换同样可以应用于轮询式消费者。为了将类型信息传递给转换服务,我们在重载的 `poll()` 方法中提供了一个参数化类型引用:

boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)

消息体将被转换为指定类型,这可以是简单的,例如 `text/plain` 内容类型:

  • new ParameterizedTypeReference<String>() {}

或者更复杂的,例如 JSON 内容类型:

  • new ParameterizedTypeReference<Map<String, Foo>>() {}

综合示例

下面的简单 Spring Boot 应用程序提供了一个完整的示例;它接收 String 消息体,将其转换为大写,并将结果转发到另一个目标。

@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {

  private final Logger logger =
  	  LoggerFactory.getLogger(Blog2Application.class);

  public static void main(String[] args) {
    SpringApplication.run(Blog2Application.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
  	    MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}

结论

现在应用程序可以控制消息的消费速率。

有关更多信息,请参阅参考手册中 使用轮询式消费者

我们鼓励您使用以下任一方式提供反馈:

祝您使用愉快!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有