Spring Integration 1.2 里程碑版本 2 的 Java DSL 已发布

发布 | Artem Bilan | 2016年9月15日 | ...

我很高兴地宣布,Spring Integration 1.2 M2 的 Java DSL 现已发布!

首先,我要感谢所有创建问题、提交 Pull Request、提供反馈或在 Stack Overflow 上提问的人。尤其要感谢自上一个里程碑版本 1以来的早期使用者。在他们的帮助下,我们改进并修复了一些运行时流程注册问题。

工件org.springframework.integration:spring-integration-java-dsl:1.2.0.M2 可在里程碑仓库中找到。所以,试试看,如有任何反馈,请随时提交GitHub 问题

当前迭代的一些亮点

JPA 支持

在社区多次请求后,我们最终引入了Jpa工厂和相应的`IntegrationComponentSpec`,为Spring Integration JPA组件提供流畅的 API

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManagerFactory)
                    .entityClass(StudentDomain.class)
                    .maxResults(1)
                    .expectSingleResult(true),
                e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
            .channel(c -> c.queue("pollingResults"))
            .get();
}

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
            .handle(Jpa.updatingGateway(this.entityManagerFactory),
                    e -> e.transactional(true))
            .channel(c -> c.queue("persistResults"));
}

@Bean
public IntegrationFlow retrievingGatewayFlow() {
    return f -> f
            .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                    .jpaQuery("from Student s where s.id = :id")
                    .expectSingleResult(true)
                    .parameterExpression("id", "payload"))
            .channel(c -> c.queue("retrieveResults"));
}

流程中间的事务支持

受 Spring Integration JPA 组件事务支持配置复杂性(实际上是编程方式的TransactionalInterceptor)的启发,我们引入了TransactionInterceptorBuilder。此外,我们还提供了TransactionHandleMessageAdvice,它允许从任何端点为整个子流程启动事务,而不仅仅是handleRequestMessage,就像常规ConsumerEndpointSpec.advice()的情况一样。实际上,主要技巧是由HandleMessageAdvice完成的,它最近在Spring Integration Core中引入,这是一个标记接口,用于区分仅适用于handleRequestMessage的建议或从当前MessageHandler开始的流程的建议。为方便起见,已将一堆.transactional()方法添加到ConsumerEndpointSpec

散布-收集支持

散布-收集 EI 模式现在拥有自己的 Java DSL API

@Bean
public IntegrationFlow scatterGatherFlow() {
    return f -> f
      .scatterGather(scatterer -> scatterer
         .applySequence(true)
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10)),
      gatherer -> gatherer
         .releaseStrategy(group ->
                group.size() == 3 ||
                      group.getMessages()
                          .stream()
                          .anyMatch(m -> (Double) m.getPayload() > 5)),
      scatterGather -> scatterGather
        	 .gatherTimeout(10_000));
}

其中scatterer只是一个RecipientListRoutergatherer是一个AggregatingMessageHandler,最后一个Consumer接受ScatterGatherHandler的选项。

更多路由器改进

.routeToRecipients() API 现在为接收者提供了更多配置变体

.routeToRecipients(r -> r
    .recipient("foo-channel", "'foo' == payload")
    .recipient("bar-channel", m ->
        m.getHeaders().containsKey("recipient")
            && (boolean) m.getHeaders().get("recipient"))
    .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
        f -> f.<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("recipientListSubFlow1Result")))
    .recipientFlow((String p) -> p.startsWith("baz"),
        f -> f.transform("Hello "::concat)
            .channel(c -> c.queue("recipientListSubFlow2Result")))
    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                   "bax".equals(m.getPayload())),
        f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
    .defaultOutputToParentFlow())

以前,.route()操作符将IntegrationFlow中的下一个.channel()作为RouterdefaultOutputChannel。根据用户体验,这种无条件的决策并不合理。我们重新设计了.route(),使其与标准AbstractMessageRouter行为保持一致。已添加.defaultOutputChannel().defaultSubFlowMapping()以利用Routerdefault逻辑。为了回退到之前的行为,存在.defaultOutputToParentFlow(),正如您在上面的.routeToRecipients()示例中注意到的那样。

查看1.2.0.M2版本的提交历史以获取更多信息。并且始终阅读 JavaDoc 以了解您使用的 API!

后续步骤

我们预计在几周内发布 1.2 版本的第一个(也是希望是最后一个)候选版本,这需要在 Spring Integration 4.3.2 和 Spring Boot 1.4.1 之后进行一些采用。这足够快了,因为spring-integration-java-dsl将迁移到Spring Integration Core5.0和 Java 8 代码库。当前的1.2版本将继续支持,但仅限于错误修复。

项目页面 | 文档 | 问题 | 帮助

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部