领先一步
VMware 提供培训和认证,助您加速进步。
了解更多这篇技巧文章很大程度上借鉴了这篇关于 Spring XD 的流式支持的 Wiki 页面,由多位 Spring XD 团队成员编写,特别是才华横溢的 Ilayaperumal Gopinathan。
Spring XD 1.1 现已发布,包含了许多新功能。本次发布的一个主题是丰富的流处理支持。Spring XD 1.1 提供了与 Project Reactor 的 Stream、RxJava 的 Observable 以及 Spark 的流式处理的集成。
我们将专门探讨使用 Reactor,尽管这些概念在所有支持的流式 API 中都是相似的。
在消息总线上接收到的消息可以从输入流中访问。返回值是由对输入流应用各种操作的结果生成的输出流。输出流的内容将被发送到消息总线,供其他处理器或接收器消费。要实现一个基于 Stream 的处理器模块,您需要实现 org.springframework.xd.reactor.Processor 接口。
import org.springframework.xd.reactor.Processor;
import org.springframework.xd.tuple.Tuple;
import reactor.rx.Stream;
import static com.acme.Math.avg;
import static org.springframework.xd.tuple.TupleBuilder.tuple;
public class MovingAverage implements Processor<Tuple, Tuple> {
@Override
public Stream<Tuple> process(Stream<Tuple> inputStream) {
return inputStream.map(tuple -> tuple.getDouble("measurement"))
.buffer(5)
.map(data -> tuple().of("average", avg(data)));
}
}
为这个模块编写测试非常简单,只需设置一个 Spring Integration 流,该流通过一个 org.springframework.xd.reactor.SynchronousDispatcherMessageHandler 组件在请求通道上接收输入,并将其路由到此处理器,该组件本身将其输出写入输出通道。然后,您可以 打包 并 注册 自定义处理器到 Spring XD 管理服务器。