领先一步
VMware提供培训和认证,以加速您的进步。
了解更多应用程序并非孤立存在。它们需要与其客户和其他应用程序进行通信。应用程序集成就是为了实现这种通信。集成允许应用程序彼此共享服务和数据,而且,同样重要的是,集成帮助应用程序与其用户连接。
Spring Integration 提供了一个框架来构建集成解决方案,以促进这些类型的解决方案。Spring Integration 解决方案描述了数据通过管道的流程。数据作为消息进入处理管道。消息通过命名管道(称为通道)向前移动,这些管道将数据路由到不同的组件(称为端点)。您可以根据需要将任意数量的端点和通道串在一起。
这个模型在其简单性上与 Unix 命令行非常相似。例如,以下 Unix 命令:
cat spring.txt |grep spring |cut -f1 -d, | sort | uniq > spring_cleaning.txt
在这个命令中,来自外部资源(名为spring.txt
的文件)的数据被传递到grep
命令行。grep
筛选结果并查找所有匹配单词“spring”的行,并将它们作为输出发送。grep
命令的输出作为输入发送到cut
命令,该命令将解析结果并以逗号 (",") 分割每一行,并且只保留第一列数据(逗号后的一切都被丢弃)。此中继继续到最后,最终将输出写回另一个称为spring_cleaning.txt
的外部资源。在这个命令中,我们使用 Unix “管道” (“|”) 将一个命令的输出连接到另一个命令的输入。我们通过将专门的、单一用途的命令组合成管道来读取、清理、过滤、转换和写入数据。这种方法称为管道和过滤器模型,Spring Integration 允许您使用相同的简单模型来考虑更大的问题。在 Spring Integration 中,通道是管道,端点是过滤器。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
Spring Integration 提供了许多现成的模块,每个模块都支持特定类型的需求(例如:XML 处理或 Web 服务集成)。例如,文件命名空间提供入站和出站适配器,分别用于从文件系统读取文件和将文件写入文件系统。file:inbound-channel-adapter
适配文件系统 - 它监视您指定的目录,每当出现新文件时,都会生成有效负载类型为java.io.File
的新文件的有效负载消息。一个常见的用例是从java.io.File
将文件转换为文件内容的表示 - 无论是String
还是byte
数组 (byte[]
)。由于这非常常见,文件命名空间同时提供file:file-to-string-transformer
端点和file:file-to-bytes-transformer
端点。最后,一个常见的场景是人们想要将String
或byte
数组数据写入文件系统。为此,file
命名空间提供file:outbound-channel-adapter
适配器。这些都只是前面描述的基本端点的面向文件 IO 的实现,例如适配器和转换器。
Spring Integration 附带许多预打包的模块,这些模块支持这些相同端点类型的广泛功能。Spring Integration 2.0 附带的一些模块包括文件 IO、JDBC、RSS/ATOM、FTP/FTPS、SFTP、TCP/IP、RMI、HTTP、JMX、电子邮件、IO 流、Twitter、XMPP、Web 服务、HTTP/REST、XML 和 JMS。虽然此列表很长,但并不完整。还有更多支持即将到来,或者作为其他 Spring 产品组合项目的一部分提供,包括 Gemfire 支持、AMQP 支持、基于与 Activiti BPMN 2 业务流程管理项目完成的工作的流程系统集成、基于 Ajax/Comet 的支持、基于 Flex BlazeDS 消息传递的支持等等。
处理 JMS 消息:第一步是从 JMS 代理接收消息。我们将使用 Spring Integration 入站适配器 - 一个从 JMS 代理接收消息的端点 - 以简单、声明的方式将 JMS 代理连接到我们的应用程序代码。
Spring Integration 允许您处理包装在org.springframework.integration.core.Message
中的单个数据块。数据不会自行出现,它必须来自某个地方。要将数据输入和输出 Spring Integration,您分别使用入站和出站适配器。适配器是一种特殊的端点类型,它们会告诉您外部系统中何时发生有趣的事情,以及它们会告诉您外部系统中发生了什么。因为适配器只接收数据或发送数据,所以它们是单向的。
<int-jms:message-driven-channel-adapter
channel="partnerNotifications"
connection-factory="connectionFactory"
transaction-manager="jmsTransactionManager"
acknowledge="transacted"
destination-name="${jms.partnernotifications.destination}"
/>
<int:channel id="partnerNotifications"/>
我们不会介绍connectionFactory
和jmsTransactionManager
bean 的细节,因为它们是标准的 Spring JMS bean,可以在源代码中查看。每当 JMS 代理(在 JMSdestination
中)出现消息时,Spring Integration 都会在名为partnerNotifications
的通道上发布org.springframework.integration.Message
,以便可以对其进行使用。
调用合作伙伴 Web 服务:消息有效负载是字符串中的 XML 文档。我们很快就会介绍它是如何获得的,但可以肯定地说,它已经采用电子零售商合作伙伴可以使用的通用 XML 格式。下一步是向运输公司发出 Web 服务调用。流程如下:消息从 JMS 适配器传入,其有效负载用作 Web 服务调用的有效负载。在从 JMS 适配器接收后且作为 Web 服务调用发送之前,它会在partnerNotifications
通道中排队。
这表示一种不同类型的端点,类似于适配器,称为网关。网关是双向的,处理请求/回复场景。出站网关向外部系统发送请求并等待来自该外部系统的回复,该回复作为入站消息传递给您(请求者)。Web 服务调用为每个请求生成响应。回复数据作为 Spring Integration 消息发送到partnerXmlShippingReplies
通道。
更新审核表:一旦我们从 Web 服务调用收到成功的回复,我们就需要更新我们的审核表。此审核表是面向数据仓库的表;它包含非规范化的记录,可用于跟踪订单在履行系统中的进度,并推动报告和分析。来自 Web 服务的回复消息是一个简单的 XML 文档,其中包含我们需要馈送到审核表的信息。
消息有两个重要的部分:它的头和它的有效负载。在 Spring Integration 中,有效负载可以是任何类型的对象。消息头基本上是java.util.Map
中的一系列键值对。头键是String
,但值可以是任何类型。头通常携带有关有效负载的元数据,端点在处理有效负载时可以依赖这些元数据。
在我们的示例中,我们需要提取重要的XML属性,并将它们作为消息头提供,以便可以将其馈送到JDBC查询以更新表。我们将使用Spring Integration的xml:xpath-header-enricher
根据webservice的响应评估XPath表达式,并将解析的表达式提取为消息头的值。在下面的示例中,我们创建三个消息头:customerId
、purchaseId
和date
。
<int-xml:xpath-header-enricher input-channel="partnerXmlShippingReplies" output-channel="partnerShippingReplies">
<int-xml:header name="purchaseId" xpath-expression="//@purchase-id"/>
<int-xml:header name="customerId" xpath-expression="//@customer-id"/>
<int-xml:header name="date" xpath-expression="//@confirmation-date"/>
</int-xml:xpath-header-enricher>
<int:channel id="partnerShippingReplies"/>
xml:xpath-header-enricher
是一个转换器端点:消息以带有XML文档有效负载和消息头的Message
形式进入,并以带有相同XML文档有效负载和三个新消息头(以及已经存在的那些消息头)的Message
形式离开。在此示例中,输出发布在partnerShippingReplies
通道上。
接下来,我们将消息和新的消息头发送到jdbc:outbound-channel-adapter
,它将用于执行JDBC插入操作。
<int-jdbc:outbound-channel-adapter
data-source="dataSource"
channel="partnerShippingReplies" >
<int-jdbc:query>
INSERT INTO purchase_fulfillment_log(
PURCHASE_ID, CUSTOMER_ID, EVENT_DATE, EVENT)
VALUES( :headers[purchaseId], :headers[customerId],
:headers[date], 'SHIPPED' )
</int-jdbc:query>
</int-jdbc:outbound-channel-adapter>
这是一个完整的、可工作的集成。我们使用基于Java的配置来配置所有必要的bean,但不需要任何Java来处理任何业务逻辑本身——所有真正的处理逻辑都存在于大约25行Spring Integration命名空间元素中,并依赖于您很可能已经熟悉的技术。例如,Spring Integration JMS适配器基于核心Spring中可用的JMS支持。输出Web服务网关构建Spring Web服务堆栈(就像输出HTTP网关基于RestTemplate
一样)。通常,XML支持基于Spring OXM支持。最后,输出JDBC适配器基于Spring JDBC支持(例如,在示例中,我们可以提供Spring的JdbcTemplate实例而不是javax.sql.DataSource
)。
jms:message-driven-channel-adapter
中,transaction-manager
引用是可选的,但在使用时按预期工作:事务相同线程中的任何异常都将导致JMS消息接收回滚。线程局部事务扩展到Spring Integration中在同一线程中执行的所有组件,在本解决方案的情况下,它包含所有内容。要尝试它,请关闭数据库,然后再次运行解决方案——它将尝试从JMS代理消费消息,调用Web服务,转换回复,然后访问数据库,在那里它将遇到异常,这将导致JMS接收操作回滚,并最终重新排队到JMS代理(在ActiveMQ中,默认情况下,消息最终会进入ActiveMQ.DLQ
队列,即死信队列)。
构建体系结构的一致性 因此,事务是处理错误条件的一种方法,但对于不使用事务的资源(如Web服务)却帮助不大。无需事务的一种方法是在实现级别解决问题。例如,Web服务调用或数据库调用可以设置为幂等的:如果可能,使用相同输入的多次调用应该产生相同的输出,而不会产生副作用。例如,使用相同值更新数据库中的单行是幂等的:您可以运行相同的语句100次,最糟糕的情况是您获得与运行更新1次相同的(正确)结果。处理错误的另一种方法是实现补偿逻辑。如果消息的生产者和消费者在同一个线程中,那么如果出现问题,则适用正常的Java错误处理逻辑:抛出异常,并且可以由发送者(根据需要)进行处理。但是,如果生产者和消费者在不同的线程中,则不适用正常规则。在这些情况下,没有消费者可以将异常反馈给生产者。默认情况下,Spring Integration 会捕获异常并将其作为带有异常作为有效负载的消息转发到一个众所周知的自动创建的通道,称为errorChannel
。您可以通过在代码中指定带有键MessageHeaders.ERROR_CHANNEL
的消息头来指定应将错误转发到的备用通道。您可以从该通道消费消息并适当地对它们做出反应。
为了支持自定义逻辑,Spring Integration 支持所有核心组件(transformers
、splitters
、routers
、adapters
、aggregators
等)的通用、可插入(在核心命名空间中)的实现,并期望您提供自定义Java逻辑来履行组件的职责。此外,Spring Integration 提供了一个service-activator
组件,这是一个“逃生舱”组件:您可以使用它来插入自定义Java处理逻辑,无论此代码的目的是什么。您可以将service-activator
用于任何您想要的东西——也许您只想使用iText将PDF写入文件系统或执行某种内在的业务计算?所有这些自定义组件都允许POJO实现。它们都通过调用您指定的bean上的方法来工作。关于方法的形状和形式没有严格的规则,因为Spring Integration非常灵活。但是,通常情况下,您应该想象该组件将位于其他组件之间,因此必须同时接受传入消息作为参数,并将其返回类型作为传出消息。消息输入,消息输出。
方法规范往往根据组件类型而略有不同:转换器可能会接收一种类型的消息并产生一种完全不同的、转换后的类型(当然!)的传出消息。拆分器接收单个消息并将其返回类型作为消息集合。聚合器反向工作:它将其输入参数作为消息集合,并将其输出作为单个聚合消息返回。列表还在继续,详细信息当然可以在文档中找到。对于我们的示例,让我们来看一个service-activator来演示方法映射的灵活性。要使用service-activator,我们必须首先在XML DSL中声明它,如下所示
<service-activator
input-channel = "in"
ref = "helloWorldServiceActivator" method = "sayHello"
output-channel = "out" />
此示例是典型的:输入通道产生消息,组件处理它,并将处理结果发送到输出通道。但是,这里的处理逻辑由您提供。ref
属性指定应使用哪个bean来转换传入消息。它还规定了要使用的方法。如果该bean只有一个方法,或者该bean有多个方法,但只有一个方法用Spring Integration的@ServiceActivator
注解进行注解,那么不需要像这里一样在XML中规定特定方法。让我们看一下这种service-activator
的初步实现。
@Component
public class HelloWorldServiceActivator {
public Message<String> sayHello( Message<String> inboundMessage) {
Map<String,Object> headers = inboundMessage.getHeaders();
String payload = inboundMessage.getPayload();
System.out.println( "In the sayHello method, printing out the "+
"payload of the inbound message: "+payload + ". Also, there are " +
headers.size() + " headers." ) ;
return MessageBuilder.withPayload( inboundMessage.getPayload() )
.copyHeadersIfAbsent( inboundMessage.getHeaders() )
.build();
}
}
该bean很简单:它是一个简单的POJO,并声明一个接收Spring Integration消息并将其输入作为输出返回的单个方法。但是,如果您不想依赖Spring Integration的消息类型,则不必这样做。Spring Integration具有一些智能启发式方法,并且在许多情况下可以自动代表您执行正确操作。让我们稍微修改一下该方法的代码,使其根据有效负载而不是Message包装器类工作
public String sayHello( String inboundPayload) {
// ... same as before
}
此代码与之前的工作方式相同,只是现在它只依赖于Message
有效负载的类型。标题会自动为您复制。如果您愿意,您还可以声明Spring Integration放置消息头(位于java.util.Map<String,Object>
中)的位置,如下所示
public String sayHello( Map<String,Object> headers, String inboundPayload) {
// ... same as before
}
如果您想更好地控制放置的位置,也许是为了避免歧义,您可以使用Spring Integration的注解。让我们使用注解来修改最后一个示例
public String sayHello( @Headers Map<String,Object> headers, @Payload String inboundPayload) {
// ... same as before
}
还有一个注解——@Header
——在这里特别有用,它告诉Spring Integration您只想将一个特定头的值注入到您的方法中
public String sayHello( @Header("id") UUID idHeaderValue, @Payload String inboundPayload) {
// ... same as before
}
这降低了代码的复杂性,并允许您编写表达性强的集成代码。所有其他组件都像@ServiceActivator
和service-activator
一样支持此动态映射功能:@Transformer
和transformer
、@Splitter
和splitter
、@Aggregator
和aggregator
、@Router
和router
等。
让我们看一下该交换的另一面——JMS消息的生产者。在前面的示例中,我们查看了当客户在在线电子零售商中结帐购物车时发送的消息的处理。使用Spring Integration API在本节中,我们将了解如何从购物车生成该消息,将其转换为XML,然后将其发送到前面的示例正在等待处理它的JMS代理。
@Autowired @Qualifier("partnerNotifications")
private MessageChannel messageChannel ;
@Override
@Transactional
public void checkout(long purchaseId) {
Purchase purchase = getPurchaseById(purchaseId);
if (purchase.isFrozen())
throw new RuntimeException("you can't check out Purchase(#" + purchase.getId() + ") that's already been checked out!");
Date purchasedDate = new Date();
Set<LineItem> lis = purchase.getLineItems();
for (LineItem lineItem : lis) {
lineItem.setPurchasedDate(purchasedDate);
entityManager.merge(lineItem);
}
purchase.setFrozen(true);
entityManager.merge(purchase);
Message<Purchase> msg = MessageBuilder.withPayload(purchase).build();
this.messageChannel.send(msg);
}
大部分代码都是典型的服务层代码——我们感兴趣的只有MessageChannel
的使用。MessageChannel
是我们在XML中配置的各种Spring Integration通道类型的运行时基类型。一旦您获得了对MessageChannel
的引用,只需通过它发送消息即可。您可以使用MessageChannel
并直接与它交互以发送和接收消息,就像您可以使用低级JMS和AMQP API来发送和接收消息一样。
Spring Integration中的Message
对象是不可变的——您不能直接创建Message
对象。请改用MessageBuilder
类及其静态方法来构建Message。MessageBuilder
类包含基于现有payload和现有Headers映射创建新Message的方法。该API是流畅的——方法调用可以链接在一起。在这个例子中,我们使用MessageBuilder
类基于Purchase
对象(购物车领域模型中的本地JPA实体)来构建一个Message。
我们通过使用Spring Integration实现了间接性:我们可以在以后的Spring Integration中自由地改变消息的流程。服务代码不需要更改,因为它只与Spring Integration交互。
JMS消费者(我们在第一个例子中建立的那个)比较慢,因为它需要进行代价高昂的Web服务调用。通过将通知发送到JMS,然后让其他一些进程外的集成来处理Web服务的调用,我们获得了两个好处:结账服务更快,并且可以独立于服务扩展缓慢的处理过程。例如,结账服务可能部署在Web应用程序中,每台机器一个。另一方面,调用Web服务的通知处理只是一个简单的main(String[])
类,可以在同一台机器上运行多次,而不会出现任何问题以满足负载。