Spring Integration Java DSL 1.0 RC1 发布

发布 | Artem Bilan | 2014年10月31日 | ...

亲爱的 Spring 社区:

我们很高兴地宣布,在 Spring Integration 4.1 Release Candidate 发布后不久,Spring Integration Java DSL 1.0 Release Candidate 现已发布。请使用 Maven 或 Gradle 的 Milestone Repository,或下载 分发存档 来进行试用。

有关更多信息,请参阅项目的 主页

此次发布包含许多新功能和改进,以及一些 bug 修复。GA 版本计划在11月中旬发布。

以下是自 上一个里程碑 以来的主要更改摘要

重构和破坏性更改

尽管仍然支持早期 Java 版本,但 Spring Integration Java DSL 主要面向 Java 8 及其 Lambda 支持。我们已删除了一些 函数式接口,转而使用 Java 8 中的类似接口:Consumer<T>Function<T, R> 等。当然,为了支持与旧 Java 版本的向后兼容性,我们在 DSL 源代码中实现了类似的接口。使用旧于 8 的 Java 版本且使用了更改后接口的用户需要进行修改以修复其编译错误。例如

从这里

.handle(Integer.class, (p, h) -> p * 2,
		new EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>>() {
				@Override
				public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
					spec.poller(Pollers.cron("7 * * * * ?"));
				}
		})

到这里

.handle(Integer.class, (p, h) -> p * 2,
		new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
				@Override
				public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
					spec.poller(Pollers.cron("7 * * * * ?"));
				}
		})

当然,如果您在这里使用 Java 8 Lambda,代码将无需更改

.handle(Integer.class, (p, h) -> p * 2, e -> e.poller(Pollers.cron("7 * * * * ?")))

IntegrationFlows 现在只包含 from(...) 方法。.fromFixedMessageChannel() 已被 .from(String messageChannelName, boolean fixedSubscriber) 替换。

此外,为了解决一些包缠绕问题,我们将一些类移动到了不同的包中。

方法作用域函数

为了简化 IDE 的代码补全,并避免冗余地搜索所需的 命名空间工厂,我们添加了带有 Function<T, R> 参数的重载方法。例如,以下代码片段是等效的

.....
.channel(Amqp.pollableChannel(this.rabbitConnectionFactory)
							.queueName("amqpReplyChannel")
							.channelTransacted(true))
....
.channel(c -> c.amqpPollable(this.rabbitConnectionFactory)
							.queueName("amqpReplyChannel")
							.channelTransacted(true))
....

其中 c 变量是 Channel 的“method-aggregator”对象,它委托给相应的 命名空间工厂。其他类似的 Lambda 方法包括

  • IntegrationFlows.from(MessageSourcesFunction sources)
  • IntegrationFlows.from(MessageProducersFunction producers)
  • IntegrationFlows.from(MessagingGatewaysFunction gateways)
  • IntegrationFlowDefinition.handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters)
  • EndpointSpec.poller(Function<PollerFactory, PollerSpec> pollers)

FunctionExpression

Spring Integration 具有强大的 Spring Expression Language (SpEL) 支持。由于 Java DSL 是纯粹的(哦!)Java,因此在 expression 属性中用长字符串指定业务逻辑并没有太大意义。受 Java 8 Lambda 支持的启发,并追求最小改动的目标,我们引入了 FunctionExpression - SpEL Expression 接口的实现 - 它接受一个 Function<T, R> 并在每次 getValue() 时委托给它。现在,DSL 中的许多组件都提供 (Function<T, R> function) 方法作为类似 SpEL 方法的替代。以下是 FtpInboundFileSynchronizingMessageSourcelocalFilename 属性的示例

使用 SpEL

@Bean
public IntegrationFlow ftpInboundFlow() {
	return IntegrationFlows
			.from(s -> s.ftp(this.ftpSessionFactory)
							.remoteDirectory("ftpSource")
							.localFilenameExpression("payload.toUpperCase() + '.a'")
			.channel(c -> c.queue("ftpInboundResultChannel"))
			.get();
}

使用 Lambda

@Bean
public IntegrationFlow ftpInboundFlow() {
	return IntegrationFlows
			.from(s -> s.ftp(this.ftpSessionFactory)
							.remoteDirectory("ftpSource")
							.localFilename(f -> f.toUpperCase() + ".a")))
			.channel(c -> c.queue("ftpInboundResultChannel"))
			.get();
}

FunctionExpression 的其他有趣用法包括 EnricherHeaderEnricher

.enrich(e -> e.requestChannel("enrichChannel")
			.requestPayload(Message::getPayload)
			.propertyFunction("date", m -> new Date()))

FunctionExpression 还支持运行时类型转换,就像标准的 SpelExpression 一样。

子流程

我们为一些 if...elsepublish-subscribe 组件引入了 SubFlow 支持。最简单的例子是 .publishSubscribeChannel()

@Bean
public IntegrationFlow subscribersFlow() {
	return flow -> flow
			.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
					.subscribe(f -> f
							.<Integer>handle((p, h) -> p / 2)
							.channel(c -> c.queue("subscriber1Results")))
					.subscribe(f -> f
							.<Integer>handle((p, h) -> p * 2)
							.channel(c -> c.queue("subscriber2Results"))))
			.<Integer>handle((p, h) -> p * 3)
			.channel(c -> c.queue("subscriber3Results"));
}

当然,使用单独的 IntegrationFlow @Bean 定义也可以达到相同的结果,但我们希望您会发现子流程风格的逻辑组合很有用。

类似的 publish-subscribe 子流程组合由 .routeToRecipients() 提供。

另一个例子是 .filter() 上的 .discardFlow() 而不是 .discardChannel()

.route() 值得特别关注

@Bean
public IntegrationFlow routeFlow() {
	return f -> f
			.<Integer, Boolean>route(p -> p % 2 == 0,
					m -> m.channelMapping("true", "evenChannel")
							.subFlowMapping("false", sf ->
									sf.<Integer>handle((p, h) -> p * 3)))
			.transform(Object::toString)
			.channel(c -> c.queue("oddChannel"));
}

.channelMapping() 继续像常规 Router 映射一样工作,但 .subFlowMapping() 将该子流程与主流程绑定。换句话说,任何路由器的子流程在 .route() 之后都会返回主流程。

类似的“返回主流程”子流程由 .gateway() 支持

@Bean
public IntegrationFlow gatewayFlow() {
        return f -> 
                   f.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
			.gateway(gf -> gf.transform("From Gateway SubFlow: "::concat));
}

然而,这个 Gateway 子流程只是通过显式的 DirectChannel 与主流程连接,并使用该通道作为 requestChannel 选项包装成常规的 GatewayMessageHandler

当然,子流程可以嵌套任意深度,但我们不建议这样做,因为事实上,即使在路由器的情况下,在流程中添加复杂的子流程也会很快变得难以人工解析。

结论

自上一个里程碑以来,我们没有添加更多的 协议特定适配器。并非所有适配器都将直接由 DSL 支持,尽管最常用的适配器享有第一类支持。但是,那些不享有第一类支持的适配器可以通过 .handle() 轻松连接。正如我们之前讨论过的,我们正在寻找输入来确定剩余适配器的实现优先级,所以请不要犹豫分享您的想法和建议!

您可以通过其 源代码参考手册 获取有关这些类和现有类的更多信息。

我们期待尽快收到您的评论和反馈(StackOverflow (spring-integration 标签)、Spring JIRAGitHub),并在我们未来几周的 GA 版本发布前报告您发现的问题。

一如既往,我们非常欢迎 贡献

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有