领先一步
VMware 提供培训和认证,助您加速进步。
了解更多亲爱的 Spring 社区:
我们很高兴地宣布,在 Spring Integration 4.1 Release Candidate 发布后不久,Spring Integration Java DSL 1.0 Release Candidate 现已发布。请使用 Maven 或 Gradle 的 Milestone Repository,或下载 分发存档 来进行试用。
有关更多信息,请参阅项目的 主页。
此次发布包含许多新功能和改进,以及一些 bug 修复。GA 版本计划在11月中旬发布。
以下是自 上一个里程碑 以来的主要更改摘要
重构和破坏性更改
尽管仍然支持早期 Java 版本,但 Spring Integration Java DSL 主要面向 Java 8 及其 Lambda 支持。我们已删除了一些 函数式接口,转而使用 Java 8 中的类似接口:Consumer<T>、Function<T, R> 等。当然,为了支持与旧 Java 版本的向后兼容性,我们在 DSL 源代码中实现了类似的接口。使用旧于 8 的 Java 版本且使用了更改后接口的用户需要进行修改以修复其编译错误。例如
从这里
.handle(Integer.class, (p, h) -> p * 2,
new EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
到这里
.handle(Integer.class, (p, h) -> p * 2,
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
当然,如果您在这里使用 Java 8 Lambda,代码将无需更改
.handle(Integer.class, (p, h) -> p * 2, e -> e.poller(Pollers.cron("7 * * * * ?")))
IntegrationFlows 现在只包含 from(...) 方法。.fromFixedMessageChannel() 已被 .from(String messageChannelName, boolean fixedSubscriber) 替换。
此外,为了解决一些包缠绕问题,我们将一些类移动到了不同的包中。
方法作用域函数
为了简化 IDE 的代码补全,并避免冗余地搜索所需的 命名空间工厂,我们添加了带有 Function<T, R> 参数的重载方法。例如,以下代码片段是等效的
.....
.channel(Amqp.pollableChannel(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
.channel(c -> c.amqpPollable(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
其中 c 变量是 Channel 的“method-aggregator”对象,它委托给相应的 命名空间工厂。其他类似的 Lambda 方法包括
IntegrationFlows.from(MessageSourcesFunction sources)IntegrationFlows.from(MessageProducersFunction producers)IntegrationFlows.from(MessagingGatewaysFunction gateways)IntegrationFlowDefinition.handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters)EndpointSpec.poller(Function<PollerFactory, PollerSpec> pollers)FunctionExpression
Spring Integration 具有强大的 Spring Expression Language (SpEL) 支持。由于 Java DSL 是纯粹的(哦!)Java,因此在 expression 属性中用长字符串指定业务逻辑并没有太大意义。受 Java 8 Lambda 支持的启发,并追求最小改动的目标,我们引入了 FunctionExpression - SpEL Expression 接口的实现 - 它接受一个 Function<T, R> 并在每次 getValue() 时委托给它。现在,DSL 中的许多组件都提供 (Function<T, R> function) 方法作为类似 SpEL 方法的替代。以下是 FtpInboundFileSynchronizingMessageSource 的 localFilename 属性的示例
使用 SpEL
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilenameExpression("payload.toUpperCase() + '.a'")
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
使用 Lambda
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilename(f -> f.toUpperCase() + ".a")))
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
FunctionExpression 的其他有趣用法包括 Enricher 和 HeaderEnricher
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
FunctionExpression 还支持运行时类型转换,就像标准的 SpelExpression 一样。
子流程
我们为一些 if...else 和 publish-subscribe 组件引入了 SubFlow 支持。最简单的例子是 .publishSubscribeChannel()
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
当然,使用单独的 IntegrationFlow @Bean 定义也可以达到相同的结果,但我们希望您会发现子流程风格的逻辑组合很有用。
类似的 publish-subscribe 子流程组合由 .routeToRecipients() 提供。
另一个例子是 .filter() 上的 .discardFlow() 而不是 .discardChannel()。
.route() 值得特别关注
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping() 继续像常规 Router 映射一样工作,但 .subFlowMapping() 将该子流程与主流程绑定。换句话说,任何路由器的子流程在 .route() 之后都会返回主流程。
类似的“返回主流程”子流程由 .gateway() 支持
@Bean
public IntegrationFlow gatewayFlow() {
return f ->
f.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway(gf -> gf.transform("From Gateway SubFlow: "::concat));
}
然而,这个 Gateway 子流程只是通过显式的 DirectChannel 与主流程连接,并使用该通道作为 requestChannel 选项包装成常规的 GatewayMessageHandler。
当然,子流程可以嵌套任意深度,但我们不建议这样做,因为事实上,即使在路由器的情况下,在流程中添加复杂的子流程也会很快变得难以人工解析。
结论
自上一个里程碑以来,我们没有添加更多的 协议特定适配器。并非所有适配器都将直接由 DSL 支持,尽管最常用的适配器享有第一类支持。但是,那些不享有第一类支持的适配器可以通过 .handle() 轻松连接。正如我们之前讨论过的,我们正在寻找输入来确定剩余适配器的实现优先级,所以请不要犹豫分享您的想法和建议!
您可以通过其 源代码 和 参考手册 获取有关这些类和现有类的更多信息。
我们期待尽快收到您的评论和反馈(StackOverflow (spring-integration 标签)、Spring JIRA、GitHub),并在我们未来几周的 GA 版本发布前报告您发现的问题。
一如既往,我们非常欢迎 贡献。