领先一步
VMware 提供培训和认证,助您快速提升。
了解更多亲爱的 Spring 社区成员,
我们很高兴地宣布,继 Spring Integration 4.1 Release Candidate 发布后不久,Spring Integration Java DSL 1.0 Release Candidate 现已可用。请使用 Maven 或 Gradle 配置 Milestone Repository,或下载 发行版归档文件,来试用一下。
有关更多信息,请参阅项目主页。
此版本包含了许多新特性和改进,以及大量的 bug 修复。GA (General Availability) 版本计划于 11 月中旬发布。
以下是自上一个里程碑版本以来的主要变更摘要:
重构和破坏性变更
虽然仍支持更早的 Java 版本,但 Spring Integration Java DSL 主要针对 Java 8 及其 Lambda 支持。我们移除了一些 functional interfaces,转而使用 Java 8 中类似的接口,如 Consumer<T>、Function<T, R> 等。当然,为了支持与旧 Java 版本的向后兼容性,我们在 DSL 源代码中实现了类似的接口。使用更改后的接口且 Java 版本低于 8 的用户需要进行修改以解决编译错误。例如:
从此
.handle(Integer.class, (p, h) -> p * 2,
new EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
到此
.handle(Integer.class, (p, h) -> p * 2,
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
当然,如果您在此处使用 Java 8 Lambda,则代码无需更改
.handle(Integer.class, (p, h) -> p * 2, e -> e.poller(Pollers.cron("7 * * * * ?")))
IntegrationFlows 现在只包含 from(...) 方法。.fromFixedMessageChannel() 已被 .from(String messageChannelName, boolean fixedSubscriber) 替换。
此外,为了解决一些包缠绕(package tangle)问题,我们将一些类移动到了不同的包中。
方法范围函数
为了简化 IDE 中的代码补全并避免对所需 Namespace Factory 进行冗余搜索,我们添加了带有 Function<T, R> 参数的重载方法。例如,这些代码片段是等效的:
.....
.channel(Amqp.pollableChannel(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
.channel(c -> c.amqpPollable(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
其中变量 c 是 Channel 的“方法聚合器”对象,它委托给适当的 Namespace Factory。其他类似的 Lambda 方法有:
IntegrationFlows.from(MessageSourcesFunction sources)IntegrationFlows.from(MessageProducersFunction producers)IntegrationFlows.from(MessagingGatewaysFunction gateways)IntegrationFlowDefinition.handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters)EndpointSpec.poller(Function<PollerFactory, PollerSpec> pollers)FunctionExpression
Spring Integration 拥有出色的 Spring Expression Language (SpEL) 支持。由于 Java DSL 是纯 (呃!) Java,因此在 expression 属性的长字符串中指定某些业务逻辑意义不大。受 Java 8 Lambda 支持的启发,并为了追求最小化变更,我们引入了 FunctionExpression——SpEL Expression 接口的一个实现——它接受一个 Function<T, R> 并在每次调用 getValue() 时委托给它。现在,DSL 中的许多组件都提供了 (Function<T, R> function) 方法,作为类似 SpEL 方法的替代方案。以下是 FtpInboundFileSynchronizingMessageSource 的 localFilename 属性示例:
使用 SpEL
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilenameExpression("payload.toUpperCase() + '.a'")
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
使用 Lambda
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilename(f -> f.toUpperCase() + ".a")))
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
FunctionExpression 的其他有趣用法是 Enricher 和 HeaderEnricher
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
FunctionExpression 也支持运行时类型转换,与标准的 SpelExpression 类似。
子流(SubFlows)
我们为一些 if...else 和 publish-subscribe 组件引入了 SubFlow 支持。最简单的例子是 .publishSubscribeChannel()
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
当然,相同的结果也可以通过单独的 IntegrationFlow @Bean 定义来实现,但我们希望您会发现子流风格的逻辑组合很有用。
类似的 publish-subscribe 子流组合由 .routeToRecipients() 提供。
另一个例子是在 .filter() 上使用 .discardFlow() 而不是 .discardChannel()。
.route() 值得特别关注
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping() 继续像常规 Router 映射一样工作,但 .subFlowMapping() 将子流与主流绑定在一起。换句话说,任何路由器的子流在 .route() 之后都会返回主流程。
类似的“返回主流程”子流由 .gateway() 支持
@Bean
public IntegrationFlow gatewayFlow() {
return f ->
f.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway(gf -> gf.transform("From Gateway SubFlow: "::concat));
}
然而,这个 Gateway SubFlow 只是通过显式的 DirectChannel 与主流程连接,并使用该通道作为 requestChannel 选项包装到常规的 GatewayMessageHandler 中。
当然,子流可以任意深度嵌套,但我们不建议这样做,因为实际上,即使在路由器的情况下,在流程中添加复杂的子流也会很快变得难以被人理解。
结论
自上一个里程碑版本以来,我们没有添加更多 protocol specific adapters。并非所有适配器都会被 DSL 直接支持,尽管最常用的适配器拥有头等支持。但是,那些没有头等支持的适配器可以使用 .handle() 轻松连接。正如我们之前讨论的,我们正在寻求您的意见来优先实现剩余的适配器,所以请不要吝啬分享您的想法和建议!
您可以从它们的源代码和参考手册中获取关于这些类和现有类的更多信息。
我们期待尽快收到您的评论和反馈(StackOverflow (spring-integration 标签),Spring JIRA,GitHub),并在我们 GA 之前的几周内报告您发现的问题。
一如既往,我们非常欢迎贡献。