领先一步
VMware 提供培训和认证,助你加速进步。
了解更多我很高兴地宣布,适用于 Spring Integration 的 Java DSL 1.2 M2
现已发布!
首先,我要感谢所有创建 issue、提交 Pull Request、提供反馈或仅在 StackOverflow 上提问的人。特别感谢自上一个Milestone 1以来的早期采用者。在他们的帮助下,我们改进并修复了运行时流注册的一些问题。
Artifact org.springframework.integration:spring-integration-java-dsl:1.2.0.M2
已在Milestone 仓库中可用。所以,请尝试一下,如有任何反馈,请随时提交 GitHub issue!
当前迭代的一些亮点
经过众多社区请求,我们最终引入了 Jpa
Factory 和相应的 `IntegrationComponentSpec`,为Spring Integration JPA 组件提供了流畅的 API。
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow pollingAdapterFlow() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
.channel(c -> c.queue("pollingResults"))
.get();
}
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
受到 Spring Integration JPA 组件事务支持配置复杂性的“启发”(实际上是编程式的 TransactionalInterceptor
),我们引入了 TransactionInterceptorBuilder
。此外,我们提供了 TransactionHandleMessageAdvice
,它允许从任何端点启动整个子流的事务,而不仅仅是像常规 ConsumerEndpointSpec.advice()
那样仅针对 handleRequestMessage
。实际上,主要的功能是由最近在Spring Integration Core 中引入的 HandleMessageAdvice
实现的,它是一个标记接口,用于区分仅针对 handleRequestMessage
的 advice 或针对从当前 MessageHandler
开始的整个流程的 advice。为方便起见,已向 ConsumerEndpointSpec
添加了多个 .transactional()
方法。
Scatter-Gather EI 模式现在有了自己的 Java DSL API。
@Bean
public IntegrationFlow scatterGatherFlow() {
return f -> f
.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10)),
gatherer -> gatherer
.releaseStrategy(group ->
group.size() == 3 ||
group.getMessages()
.stream()
.anyMatch(m -> (Double) m.getPayload() > 5)),
scatterGather -> scatterGather
.gatherTimeout(10_000));
}
其中 scatterer
只是一个 RecipientListRouter
,gatherer
是一个 AggregatingMessageHandler
,而最后一个 Consumer
接受 ScatterGatherHandler
的选项。
.routeToRecipients()
API 现在为 recipient 提供了更多配置变体。
.routeToRecipients(r -> r
.recipient("foo-channel", "'foo' == payload")
.recipient("bar-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("baz"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"bax".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
之前,.route()
操作符会将 IntegrationFlow
中的下一个 .channel()
作为 Router
的 defaultOutputChannel
。根据用户经验,无条件地做出这样的决定并不合理。我们重做了 .route()
,使其与标准的 AbstractMessageRouter
行为对齐。已添加 .defaultOutputChannel()
和 .defaultSubFlowMapping()
来利用 Router
的 default
逻辑。为了回退到之前的行为,提供了 .defaultOutputToParentFlow()
,正如你在上面的 .routeToRecipients()
示例中所注意到的。
有关 1.2.0.M2
版本的更多信息,请参阅提交历史。务必阅读 JavaDocs 以理解你使用的 API!
我们预计在 Spring Integration 4.3.2 和 Spring Boot 1.4.1 有一定采用后,将在几周内发布 1.2
版本的第一个(也是希望是最后一个)Release Candidate。这很快就会发生,因为 spring-integration-java-dsl
将迁移到Spring Integration Core 5.0
和 Java 8 代码库。当前的 1.2
版本仍将得到支持,但仅限于 bug 修复。