Spring XD 1.1 中的流处理

工程 | Josh Long | 2015 年 2 月 20 日 | ...

此提示主要来自 Spring XD 团队成员撰写的关于 Spring XD 的流支持 的此维基页面,尤其是来自优秀的 Ilayaperumal Gopinathan 的贡献。

Spring XD 1.1 现已发布,并包含了许多新功能。此版本的一个主题是丰富的流处理支持。Spring XD 1.1 提供了与 Project Reactor StreamRxJava ObservableSpark 的流处理的集成。

让我们具体看看如何使用 Reactor,尽管所有支持的流 API 的概念都类似。

消息总线上的消息可从输入流中访问。返回值是输出流,它是将各种操作应用于输入流的结果。输出流的内容被发送到消息总线,供其他处理器或接收器使用。要实现基于Stream的处理器模块,您需要实现接口org.springframework.xd.reactor.Processor

import org.springframework.xd.reactor.Processor;
import org.springframework.xd.tuple.Tuple;
import reactor.rx.Stream;

import static com.acme.Math.avg;
import static org.springframework.xd.tuple.TupleBuilder.tuple;

public class MovingAverage implements Processor<Tuple, Tuple> {

  @Override
  public Stream<Tuple> process(Stream<Tuple> inputStream) {
    return inputStream.map(tuple -> tuple.getDouble("measurement"))
      .buffer(5)
      .map(data -> tuple().of("average", avg(data)));
  }
}

为此编写测试就像设置一个 Spring Integration 流一样简单,该流在请求通道上接收输入,并通过org.springframework.xd.reactor.SynchronousDispatcherMessageHandler组件将其路由到此处理器,该组件本身将其输出写入输出通道。从那里,您可以打包并在 Spring XD 管理服务器中注册自定义处理器。

获取 Spring 时事通讯

通过 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部