Spring Integration Java DSL 1.0 RC1 发布

发布 | Artem Bilan | 2014年10月31日 | ...

尊敬的 Spring 社区:

我们高兴地宣布,紧随Spring Integration 4.1 发布候选版本之后,Spring Integration Java DSL 1.0 发布候选版本现已推出。请使用 Maven 或 Gradle 的里程碑仓库,或下载发行版存档来试用它。

请访问项目主页了解更多信息。

此版本包含许多新功能和改进,以及一些错误修复。GA 版本计划于 11 月中旬发布。

以下是自上次里程碑版本以来的主要更改摘要:

重构和重大更改

虽然仍然支持较早的 Java 版本,但 Spring Integration Java DSL 主要面向 Java 8 及其 Lambda 支持。我们已删除了一些 `functional interfaces`,转而使用 Java 8 中类似的接口:`Consumer<T>`、`Function<T, R>` 等。当然,为了向后兼容旧版 Java 版本,我们在 DSL 源代码中实现了类似的接口。使用 Java 8 以下版本的更改接口的用户需要修改代码以修复编译错误。例如:

从这里

.handle(Integer.class, (p, h) -> p * 2,
		new EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>>() {
				@Override
				public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
					spec.poller(Pollers.cron("7 * * * * ?"));
				}
		})

到这

.handle(Integer.class, (p, h) -> p * 2,
		new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
				@Override
				public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
					spec.poller(Pollers.cron("7 * * * * ?"));
				}
		})

当然,如果您在此处使用 Java 8 Lambda,则代码不需要更改。

.handle(Integer.class, (p, h) -> p * 2, e -> e.poller(Pollers.cron("7 * * * * ?")))

`IntegrationFlows` 现在仅包含 `from(...)` 方法。`fromFixedMessageChannel()` 已被 `from(String messageChannelName, boolean fixedSubscriber)` 替换。

此外,为了解决一些包混乱问题,我们已将某些类移动到不同的包。

方法作用域函数

为了简化 IDE 的代码补全并避免冗余搜索所需的 `Namespace Factory`,我们添加了带有 `Function<T, R>` 参数的重载方法。例如,这些代码片段是相等的:

.....
.channel(Amqp.pollableChannel(this.rabbitConnectionFactory)
							.queueName("amqpReplyChannel")
							.channelTransacted(true))
....
.channel(c -> c.amqpPollable(this.rabbitConnectionFactory)
							.queueName("amqpReplyChannel")
							.channelTransacted(true))
....

其中 `c` 变量是 `Channel` 的“方法聚合器”对象,它委托给相应的 `Namespace Factory`。其他类似的 Lambda 方法包括:

  • IntegrationFlows.from(MessageSourcesFunction sources)
  • IntegrationFlows.from(MessageProducersFunction producers)
  • IntegrationFlows.from(MessagingGatewaysFunction gateways)
  • IntegrationFlowDefinition.handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters)
  • EndpointSpec.poller(Function<PollerFactory, PollerSpec> pollers)

FunctionExpression

Spring Integration 具有强大的 Spring 表达式语言 (SpEL) 支持。由于 Java DSL 是纯 (eh!) Java,因此在 `expression` 属性中指定一些业务逻辑的长字符串实际上没有意义。受到 Java 8 Lambda 支持的启发,并力求最小化更改,我们引入了 `FunctionExpression`——SpEL `Expression` 接口的一种实现——它接受一个 `Function<T, R>` 并在每次 `getValue()` 时委托给它。现在,DSL 中的许多组件都提供 `(Function<T, R> function)` 方法作为类似 SpEL 方法的替代方案。以下是 `FtpInboundFileSynchronizingMessageSource` 的 `localFilename` 属性的示例:

使用 SpEL

@Bean
public IntegrationFlow ftpInboundFlow() {
	return IntegrationFlows
			.from(s -> s.ftp(this.ftpSessionFactory)
							.remoteDirectory("ftpSource")
							.localFilenameExpression("payload.toUpperCase() + '.a'")
			.channel(c -> c.queue("ftpInboundResultChannel"))
			.get();
}

使用 Lambda

@Bean
public IntegrationFlow ftpInboundFlow() {
	return IntegrationFlows
			.from(s -> s.ftp(this.ftpSessionFactory)
							.remoteDirectory("ftpSource")
							.localFilename(f -> f.toUpperCase() + ".a")))
			.channel(c -> c.queue("ftpInboundResultChannel"))
			.get();
}

`FunctionExpression` 的其他有趣用途是 `Enricher` 和 `HeaderEnricher`。

.enrich(e -> e.requestChannel("enrichChannel")
			.requestPayload(Message::getPayload)
			.propertyFunction("date", m -> new Date()))

`FunctionExpression` 还支持运行时类型转换,就像在标准 `SpelExpression` 中一样。

子流程

我们为某些 `if...else` 和 `发布-订阅` 组件引入了 `子流程` 支持。最简单的示例是 `publishSubscribeChannel()`。

@Bean
public IntegrationFlow subscribersFlow() {
	return flow -> flow
			.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
					.subscribe(f -> f
							.<Integer>handle((p, h) -> p / 2)
							.channel(c -> c.queue("subscriber1Results")))
					.subscribe(f -> f
							.<Integer>handle((p, h) -> p * 2)
							.channel(c -> c.queue("subscriber2Results"))))
			.<Integer>handle((p, h) -> p * 3)
			.channel(c -> c.queue("subscriber3Results"));
}

当然,我们可以使用单独的 `IntegrationFlow` `@Bean` 定义来实现相同的结果,但我们希望您会发现子流程逻辑组合方式很有用。

`routeToRecipients()` 提供了类似的 `发布-订阅` 子流程组合。

另一个示例是在 `filter()` 上使用 `discardFlow()` 代替 `discardChannel()`。

`route()` 值得特别关注。

@Bean
public IntegrationFlow routeFlow() {
	return f -> f
			.<Integer, Boolean>route(p -> p % 2 == 0,
					m -> m.channelMapping("true", "evenChannel")
							.subFlowMapping("false", sf ->
									sf.<Integer>handle((p, h) -> p * 3)))
			.transform(Object::toString)
			.channel(c -> c.queue("oddChannel"));
}

`channelMapping()` 的工作方式与常规 `Router` 映射相同,但 `subFlowMapping()` 将该子流程与主流程绑定。换句话说,任何路由器的子流程在 `route()` 后都会返回主流程。

`gateway()` 支持类似的“返回主流程”子流程。

@Bean
public IntegrationFlow gatewayFlow() {
        return f -> 
                   f.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
			.gateway(gf -> gf.transform("From Gateway SubFlow: "::concat));
}

但是,此 Gateway 子流程只是通过显式的 `DirectChannel` 与主流程连接,并使用该通道作为 `requestChannel` 选项包装到常规 `GatewayMessageHandler` 中。

当然,子流程可以嵌套到任何深度,但我们不建议这样做,因为实际上,即使在路由器情况下,在流程中添加复杂的子流程也会很快变得难以让人理解。

结论

自从上次里程碑版本以来,我们没有添加更多 `协议特定适配器`。并非所有适配器都将直接由 DSL 支持,尽管最常用的适配器具有第一类支持。但是,那些没有第一类支持的适配器可以使用 `.handle()`轻松连接。正如我们之前讨论的那样,我们正在寻求意见以优先实施剩余的适配器,因此,请不要犹豫,分享您的想法和意见!

您可以从它们的源代码参考手册中获取有关这些类和现有类的更多信息。

我们期待您尽快发表评论和反馈(StackOverflow(`spring-integration` 标签)、Spring JIRAGitHub),并在我们 GA 之前报告您发现的问题(大约两周内)。

与往常一样,我们非常欢迎贡献

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部