Green Beans:Spring Integration 入门

工程 | Josh Long | 2011年2月24日 | ...

应用程序并非孤立存在。它们需要与其客户和其他应用程序进行通信。应用程序集成就是为了实现这种通信。集成允许应用程序彼此共享服务和数据,而且,同样重要的是,集成帮助应用程序与其用户连接。

Spring Integration 提供了一个框架来构建集成解决方案,以促进这些类型的解决方案。Spring Integration 解决方案描述了数据通过管道的流程。数据作为消息进入处理管道。消息通过命名管道(称为通道)向前移动,这些管道将数据路由到不同的组件(称为端点)。您可以根据需要将任意数量的端点和通道串在一起。

这个模型在其简单性上与 Unix 命令行非常相似。例如,以下 Unix 命令:

cat spring.txt |grep spring |cut -f1 -d, | sort | uniq > spring_cleaning.txt

在这个命令中,来自外部资源(名为spring.txt的文件)的数据被传递到grep命令行。grep筛选结果并查找所有匹配单词“spring”的行,并将它们作为输出发送。grep命令的输出作为输入发送到cut命令,该命令将解析结果并以逗号 (",") 分割每一行,并且只保留第一列数据(逗号后的一切都被丢弃)。此中继继续到最后,最终将输出写回另一个称为spring_cleaning.txt的外部资源。在这个命令中,我们使用 Unix “管道” (“|”) 将一个命令的输出连接到另一个命令的输入。我们通过将专门的、单一用途的命令组合成管道来读取、清理、过滤、转换和写入数据。这种方法称为管道和过滤器模型,Spring Integration 允许您使用相同的简单模型来考虑更大的问题。在 Spring Integration 中,通道是管道,端点是过滤器。

Spring Integration 功能齐全

Spring Integration 支持许多不同的用例。以下是一些常见的用例:
  • 转换数据:转换器端点可能会将有效负载的类型从一个类更改为另一个类,或者删除、添加或更改消息头。
  • 路由数据:路由器端点提供自定义路由逻辑 - 也许来自输入通道的数据应该传递到多个输出通道?
  • 过滤数据:也许输入数据不适合继续处理,应该被剔除。您可以使用过滤器端点有条件地阻止数据继续处理。
  • 使不同的系统适应 Spring Integration:适配器提供数据进入和离开 Spring Integration 解决方案的能力。
  • 分割数据:当有效负载太大,或打算将其划分为较小的数据块时。一个例子可能是一个应该拆分为行的单个文件。
  • 聚合数据:这是分割功能的反向:聚合器等待消息逐个到达,并将它们收集起来,直到满足某些条件。然后,它将所有聚合的消息作为一条消息发送。
这些是 Spring Integration 开发人员工具箱中的工具。Spring Integration 本质上只是一个带有公共抽象和接口的 Maven 依赖项。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

Spring Integration 提供了许多现成的模块,每个模块都支持特定类型的需求(例如:XML 处理或 Web 服务集成)。例如,文件命名空间提供入站和出站适配器,分别用于从文件系统读取文件和将文件写入文件系统。file:inbound-channel-adapter 适配文件系统 - 它监视您指定的目录,每当出现新文件时,都会生成有效负载类型为java.io.File 的新文件的有效负载消息。一个常见的用例是从java.io.File 将文件转换为文件内容的表示 - 无论是String还是byte数组 (byte[])。由于这非常常见,文件命名空间同时提供file:file-to-string-transformer端点和file:file-to-bytes-transformer端点。最后,一个常见的场景是人们想要将Stringbyte数组数据写入文件系统。为此,file命名空间提供file:outbound-channel-adapter适配器。这些都只是前面描述的基本端点的面向文件 IO 的实现,例如适配器和转换器。

Spring Integration 附带许多预打包的模块,这些模块支持这些相同端点类型的广泛功能。Spring Integration 2.0 附带的一些模块包括文件 IO、JDBC、RSS/ATOM、FTP/FTPS、SFTP、TCP/IP、RMI、HTTP、JMX、电子邮件、IO 流、Twitter、XMPP、Web 服务、HTTP/REST、XML 和 JMS。虽然此列表很长,但并不完整。还有更多支持即将到来,或者作为其他 Spring 产品组合项目的一部分提供,包括 Gemfire 支持、AMQP 支持、基于与 Activiti BPMN 2 业务流程管理项目完成的工作的流程系统集成、基于 Ajax/Comet 的支持、基于 Flex BlazeDS 消息传递的支持等等。

一个简单的示例

让我们来看一个简单的例子 - 一个假设的在线零售商(“电子零售商”!)在网站上处理交易并征用几家第三方公司来帮助完成订单。特别是,我们将关注数据从网站上的购买点到运输公司的流程。集成由服务发送的 JMS 消息触发。运输公司与电子零售商无关,并期望所有订单都通过其 Web 服务端点提交,因此必须使用触发 JMS 消息来调用 Web 服务端点。最后,提交给第三方(在本例中为运输公司)的所有作业都需要进行审核,因此应在我们的数据库中面向报告的审核表中记录信息。
Spring Integration 示例流程 示例集成解决方案的图表 - 此图表由 SpringSource Tool Suite 生成

处理 JMS 消息:第一步是从 JMS 代理接收消息。我们将使用 Spring Integration 入站适配器 - 一个从 JMS 代理接收消息的端点 - 以简单、声明的方式将 JMS 代理连接到我们的应用程序代码。

Spring Integration 允许您处理包装在org.springframework.integration.core.Message中的单个数据块。数据不会自行出现,它必须来自某个地方。要将数据输入和输出 Spring Integration,您分别使用入站和出站适配器。适配器是一种特殊的端点类型,它们会告诉您外部系统中何时发生有趣的事情,以及它们会告诉您外部系统中发生了什么。因为适配器只接收数据或发送数据,所以它们是单向的


    <int-jms:message-driven-channel-adapter
      channel="partnerNotifications"
      connection-factory="connectionFactory"
      transaction-manager="jmsTransactionManager"
      acknowledge="transacted"
       destination-name="${jms.partnernotifications.destination}"
    />

    <int:channel id="partnerNotifications"/>

我们不会介绍connectionFactoryjmsTransactionManager bean 的细节,因为它们是标准的 Spring JMS bean,可以在源代码中查看。每当 JMS 代理(在 JMSdestination中)出现消息时,Spring Integration 都会在名为partnerNotifications的通道上发布org.springframework.integration.Message,以便可以对其进行使用。

调用合作伙伴 Web 服务:消息有效负载是字符串中的 XML 文档。我们很快就会介绍它是如何获得的,但可以肯定地说,它已经采用电子零售商合作伙伴可以使用的通用 XML 格式。下一步是向运输公司发出 Web 服务调用。流程如下:消息从 JMS 适配器传入,其有效负载用作 Web 服务调用的有效负载。在从 JMS 适配器接收后且作为 Web 服务调用发送之前,它会在partnerNotifications通道中排队。

这表示一种不同类型的端点,类似于适配器,称为网关。网关是双向的,处理请求/回复场景。出站网关向外部系统发送请求并等待来自该外部系统的回复,该回复作为入站消息传递给您(请求者)。Web 服务调用为每个请求生成响应。回复数据作为 Spring Integration 消息发送到partnerXmlShippingReplies通道。

更新审核表:一旦我们从 Web 服务调用收到成功的回复,我们就需要更新我们的审核表。此审核表是面向数据仓库的表;它包含非规范化的记录,可用于跟踪订单在履行系统中的进度,并推动报告和分析。来自 Web 服务的回复消息是一个简单的 XML 文档,其中包含我们需要馈送到审核表的信息。

消息有两个重要的部分:它的头和它的有效负载。在 Spring Integration 中,有效负载可以是任何类型的对象。消息头基本上是java.util.Map中的一系列键值对。头键是String,但值可以是任何类型。头通常携带有关有效负载的元数据,端点在处理有效负载时可以依赖这些元数据。

在我们的示例中,我们需要提取重要的XML属性,并将它们作为消息头提供,以便可以将其馈送到JDBC查询以更新表。我们将使用Spring Integration的xml:xpath-header-enricher根据webservice的响应评估XPath表达式,并将解析的表达式提取为消息头的值。在下面的示例中,我们创建三个消息头:customerIdpurchaseIddate


    <int-xml:xpath-header-enricher input-channel="partnerXmlShippingReplies" output-channel="partnerShippingReplies">
        <int-xml:header name="purchaseId" xpath-expression="//@purchase-id"/>
        <int-xml:header name="customerId" xpath-expression="//@customer-id"/>
        <int-xml:header name="date" xpath-expression="//@confirmation-date"/>
    </int-xml:xpath-header-enricher>

   <int:channel id="partnerShippingReplies"/>

xml:xpath-header-enricher是一个转换器端点:消息以带有XML文档有效负载和消息头的Message形式进入,并以带有相同XML文档有效负载和三个新消息头(以及已经存在的那些消息头)的Message形式离开。在此示例中,输出发布在partnerShippingReplies通道上。

接下来,我们将消息和新的消息头发送到jdbc:outbound-channel-adapter,它将用于执行JDBC插入操作。


    <int-jdbc:outbound-channel-adapter
        data-source="dataSource"
        channel="partnerShippingReplies" >
        <int-jdbc:query>
INSERT INTO purchase_fulfillment_log(
    PURCHASE_ID, CUSTOMER_ID, EVENT_DATE, EVENT)
VALUES( :headers[purchaseId], :headers[customerId],
               :headers[date], 'SHIPPED' )
        </int-jdbc:query>
    </int-jdbc:outbound-channel-adapter>

这是一个完整的、可工作的集成。我们使用基于Java的配置来配置所有必要的bean,但不需要任何Java来处理任何业务逻辑本身——所有真正的处理逻辑都存在于大约25行Spring Integration命名空间元素中,并依赖于您很可能已经熟悉的技术。例如,Spring Integration JMS适配器基于核心Spring中可用的JMS支持。输出Web服务网关构建Spring Web服务堆栈(就像输出HTTP网关基于RestTemplate一样)。通常,XML支持基于Spring OXM支持。最后,输出JDBC适配器基于Spring JDBC支持(例如,在示例中,我们可以提供Spring的JdbcTemplate实例而不是javax.sql.DataSource)。

Spring Integration中的错误处理

事务 在示例中,我们从JMS消费消息,通过Web服务发送消息,然后转换响应,将其写入数据库。这里有很多活动部件,如果出现问题,您应该了解用于处理错误的各种机制。对于JMS和JDBC,常见的直觉是使用事务在某种类型的故障发生时回滚。在示例中使用的jms:message-driven-channel-adapter中,transaction-manager引用是可选的,但在使用时按预期工作:事务相同线程中的任何异常都将导致JMS消息接收回滚。线程局部事务扩展到Spring Integration中在同一线程中执行的所有组件,在本解决方案的情况下,它包含所有内容。

要尝试它,请关闭数据库,然后再次运行解决方案——它将尝试从JMS代理消费消息,调用Web服务,转换回复,然后访问数据库,在那里它将遇到异常,这将导致JMS接收操作回滚,并最终重新排队到JMS代理(在ActiveMQ中,默认情况下,消息最终会进入ActiveMQ.DLQ队列,即死信队列)。

构建体系结构的一致性 因此,事务是处理错误条件的一种方法,但对于不使用事务的资源(如Web服务)却帮助不大。无需事务的一种方法是在实现级别解决问题。例如,Web服务调用或数据库调用可以设置为幂等的:如果可能,使用相同输入的多次调用应该产生相同的输出,而不会产生副作用。例如,使用相同值更新数据库中的单行是幂等的:您可以运行相同的语句100次,最糟糕的情况是您获得与运行更新1次相同的(正确)结果。处理错误的另一种方法是实现补偿逻辑。如果消息的生产者和消费者在同一个线程中,那么如果出现问题,则适用正常的Java错误处理逻辑:抛出异常,并且可以由发送者(根据需要)进行处理。但是,如果生产者和消费者在不同的线程中,则不适用正常规则。在这些情况下,没有消费者可以将异常反馈给生产者。默认情况下,Spring Integration 会捕获异常并将其作为带有异常作为有效负载的消息转发到一个众所周知的自动创建的通道,称为errorChannel。您可以通过在代码中指定带有键MessageHeaders.ERROR_CHANNEL的消息头来指定应将错误转发到的备用通道。您可以从该通道消费消息并适当地对它们做出反应。

Spring Integration组件模型

到目前为止,我们通过依赖于满足我们目标的开箱即用支持来构建一切。你会发现这种情况大多数时候都是正确的——Spring Integration 真的是一个很好的工具箱。就像一盒乐高积木一样,您可以将看似无限的模块组合在一起以解决大多数问题。但是,这并不意味着每个问题都已解决。例如,Spring Integration 不了解您的领域模型的细节,也不了解您的业务逻辑和服务的细节。有时您想降低到Java级别,并只插入正确的行为。

为了支持自定义逻辑,Spring Integration 支持所有核心组件(transformerssplittersroutersadaptersaggregators等)的通用、可插入(在核心命名空间中)的实现,并期望您提供自定义Java逻辑来履行组件的职责。此外,Spring Integration 提供了一个service-activator组件,这是一个“逃生舱”组件:您可以使用它来插入自定义Java处理逻辑,无论此代码的目的是什么。您可以将service-activator用于任何您想要的东西——也许您只想使用iText将PDF写入文件系统或执行某种内在的业务计算?所有这些自定义组件都允许POJO实现。它们都通过调用您指定的bean上的方法来工作。关于方法的形状和形式没有严格的规则,因为Spring Integration非常灵活。但是,通常情况下,您应该想象该组件将位于其他组件之间,因此必须同时接受传入消息作为参数,并将其返回类型作为传出消息。消息输入,消息输出。

方法规范往往根据组件类型而略有不同:转换器可能会接收一种类型的消息并产生一种完全不同的、转换后的类型(当然!)的传出消息。拆分器接收单个消息并将其返回类型作为消息集合。聚合器反向工作:它将其输入参数作为消息集合,并将其输出作为单个聚合消息返回。列表还在继续,详细信息当然可以在文档中找到。对于我们的示例,让我们来看一个service-activator来演示方法映射的灵活性。要使用service-activator,我们必须首先在XML DSL中声明它,如下所示


 <service-activator
      input-channel = "in"
      ref = "helloWorldServiceActivator" method = "sayHello"
      output-channel = "out" />

此示例是典型的:输入通道产生消息,组件处理它,并将处理结果发送到输出通道。但是,这里的处理逻辑由您提供。ref属性指定应使用哪个bean来转换传入消息。它还规定了要使用的方法。如果该bean只有一个方法,或者该bean有多个方法,但只有一个方法用Spring Integration的@ServiceActivator注解进行注解,那么不需要像这里一样在XML中规定特定方法。让我们看一下这种service-activator的初步实现。


@Component
public class HelloWorldServiceActivator {

 public Message<String> sayHello( Message<String> inboundMessage) {
 Map<String,Object> headers = inboundMessage.getHeaders();
 String payload = inboundMessage.getPayload();
 System.out.println( "In the sayHello method, printing out the  "+
 "payload of the inbound message: "+payload + ". Also, there are " +
 headers.size() + " headers." ) ;
 return MessageBuilder.withPayload( inboundMessage.getPayload() )
           .copyHeadersIfAbsent( inboundMessage.getHeaders() )
           .build();
 }
}

该bean很简单:它是一个简单的POJO,并声明一个接收Spring Integration消息并将其输入作为输出返回的单个方法。但是,如果您不想依赖Spring Integration的消息类型,则不必这样做。Spring Integration具有一些智能启发式方法,并且在许多情况下可以自动代表您执行正确操作。让我们稍微修改一下该方法的代码,使其根据有效负载而不是Message包装器类工作


  public String sayHello( String inboundPayload) {
    //  ... same as before
  }

此代码与之前的工作方式相同,只是现在它只依赖于Message有效负载的类型。标题会自动为您复制。如果您愿意,您还可以声明Spring Integration放置消息头(位于java.util.Map<String,Object>中)的位置,如下所示


  public String sayHello( Map<String,Object> headers, String inboundPayload) {
    // ... same as before
  }

如果您想更好地控制放置的位置,也许是为了避免歧义,您可以使用Spring Integration的注解。让我们使用注解来修改最后一个示例


  public String sayHello( @Headers Map<String,Object> headers, @Payload String inboundPayload) {
    // ... same as before
  }

还有一个注解——@Header——在这里特别有用,它告诉Spring Integration您只想将一个特定头的值注入到您的方法中


  public String sayHello( @Header("id") UUID idHeaderValue, @Payload String inboundPayload) {
    // ... same as before
  }

这降低了代码的复杂性,并允许您编写表达性强的集成代码。所有其他组件都像@ServiceActivatorservice-activator一样支持此动态映射功能:@Transformertransformer@Splittersplitter@Aggregatoraggregator@Routerrouter等。

从您的服务中使用Spring Integration

Spring Integration解决方案并不总是需要由适配器启动。您可以从Java代码启动它们。前面的示例是由JMS消息启动的——每当消费JMS消息时,处理就会开始。

让我们看一下该交换的另一面——JMS消息的生产者。在前面的示例中,我们查看了当客户在在线电子零售商中结帐购物车时发送的消息的处理。使用Spring Integration API在本节中,我们将了解如何从购物车生成该消息,将其转换为XML,然后将其发送到前面的示例正在等待处理它的JMS代理。


  @Autowired @Qualifier("partnerNotifications")
  private MessageChannel messageChannel ;

  @Override
  @Transactional
  public void checkout(long purchaseId) {
    Purchase purchase = getPurchaseById(purchaseId);

    if (purchase.isFrozen())
      throw new RuntimeException("you can't check out Purchase(#" + purchase.getId() + ") that's already been checked out!");

    Date purchasedDate = new Date();
    Set<LineItem> lis = purchase.getLineItems();
    for (LineItem lineItem : lis) {
      lineItem.setPurchasedDate(purchasedDate);
      entityManager.merge(lineItem);
    }
    purchase.setFrozen(true);
    entityManager.merge(purchase);

    Message<Purchase> msg = MessageBuilder.withPayload(purchase).build();
    this.messageChannel.send(msg);
  }

大部分代码都是典型的服务层代码——我们感兴趣的只有MessageChannel的使用。MessageChannel是我们在XML中配置的各种Spring Integration通道类型的运行时基类型。一旦您获得了对MessageChannel的引用,只需通过它发送消息即可。您可以使用MessageChannel并直接与它交互以发送和接收消息,就像您可以使用低级JMS和AMQP API来发送和接收消息一样。

Spring Integration中的Message对象是不可变的——您不能直接创建Message对象。请改用MessageBuilder类及其静态方法来构建Message。MessageBuilder类包含基于现有payload和现有Headers映射创建新Message的方法。该API是流畅的——方法调用可以链接在一起。在这个例子中,我们使用MessageBuilder类基于Purchase对象(购物车领域模型中的本地JPA实体)来构建一个Message。

我们通过使用Spring Integration实现了间接性:我们可以在以后的Spring Integration中自由地改变消息的流程。服务代码不需要更改,因为它只与Spring Integration交互。

JMS消费者(我们在第一个例子中建立的那个)比较慢,因为它需要进行代价高昂的Web服务调用。通过将通知发送到JMS,然后让其他一些进程外的集成来处理Web服务的调用,我们获得了两个好处:结账服务更快,并且可以独立于服务扩展缓慢的处理过程。例如,结账服务可能部署在Web应用程序中,每台机器一个。另一方面,调用Web服务的通知处理只是一个简单的main(String[])类,可以在同一台机器上运行多次,而不会出现任何问题以满足负载。

总结

我们已经探索了集成领域的广阔天地,并学习了如何使用Spring Integration以一种干净、灵活的方式将不同的系统连接在一起。我们已经探索了Spring Integration如何受益于其在核心Spring框架之上的位置——对于希望解决集成问题的Spring开发者来说,它是一个自然的下一步。在这篇文章中,我们还探索了Spring Integration友好的基于Spring XML DSL的编程模型,以及它的基于POJO的编程模型。用户可以在我们的基于Git的社区示例项目Spring Integration入门文件夹下找到这个以及所有其他“Green Beans”文章的源代码。

获取Spring通讯

关注Spring通讯

订阅

领先一步

VMware提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部