更进一步
VMware 提供培训和认证,以加速您的进步。
了解更多这是 Spring Cloud Stream 2.0.0.RELEASE 准备工作中的一系列预发布博客中的第二篇。
Spring Cloud Stream 2.0 引入了轮询消费者,应用程序可以控制消息处理速率。
Spring Cloud Stream 具有生产者和消费者的概念;当使用消息传递范例时,MessageChannel
被绑定到目标(例如 Kafka 主题、Rabbit 交换器/队列)。 迄今为止,在消费者端,只要有空闲消费者可用,就会传递消息。 实际上,broker 控制了传递速率;通常,当前消息被处理后,会立即传递下一条消息。
2.0 引入了轮询消费者,应用程序可以控制消息消耗速率。 Kafka 和 RabbitMQ binder 支持轮询消费者。
使用轮询消费者,我们绑定 PollableMessageSource
而不是将 MessageChannel
绑定到目标;例如,可以这样配置 PolledProcessor
绑定
public interface PolledProcessor {
@Input
PollableMessageSource destIn();
@Output
MessageChannel destOut();
}
消息源有一个方法
boolean poll(MessageHandler handler);
在处理程序的 handleRequest
方法退出之前,消息不会被确认。
MessageHandler
是来自 spring-messaging 的接口;您可以提供标准的 Spring Integration 消息处理程序之一,或者您自己的实现(通常是一个 lambda)。 因为 handleMessage
方法采用 Message<?>
参数,所以没有类型信息,并且不会转换 payload。
但是,消息转换正如本系列的第一篇博客中所讨论的也可以应用于轮询消费者。 为了将类型信息传递给转换服务,我们在重载的 poll()
方法中提供了一个参数化类型引用
boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)
并且消息 payload 将被转换为该类型,它可以很简单,例如,内容类型为 text/plain
new ParameterizedTypeReference<String>() {}
或者更复杂,例如 JSON 内容类型
new ParameterizedTypeReference<Map<String, Foo>>() {}
以下简单的 Spring Boot 应用程序提供了一个完整的示例;它接收 String payload,将其转换为大写并将结果转发到另一个目标。
@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {
private final Logger logger =
LoggerFactory.getLogger(Blog2Application.class);
public static void main(String[] args) {
SpringApplication.run(Blog2Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source,
MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
logger.info("Received: " + payload);
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
logger.info("Processed a message");
}
else {
logger.info("Nothing to do");
}
Thread.sleep(5_000);
}
};
}
public static interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
}
应用程序现在可以控制消息的消耗速率。
有关更多信息,请参阅参考手册中的使用轮询消费者。
我们鼓励您使用以下工具之一提供反馈
尽情享用!