Java DSL for Spring Integration 1.2 Milestone 2 现已发布

发布 | Artem Bilan | 2016年9月15日 | ...

我很高兴地宣布,Spring Integration 的 Java DSL 1.2 M2 现已可用!

首先,我想感谢所有创建 issues、提出 Pull Requests、提供反馈或仅仅是在 StackOverflow 上提问的人。特别感谢早期采用者,他们使用了之前的 Milestone 1。在他们的帮助下,我们改进并修复了一些运行时流注册方面的问题。

构件 org.springframework.integration:spring-integration-java-dsl:1.2.0.M2 可在 Milestone 仓库 中获取。所以,请试用一下,并随时就任何反馈提出 GH issue

当前迭代的一些亮点

JPA 支持

在收到大量社区请求后,我们最终引入了 Jpa 工厂和相应的 `IntegrationComponentSpec`,为 Spring Integration JPA 组件提供流畅的 API。

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManagerFactory)
                    .entityClass(StudentDomain.class)
                    .maxResults(1)
                    .expectSingleResult(true),
                e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
            .channel(c -> c.queue("pollingResults"))
            .get();
}

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
            .handle(Jpa.updatingGateway(this.entityManagerFactory),
                    e -> e.transactional(true))
            .channel(c -> c.queue("persistResults"));
}

@Bean
public IntegrationFlow retrievingGatewayFlow() {
    return f -> f
            .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                    .jpaQuery("from Student s where s.id = :id")
                    .expectSingleResult(true)
                    .parameterExpression("id", "payload"))
            .channel(c -> c.queue("retrieveResults"));
}

流中间事务支持

受 Spring Integration JPA 组件事务支持配置复杂性(实际上是编程式 TransactionalInterceptor)的“启发”,我们引入了 TransactionInterceptorBuilder。此外,我们提供了 TransactionHandleMessageAdvice,它允许从任何端点为整个子流启动事务,而不仅仅是 handleRequestMessage,正如常规 ConsumerEndpointSpec.advice() 的情况一样。实际上,主要技巧是由最近在 Spring Integration Core 中引入的 HandleMessageAdvice 完成的,这是一个标记接口,用于区分仅针对 handleRequestMessage 的 advice 或针对从当前 MessageHandler 开始的流的 advice。为了方便起见,已经向 ConsumerEndpointSpec 添加了一系列 .transactional() 方法。

Scatter-Gather 支持

现在,Scatter-Gather EI 模式有了自己的 Java DSL API。

@Bean
public IntegrationFlow scatterGatherFlow() {
    return f -> f
      .scatterGather(scatterer -> scatterer
         .applySequence(true)
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10)),
      gatherer -> gatherer
         .releaseStrategy(group ->
                group.size() == 3 ||
                      group.getMessages()
                          .stream()
                          .anyMatch(m -> (Double) m.getPayload() > 5)),
      scatterGather -> scatterGather
        	 .gatherTimeout(10_000));
}

其中 scatterer 只是一个 RecipientListRoutergatherer 是一个 AggregatingMessageHandler,而最后的 Consumer 接受 ScatterGatherHandler 的选项。

更多路由器的改进

.routeToRecipients() API 现在为收件人提供了更多配置选项。

.routeToRecipients(r -> r
    .recipient("foo-channel", "'foo' == payload")
    .recipient("bar-channel", m ->
        m.getHeaders().containsKey("recipient")
            && (boolean) m.getHeaders().get("recipient"))
    .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
        f -> f.<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("recipientListSubFlow1Result")))
    .recipientFlow((String p) -> p.startsWith("baz"),
        f -> f.transform("Hello "::concat)
            .channel(c -> c.queue("recipientListSubFlow2Result")))
    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                   "bax".equals(m.getPayload())),
        f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
    .defaultOutputToParentFlow())

以前,.route() 操作符会将 IntegrationFlow 中的下一个 .channel() 作为 RouterdefaultOutputChannel。根据用户体验,无条件做出这样的决定似乎不太合理。我们重新设计了 .route(),使其与标准的 AbstractMessageRouter 行为保持一致。添加了 .defaultOutputChannel().defaultSubFlowMapping() 来利用 Routerdefault 逻辑。为了回退到之前的行为,提供了 .defaultOutputToParentFlow(),正如你在上面 .routeToRecipients() 示例中所注意到的。

有关 1.2.0.M2 版本的更多信息,请查看 提交历史。并始终阅读 JavaDocs 以了解你使用的 API!

下一步

我们预计在几周内发布 1.2 版本的第一个(也希望是最后一个)发布候选版本,在 Spring Integration 4.3.2 和 Spring Boot 1.4.1 采用后。这很快,因为 spring-integration-java-dsl 将迁移到 Spring Integration Core 5.0 和 Java 8 代码库。当前的 1.2 版本将继续得到支持,但仅限于 bug 修复。

项目主页 | 文档 | Issues | 帮助

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有