遥遥领先
VMware 提供培训和认证,以加速您的进步。
了解更多此技巧大量借鉴了 Spring XD 团队成员在 Spring XD 的流处理支持 上的这个 Wiki 页面,尤其是 Ilayaperumal Gopinathan 的精彩贡献
Spring XD 1.1 发布了,包含许多新功能。此版本的一个主题是丰富的流处理支持。Spring XD 1.1 提供了与 Project Reactor Stream
s,RxJava Observable
s 和 Spark 流处理的集成。
让我们专门研究一下 Reactor 的使用,尽管这些概念在所有受支持的流处理 API 中都是相似的。
在消息总线上传递的消息可以从输入流访问。返回值是输出流,它是将各种操作应用于输入流的结果。输出流的内容被发送到消息总线,供其他处理器或接收器使用。要实现基于 Stream
的处理器模块,你需要实现接口 org.springframework.xd.reactor.Processor
。
import org.springframework.xd.reactor.Processor;
import org.springframework.xd.tuple.Tuple;
import reactor.rx.Stream;
import static com.acme.Math.avg;
import static org.springframework.xd.tuple.TupleBuilder.tuple;
public class MovingAverage implements Processor<Tuple, Tuple> {
@Override
public Stream<Tuple> process(Stream<Tuple> inputStream) {
return inputStream.map(tuple -> tuple.getDouble("measurement"))
.buffer(5)
.map(data -> tuple().of("average", avg(data)));
}
}
为此编写测试就像设置一个 Spring Integration 流一样简单,该流在请求通道上接收输入,并通过 org.springframework.xd.reactor.SynchronousDispatcherMessageHandler
组件将其路由到此处理器,该组件本身将其输出写入输出通道。从那里,你可以打包和注册 Spring XD 管理服务器中的自定义处理器。