领先一步
VMware 提供培训和认证,以加快您的进步。
了解更多我代表 Spring Integration 团队宣布 Spring Integration 5.0 的第二个里程碑版本,该版本现已在 里程碑版本仓库 中提供。
自 上一个 里程碑版本发布以来,此版本的一些亮点。
当然,首先要感谢您,社区,对您的贡献表示衷心的感谢!
MongoDbOutboundGateway
- 用于执行查询或对集合执行任何任意操作
MongoDB 组件的初始 Java DSL 支持
MongoDB 组件现在可以在其表达式中使用 org.springframework.data.mongodb.core.query.Query
API
@Bean public IntegrationFlow mongoDbGatewayFlow() { return f -> f .handle(MongoDb.outboundGateway(this.mongoTemplate) .collectionCallback(MongoCollection::count) .collectionNameFunction(m -> m.getHeaders().get("collection"))); }
Java DSL IntegrationFlow
现在可以从用 @MessagingGateway
标记的接口开始,并且对目标代理 bean 的所有方法调用都将执行将 Message
发送到下游 IntegrationFlow
的操作。这使您可以省略 @IntegrationComponentScan
和额外的通道配置。例如,用于控制总线组件的简单网关
@MessagingGateway
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(ControlBusGateway.class)
.controlBus()
.get();
}
当然,还有一些关于响应式流主题的消息。
MessageChannelReactiveUtils
可用于将任何 MessageChannel
适配到 org.reactivestreams.Publisher
。当您希望以一种松散耦合的方式从一侧“流化”上游数据,并从另一侧获得响应式反压时,这将非常有用。
@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
.map(Message::getPayload)
.map(String::toUpperCase)
.doOnNext(results::add)
.subscribe(v -> done.countDown());
此技术现在用于现有的 IntegrationFlowDefinition.toReactivePublisher()
中。
@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
return IntegrationFlows
.from("inputChannel")
.split(s -> s.delimiters(","))
.<String, Integer>transform(Integer::parseInt)
.channel(MessageChannels.queue())
.toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;
ReactiveChannel
现在能够订阅上游 Publisher
,以及常规(但具有反压)send(Message<?>)
实现。这使我们能够引入一个功能,例如从 Publisher
启动 IntegrationFlow
。
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlows.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
这样,将 ReactiveChannel
放在端点之间(MessageChannels.reactive()
),我们可以获得集成和响应式两个世界的最佳优势!
请参阅 新增功能 以获取更多信息。
我们将在接下来的里程碑版本中提供更多功能和改进,敬请期待,如有任何反馈,请随时与我们联系!