领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我谨代表 Spring Integration 团队宣布 Spring Integration 的第二个里程碑版本 5.0,该版本已在 里程碑仓库中提供。
自之前的里程碑版本以来,此版本的一些亮点。
当然,首先要非常感谢您,社区,您的贡献!
MongoDbOutboundGateway
- 用于对集合执行查询或任何任意操作
对 MongoDB 组件的初始 Java DSL 支持
MongoDb 组件现在可以在其表达式中使用 org.springframework.data.mongodb.core.query.Query
API
@Bean public IntegrationFlow mongoDbGatewayFlow() { return f -> f .handle(MongoDb.outboundGateway(this.mongoTemplate) .collectionCallback(MongoCollection::count) .collectionNameFunction(m -> m.getHeaders().get("collection"))); }
Java DSL IntegrationFlow
现在可以从标记有 @MessagingGateway
的接口开始,并且对目标代理 bean 的所有方法调用都将执行向下游 IntegrationFlow
发送 Message
的操作。 这使您可以省略 @IntegrationComponentScan
和额外的通道配置。 例如,Control Bus 组件的简单网关
@MessagingGateway
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(ControlBusGateway.class)
.controlBus()
.get();
}
当然,还有一些关于响应式流主题的新闻。
MessageChannelReactiveUtils
可用于将任何 MessageChannel
适配到 org.reactivestreams.Publisher
。 当您想以一种松散耦合的方式从一侧“flux”上游数据,而从另一侧进行反应式反压时,这可能很有用
@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
.map(Message::getPayload)
.map(String::toUpperCase)
.doOnNext(results::add)
.subscribe(v -> done.countDown());
此技术现在用于现有的 IntegrationFlowDefinition.toReactivePublisher()
中
@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
return IntegrationFlows
.from("inputChannel")
.split(s -> s.delimiters(","))
.<String, Integer>transform(Integer::parseInt)
.channel(MessageChannels.queue())
.toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;
ReactiveChannel
现在除了常规的(但具有反压)send(Message<?>)
实现之外,还能够订阅上游 Publisher
。 这使我们能够引入像从 Publisher
启动 IntegrationFlow
这样的功能
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlows.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
通过这种明智的方式将 ReactiveChannel
放置在端点之间(MessageChannels.reactive()
),我们可以同时获得集成和反应式世界的最佳效果!
有关更多信息,请参见新增功能。
我们将在下一个里程碑中提供更多功能和改进,因此,请继续关注,并随时向我们提供任何反馈!