Spring Integration 5.0 里程碑版本 2 发布

发布 | Artem Bilan | 2017年1月5日 | ...

我代表 Spring Integration 团队宣布 Spring Integration 5.0 的第二个里程碑版本,该版本现已在 里程碑版本仓库 中提供。

上一个 里程碑版本发布以来,此版本的一些亮点。

当然,首先要感谢您,社区,对您的贡献表示衷心的感谢!

MongoDB 改进

  • MongoDbOutboundGateway - 用于执行查询或对集合执行任何任意操作

  • MongoDB 组件的初始 Java DSL 支持

  • MongoDB 组件现在可以在其表达式中使用 org.springframework.data.mongodb.core.query.Query API

    @Bean public IntegrationFlow mongoDbGatewayFlow() { return f -> f .handle(MongoDb.outboundGateway(this.mongoTemplate) .collectionCallback(MongoCollection::count) .collectionNameFunction(m -> m.getHeaders().get("collection"))); }

@MessagingGateway 和 Java DSL

Java DSL IntegrationFlow 现在可以从用 @MessagingGateway 标记的接口开始,并且对目标代理 bean 的所有方法调用都将执行将 Message 发送到下游 IntegrationFlow 的操作。这使您可以省略 @IntegrationComponentScan 和额外的通道配置。例如,用于控制总线组件的简单网关

@MessagingGateway
public interface ControlBusGateway {

    void send(String command);

}
...
@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
                 .controlBus()
                 .get();
}

响应式流支持

当然,还有一些关于响应式流主题的消息。

MessageChannelReactiveUtils 可用于将任何 MessageChannel 适配到 org.reactivestreams.Publisher。当您希望以一种松散耦合的方式从一侧“流化”上游数据,并从另一侧获得响应式反压时,这将非常有用。

@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
                   .map(Message::getPayload)
                   .map(String::toUpperCase)
                   .doOnNext(results::add)
                   .subscribe(v -> done.countDown());

此技术现在用于现有的 IntegrationFlowDefinition.toReactivePublisher() 中。

@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
    return IntegrationFlows
             .from("inputChannel")
             .split(s -> s.delimiters(","))
             .<String, Integer>transform(Integer::parseInt)
             .channel(MessageChannels.queue())
             .toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;

ReactiveChannel 现在能够订阅上游 Publisher,以及常规(但具有反压)send(Message<?>) 实现。这使我们能够引入一个功能,例如从 Publisher 启动 IntegrationFlow

Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
        IntegrationFlows.from(messageFlux)
                .<Integer, Integer>transform(p -> p * 2)
                .channel(resultChannel)
                .get();

this.integrationFlowContext.registration(integrationFlow)
        .register();

这样,将 ReactiveChannel 放在端点之间(MessageChannels.reactive()),我们可以获得集成和响应式两个世界的最佳优势!

请参阅 新增功能 以获取更多信息。

我们将在接下来的里程碑版本中提供更多功能和改进,敬请期待,如有任何反馈,请随时与我们联系!

项目页面 | GitHub | 帮助 | 文档 | 聊天

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加快您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部