Spring Integration 5.0 Milestone 2 已发布

发布 | Artem Bilan | 2017 年 01 月 05 日 | ...

我谨代表 Spring Integration 团队,宣布 Spring Integration 5.0 的第二个里程碑版本,该版本已在 Milestone Repository 中提供。

上一个 里程碑版本以来,本次发布的一些亮点。

当然,首先要感谢社区的贡献!

MongoDB 改进

  • MongoDbOutboundGateway - 用于执行查询或在集合上执行任何任意操作

  • MongoDB 组件的初始 Java DSL 支持

  • 现在,MongoDB 组件可以在其表达式中使用 org.springframework.data.mongodb.core.query.Query API

    @Bean public IntegrationFlow mongoDbGatewayFlow() { return f -> f .handle(MongoDb.outboundGateway(this.mongoTemplate) .collectionCallback(MongoCollection::count) .collectionNameFunction(m -> m.getHeaders().get("collection"))); }

@MessagingGateway 和 Java DSL

Java DSL 中的 IntegrationFlow 现在可以从标记有 @MessagingGateway 的接口开始,目标代理 bean 上的所有方法调用将执行将 Message 发送到下游 IntegrationFlow。这允许您省略 @IntegrationComponentScan 和额外的通道配置。例如,一个用于 Control Bus 组件的简单网关

@MessagingGateway
public interface ControlBusGateway {

    void send(String command);

}
...
@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
                 .controlBus()
                 .get();
}

Reactive Streams 支持

当然,还有一些关于 Reactive Streams 的消息。

MessageChannelReactiveUtils 可用于将任何 MessageChannel 适配到 org.reactivestreams.Publisher。这在您希望通过集成以松散耦合的方式,“flux”上游数据,并从另一端获得响应式背压时非常有用。

@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
                   .map(Message::getPayload)
                   .map(String::toUpperCase)
                   .doOnNext(results::add)
                   .subscribe(v -> done.countDown());

此技术现在已用于现有的 IntegrationFlowDefinition.toReactivePublisher()

@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
    return IntegrationFlows
             .from("inputChannel")
             .split(s -> s.delimiters(","))
             .<String, Integer>transform(Integer::parseInt)
             .channel(MessageChannels.queue())
             .toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;

ReactiveChannel 现在除了常规(但具有背压)的 send(Message<?>) 实现之外,还可以订阅上游 Publisher。这使我们能够引入诸如从 Publisher 启动 IntegrationFlow 之类的功能。

Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
        IntegrationFlows.from(messageFlux)
                .<Integer, Integer>transform(p -> p * 2)
                .channel(resultChannel)
                .get();

this.integrationFlowContext.registration(integrationFlow)
        .register();

通过在端点之间(MessageChannels.reactive())明智地放置 ReactiveChannel,我们可以充分发挥集成和响应式世界的优势!

有关更多信息,请参阅 What’s New

我们将在接下来的里程碑版本中提供更多功能和改进,敬请期待,并随时向我们反馈!

项目主页 | GitHub | 帮助 | 文档 | 聊天

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有