Spring XD 1.1 中的流处理

工程 | Josh Long | 2015年2月20日 | ...

这篇技巧文章很大程度上借鉴了这篇关于 Spring XD 的流式支持的 Wiki 页面,由多位 Spring XD 团队成员编写,特别是才华横溢的 Ilayaperumal Gopinathan

Spring XD 1.1 现已发布,包含了许多新功能。本次发布的一个主题是丰富的流处理支持。Spring XD 1.1 提供了与 Project ReactorStreamRxJavaObservable 以及 Spark 的流式处理的集成。

我们将专门探讨使用 Reactor,尽管这些概念在所有支持的流式 API 中都是相似的。

在消息总线上接收到的消息可以从输入流中访问。返回值是由对输入流应用各种操作的结果生成的输出流。输出流的内容将被发送到消息总线,供其他处理器或接收器消费。要实现一个基于 Stream 的处理器模块,您需要实现 org.springframework.xd.reactor.Processor 接口。

import org.springframework.xd.reactor.Processor;
import org.springframework.xd.tuple.Tuple;
import reactor.rx.Stream;

import static com.acme.Math.avg;
import static org.springframework.xd.tuple.TupleBuilder.tuple;

public class MovingAverage implements Processor<Tuple, Tuple> {

  @Override
  public Stream<Tuple> process(Stream<Tuple> inputStream) {
    return inputStream.map(tuple -> tuple.getDouble("measurement"))
      .buffer(5)
      .map(data -> tuple().of("average", avg(data)));
  }
}

为这个模块编写测试非常简单,只需设置一个 Spring Integration 流,该流通过一个 org.springframework.xd.reactor.SynchronousDispatcherMessageHandler 组件在请求通道上接收输入,并将其路由到此处理器,该组件本身将其输出写入输出通道。然后,您可以 打包注册 自定义处理器到 Spring XD 管理服务器。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有