领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我谨代表 Spring Integration 团队,宣布 Spring Integration 5.0 的第二个里程碑版本,该版本已在 Milestone Repository 中提供。
自 上一个 里程碑版本以来,本次发布的一些亮点。
当然,首先要感谢社区的贡献!
MongoDbOutboundGateway - 用于执行查询或在集合上执行任何任意操作
MongoDB 组件的初始 Java DSL 支持
现在,MongoDB 组件可以在其表达式中使用 org.springframework.data.mongodb.core.query.Query API
@Bean public IntegrationFlow mongoDbGatewayFlow() { return f -> f .handle(MongoDb.outboundGateway(this.mongoTemplate) .collectionCallback(MongoCollection::count) .collectionNameFunction(m -> m.getHeaders().get("collection"))); }
Java DSL 中的 IntegrationFlow 现在可以从标记有 @MessagingGateway 的接口开始,目标代理 bean 上的所有方法调用将执行将 Message 发送到下游 IntegrationFlow。这允许您省略 @IntegrationComponentScan 和额外的通道配置。例如,一个用于 Control Bus 组件的简单网关
@MessagingGateway
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(ControlBusGateway.class)
.controlBus()
.get();
}
当然,还有一些关于 Reactive Streams 的消息。
MessageChannelReactiveUtils 可用于将任何 MessageChannel 适配到 org.reactivestreams.Publisher。这在您希望通过集成以松散耦合的方式,“flux”上游数据,并从另一端获得响应式背压时非常有用。
@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
.map(Message::getPayload)
.map(String::toUpperCase)
.doOnNext(results::add)
.subscribe(v -> done.countDown());
此技术现在已用于现有的 IntegrationFlowDefinition.toReactivePublisher()
@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
return IntegrationFlows
.from("inputChannel")
.split(s -> s.delimiters(","))
.<String, Integer>transform(Integer::parseInt)
.channel(MessageChannels.queue())
.toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;
ReactiveChannel 现在除了常规(但具有背压)的 send(Message<?>) 实现之外,还可以订阅上游 Publisher。这使我们能够引入诸如从 Publisher 启动 IntegrationFlow 之类的功能。
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlows.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
通过在端点之间(MessageChannels.reactive())明智地放置 ReactiveChannel,我们可以充分发挥集成和响应式世界的优势!
有关更多信息,请参阅 What’s New。
我们将在接下来的里程碑版本中提供更多功能和改进,敬请期待,并随时向我们反馈!