领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多此提示主要来自 Spring XD 团队成员撰写的关于 Spring XD 的流支持 的此维基页面,尤其是来自优秀的 Ilayaperumal Gopinathan 的贡献。
Spring XD 1.1 现已发布,并包含了许多新功能。此版本的一个主题是丰富的流处理支持。Spring XD 1.1 提供了与 Project Reactor Stream
、RxJava Observable
和 Spark 的流处理的集成。
让我们具体看看如何使用 Reactor,尽管所有支持的流 API 的概念都类似。
消息总线上的消息可从输入流中访问。返回值是输出流,它是将各种操作应用于输入流的结果。输出流的内容被发送到消息总线,供其他处理器或接收器使用。要实现基于Stream
的处理器模块,您需要实现接口org.springframework.xd.reactor.Processor
import org.springframework.xd.reactor.Processor;
import org.springframework.xd.tuple.Tuple;
import reactor.rx.Stream;
import static com.acme.Math.avg;
import static org.springframework.xd.tuple.TupleBuilder.tuple;
public class MovingAverage implements Processor<Tuple, Tuple> {
@Override
public Stream<Tuple> process(Stream<Tuple> inputStream) {
return inputStream.map(tuple -> tuple.getDouble("measurement"))
.buffer(5)
.map(data -> tuple().of("average", avg(data)));
}
}
为此编写测试就像设置一个 Spring Integration 流一样简单,该流在请求通道上接收输入,并通过org.springframework.xd.reactor.SynchronousDispatcherMessageHandler
组件将其路由到此处理器,该组件本身将其输出写入输出通道。从那里,您可以打包并在 Spring XD 管理服务器中注册自定义处理器。