领先一步
VMware提供培训和认证,以加速您的进步。
了解更多在这篇文章中,我们将介绍消息传递的核心概念,以及 Spring 框架及其相关项目提供的对各种消息类型的丰富支持。
什么是消息传递?为了更好地解释这一点,我将改写 Gregor Hohpe 和 Bobby Woolf 在其开创性著作《企业集成模式》(Addison Wesley,2004 年) 中提供的示例。当你拨打电话时,你试图将信息传递给另一方。这只有在对方接听电话时才能奏效。由于并非总是能够接听电话,我们使用语音信箱来排队消息。主叫者在语音信箱中留言,然后被叫者可以稍后异步地检索该消息(或者实际上是多条消息)。
在这个例子中,语音信箱位于双方之间。它存储消息,然后在被叫者——接收者——检索它时传递它。在企业消息传递的世界中,事情的工作方式非常相似:一方将消息发送到消息代理(也称为面向消息的中间件——MOM),另一方——当该方可以时——接收或显式查询消息代理中的任何消息。
类比到此为止。与语音信箱相比,消息代理有很多选项。消息代理非常适合提供额外的服务,例如路由,并保证消息传递。消息代理可以针对不同的用例进行优化,例如,您可以用速度换取持久性。消息代理可以将消息持久化到外部存储以确保持久性,但这通常是一个可以为了速度而切换的配置。
在语音信箱示例中,一条消息由一方发送,然后传递到另一方——通信是*点对点*的。消息代理也支持这一点,以及另一种称为*发布-订阅*的通信类型,其中消息传递给多个客户端。
消息代理的一个常见用途是解决两个不同系统之间的集成问题。发送到消息代理的数据通常采用发送者和接收者都通用的格式。两个系统使用消息代理只需要就数据契约达成一致。消息通常包含消息体(其中存储消息本身的内容)和消息头(它们是键/值对,提供有关消息体的元数据,可用于帮助消息的使用者处理消息)。消息头可以是任何你想要的内容,但它们通常与消息本身或消息的处理器相关。
Java 消息服务 (JMS) API 指定了与消息代理交互的客户端接口。每个消息代理都提供自己的 API 实现,非常类似于 JDBC 驱动程序对 JDBC API 的实现。这意味着 JMS 客户端通常应该使用与服务器相同的客户端版本。有很多优秀的 JMS 代理实现可供选择。原因之一是消息传递一直是应用程序开发的重要组成部分,并且今天变得越来越重要。自 1.1 版以来,JMS 一直是 J2EE(现在是 Java EE)规范的一部分。JMS 规范在过去十年的大部分时间里一直是 1.1 版。
在 JMS 中,客户端使用javax.jms.ConnectionFactory
创建一个javax.jms.Connection
。然后可以使用Connection
来创建一个javax.jms.Session
。Session
代表客户端与代理的交互,并允许发送和接收消息以及其他不太明显的操作。
接口上最有用的方法涉及创建消息生产者和消息使用者,它们向javax.jms.Destination
发送和接收消息。Destination
将 JMS 的“消息代理上的地址”概念映射到一起。它还映射了代理存储消息的位置的概念。在 JMS 中,消息发送到、存储在和从相同位置使用,所有这些都由javax.jms.Destination
实例表示。
Destination
是一个接口,有两个更具体的子接口,javax.jms.Queue
和javax.jms.Topic
。Queue
代表标准队列,这是一种前面描述的点对点结构。Topic
提供发布-订阅消息传递,可以将单条消息传递给多个接收者。
要将消息发送到Destination
,必须创建一个javax.jms.MessageProducer
。然后可以使用MessageProducer
发送javax.jms.Message
。
JMS 支持两种不同的接收消息的机制。第一种方法是使用javax.jmx.MessageConsumer#receive()
方法请求消息,该方法以*同步*方式从Destination
返回单个消息;默认情况下,该方法会阻塞,直到收到消息。客户端可以使用javax.jms.MessageListener
,方法是调用javax.jms.Session#setMessageListener(MessageListener)
,而不是使用MessageConsumer
。MessageListener
是一个接口,只有一个方法public void onMessage(javax.jms.Message)
,每当Destination
上有javax.jms.Message
可供使用时,都会调用该方法。MessageListener
提供*异步*消息处理:随着消息到达,它们将被处理。
JMS API 中还有很多内容需要学习,但是这些类和概念将最有助于我们讨论 Spring 对 JMS 消息传递的支持。第一级支持是org.springframework.jms.core.JmsTemplate
,它提供简化的方法,将我们刚才讨论的内容简化为单行代码。JmsTemplate
需要一个javax.jms.ConnectionFactory
实例才能完成其工作。JmsTemplate
可以为您完成很多工作。例如,要发送消息,JmsTemplate
会建立一个javax.jms.Session
,设置一个javax.jms.MessageConsumer
或javax.jms.MessageProducer
,设置所有事务机制,并为您提供当前javax.jms.Session
的引用,以便您可以创建您选择的消息并发送它。使用所有错误处理和构造逻辑,这很容易节省数十行代码。消息发送后,它会销毁或关闭大部分这些对象。这在应用程序服务器(如 Java EE 服务器)中是标准做法,因为ConnectionFactory
实例由服务器创建,由服务器管理,并被池化。它们在使用后缓存实例。在这些环境中关闭资源只是将它们返回到池中。因此,JmsTemplate
在标准情况下会执行正确的操作,假设ConnectionFactory
缓存或池化实例。
在像应用程序服务器这样的受管理环境中,您通常需要从 JNDI 获取javax.jms.ConnectionFactory
。您可以使用 Spring 为您查找该引用并配置JmsTemplate
。在我们的示例中,我们希望更松散地操作,因此我们将使用独立的 ActiveMQ 消息代理。ActiveMQ 是一个流行的开源消息代理。要使用它,请下载它,然后运行 bin 文件夹中适合您操作系统的启动脚本。在您的应用程序中,您需要客户端库才能连接到相应的 ActiveMQ 版本。在撰写本文时,ActiveMQ 的最新版本是 5.4.2。如果您使用的是 Maven,请将以下依赖项添加到您的 Maven pom 文件中
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-optional</artifactId>
<version>${activemq.version}</version>
</dependency>
确保为${activemq.version}
创建一个 Maven 属性,或者手动将字符串替换为适当的版本。还有一个activemq-all
依赖项,但它会下载许多可能不必要的 jar 包。对于我们的应用程序,上述两个依赖项就足够了。
让我们检查一下基本 JMS 应用程序的配置。首先,让我们检查基本的 Spring XML 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
">
<context:property-placeholder location="jms.properties"/>
<context:component-scan base-package="org.springsource.greenbeans.examples.jms.core"/>
<context:component-scan base-package="org.springsource.greenbeans.examples.jms.jmstemplate"/>
<tx:annotation-driven transaction-manager="jmsTransactionManager"/>
</beans>
您可以看到 XML 主要设置属性占位符解析并启用类路径扫描。最有趣的部分是@Transactional
注解的方法启用事务。该元素引用 Spring 上下文中另一个 bean,jmsTransactionManager
,它在以下 Java 配置类中定义。
package org.springsource.greenbeans.examples.jms.core;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*
import org.springframework.jms.connection.*
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
@Configuration
public class JmsConfiguration {
@Value("${broker.url}")
private String brokerUrl;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(this.brokerUrl);
return new CachingConnectionFactory(activeMQConnectionFactory);
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(this.connectionFactory());
}
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(this.connectionFactory());
}
}
配置相当简单。首先,我们定义一个ActiveMQConnectionFactory
实例,然后将其提供给 Spring 框架的CachingConnectionFactory
实例。一些代理提供它们自己的缓存ConnectionFactory
实现。但是,如果您的代理没有提供,那么您可以始终使用 Spring 缓存连接工厂实现来提高速度。
接下来,我们有一个JmsTransactionManager
,它提供 JMS 本地事务。在 JMS 中,事务回滚只有两种结果:在发送操作失败时,消息不会发送;在接收操作失败时,消息会重新排队到消息代理。最后一种情况可能很复杂。
如果您接收一条消息,然后在处理该消息时遇到错误,并且假设您保持事务打开状态,则事务将回滚,并且该消息将返回到代理。一旦消息进入代理,会发生什么取决于代理和您的配置。通常,消息将立即重新传递。但是,这并不总是期望的行为。因此,大多数(如果不是全部)代理都支持某种死信队列,无法传递的消息将发送到该队列。可以根据需要处理此队列中的消息——当出现此错误情况时,某些监控工具可能会唤醒某人。但是,大多数代理提供更多控制。可能可以设置有关错误消息路由的规则。例如,代理可能会尝试立即重新传递消息,然后如果再次失败,它可能会等待一段时间然后重试,如果再次失败,则等待更长时间。这通常称为退避周期。也许在达到某个阈值后,消息可以传递到死信队列或直接丢弃。无论如何,请检查您的代理文档。
最后,我们通过向它提供对ConnectionFactory
的引用来创建一个JmsTemplate
。
让我们看看JmsTemplate
的实际应用。为了简化示例,我们首先讨论如何在恰当命名的Producer
类中发送消息。消息传递的一个常见用途是将通知发送到一个(或多个)不同的系统,作为同步机制,以便感兴趣的系统拥有某些数据的最新版本。在这个例子中,我们假设我们有一个简单的Customer
POJO,它包含标准字段:firstName
、lastName
、email
和id
。
package org.springsource.greenbeans.examples.jms.core;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*
import org.springframework.beans.factory.annotation.*
import org.springframework.jms.core.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*;
@Component
public class Producer {
@Value("${jms.customer.destination}")
private String customerDestination;
@Autowired
private JmsTemplate jmsTemplate;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void sendCustomerUpdate(final Customer customer) throws Exception {
this.jmsTemplate.send(this.customerDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
log.info("Sending customer data " + ToStringBuilder.reflectionToString(customer));
MapMessage mapMessage = session.createMapMessage();
mapMessage.setLong("id", customer.getId());
mapMessage.setString("firstName", customer.getFirstName());
mapMessage.setString("lastName", customer.getLastName());
mapMessage.setString("email", customer.getEmail());
}
});
}
}
在这个类中,我们看到一个sendCustomerUpdate
方法,它接受一个Customer引用作为参数。使用JmsTemplate
的send方法——它接受两个参数:第一个是目标名称的字符串(“customers”),第二个是Spring框架类MessageCreator
的引用——我们使用传递到我们createMessage(javax.jms.Session)
方法实现中的javax.jms.Session
引用构建JMS消息。您可以创建许多类型的JMS消息:javax.jms.TextMessage
、javax.jms.ObjectMessage
、javax.jms.MapMessage
等。ObjectMessage
的功能正如您预期的那样——它允许您将序列化对象作为有效负载传输到JMS消息。通常情况下,应该避免这种做法。序列化数据类型会将消息的生产者和消费者耦合到相同的API契约,这可能并不总是可行的。即使可以保证在消息交换的两端都可用且具有相同类版本的类型,与其他更灵活的选项相比,这样做通常效率低下。相反,更倾向于分解——也许您可以使用javax.jms.TextMessage
将对象编组为XML或JSON字符串。或者,使用javax.jms.MapMessage
发送对象的组成部分,而不是对象本身,它只是一个具有已知键值对的消息,就像在java.util.Map
中一样。这就是我们在这里采用的方法。所有JVM都具有int
、long
、String
等,并且可以反序列化以这种方式传输的数据。
现在让我们来看看在JMS中接收消息。第一种方法是同步地一次请求一个消息。
package org.springsource.greenbeans.examples.jms.jmstemplate;
import org.apache.commons.logging.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*
@Component
public class RawJmsTemplatePollingMessageConsumer {
@Autowired
protected JmsTemplate jmsTemplate;
@Value("${jms.customer.destination}")
private String destination;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void receiveAndProcessCustomerUpdates() throws Exception {
Message message = this.jmsTemplate.receive(this.destination);
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message ;
String firstName = mapMessage.getString("firstName");
String lastName = mapMessage.getString("lastName");
String email = mapMessage.getString("email");
Long id = mapMessage.getLong("id");
Customer customer = new Customer(id, firstName, lastName, email );
log.info("receiving customer message: " + customer);
}
}
}
此示例使用JmsTemplate
实例在有可用消息时接收新消息,然后将其转换为Customer对象,步骤与发送消息时相反,并在日志中非常有用地写出。如果您必须重复多次,这种JMS消息的打包和解包就会变得乏味。将这种逻辑提取到单独的类中通常很有价值。Spring JMS层次结构支持使用MessageConverter
层次结构的实例,允许您覆盖对象的序列化方式。默认的——SimpleMessageConverter
——在没有其他指定的情况下生效,并且在大多数情况下都能很好地工作,因此我们在这里不覆盖它。但是,如果我们决定要将对象作为XML传输,我们可以利用MarshallingMessageConverter
,它利用Spring框架的OXM(对象到XML编组)支持。最后,请注意,receiveAndProcessCustomerUpdates
方法用@Transactional
注解修饰。如果接收消息时出现任何错误,并且抛出Exception
,Spring将回滚接收并将消息返回给代理。
这个例子很简单,但有一些局限性。首先,我们的代码与JMS和Spring API紧密耦合。其次,这只会处理一条消息,并且只有在调用该方法时才会处理。确保调用该方法是实现者的责任。通常,实现者希望尽快异步处理到达的消息。下一步自然可能是从一个无限循环中连续调用receive方法,以确保尽快处理队列中的所有消息。之后,为了提高特别长时间运行的任务的吞吐量并确保队列始终被清空,您可以添加线程,以便始终运行多个循环。这些是合乎逻辑的下一步,但它们只是为了接收和处理消息而做了很多工作。实际上,这里唯一的业务逻辑是获取消息有效负载并对其进行处理的代码。
Spring框架开箱即用地提供了解决此问题的方案,并且使用它很简单!Spring框架中提供了两种适用于不同情况的实现,它们提供了此功能。它们都植根于AbstractJmsListeningContainer
类。如果您愿意,可以直接使用此层次结构,但实际上,使用Spring的JMS命名空间配置它有一种更简单的方法。
让我们重新审视之前的Spring XML配置,添加http://www.springframework.org/schema/jms
命名空间,然后添加相应的配置。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/jms"
...
xsi:schemaLocation="… http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
….
<jms:listener-container connection-factory="connectionFactory" acknowledge="auto" transaction-manager="jmsTransactionManager">
<jms:listener destination="${jms.customer.destination}" ref="messageListenerContainerConsumer" method="receiveMessage" />
</jms:listener-container>
</beans>
我们只摘录了添加到配置文件中的部分。<jms:listener-container>
元素需要引用正在使用的连接工厂和事务管理器。请注意,Spring消息侦听器提供自己的缓存,因此您应该在此处使用常规的ConnectionFactory
:CachingConnectionFactory
在这里是多余的,不应使用。在这个元素中,您可以根据需要配置任意数量的<jms:listener>
元素,每个元素指定javax.jms.Destination
实例的名称和将接收新消息的Spring bean的引用。或者,您可以配置在配置的bean引用中应调用的方法。如果Spring bean实现了javax.jms.MessageListener
或Spring自己的SessionAwareMessageListener
接口中的任何一个,那么每个接口上的唯一方法都将使用javax.jms.Message
调用,并且无需指定方法。如果配置了方法,则该方法应将其参数作为与javax.jms.Message
有效负载相同类型的对象。对于我们的示例,这将是一个java.util.Map
实例,因为我们期望一个javax.jms.MapMessage
实例。
修改后的代码是
package org.springsource.greenbeans.examples.jms.messagelistenercontainer;
import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;
import java.util.Map;
@Component
public class MessageListenerContainerConsumer {
private Log log = LogFactory.getLog(getClass());
public void receiveMessage(Map<String, Object> message) throws Exception {
String firstName = (String) message.get("firstName");
String lastName = (String) message.get("lastName");
String email = (String) message.get("email");
Long id = (Long) message.get("id");
Customer customer = new Customer(id, firstName, lastName, email);
log.info("receiving customer message: " + customer);
}
}
不错吧?您的代码不知道JMS,甚至几乎不知道Spring(除了@Component
注解之外)。当然,您可以简单地使用XML或Java配置来配置此bean,并且也可以避免这种依赖关系。)此外,您的代码更容易理解。所有相同的规则都适用——例如,在接收过程中抛出的异常将触发回滚。您可以通过指定要使用的侦听器数量来提高并发性,使用XML <jms:listener-container>
元素。您还可以控制正在使用的哪种事务管理类型。
虽然JMS是一个非常强大的选项,但它并非没有局限性。客户端与代理的版本耦合,并且安排已部署系统的升级和代理的升级很快就会变得乏味。JMS本质上是Java中心的。客户端使用Java语言驱动程序连接到给定的代理。消息传递是关于集成的,我们不能总是假设我们正在与其他Java客户端集成,尤其是在存在许多不同平台的世界中。虽然一些JMS消息代理(甚至是开源的)可以扩展到令人难以置信的吞吐量,但实际上存在更快消息传递选项,如果您的情况需要,那么至少值得研究替代方案。JMS是一个很好的API,但没有人会称它为最好的API。因此,虽然许多消息代理支持JMS,但它们也支持其专有的API或更强大或更具表现力的替代API。一个例子是JMS消息发送后缺乏路由功能。
AMQP标准是一种满足这些挑战的流行选项。AMQP(高级消息队列协议)是一个标准,最初源于摩根大通银行在关键任务应用程序中面临的挑战。从他们的工作开始出现了一个规范,最终围绕它形成了一个工作组,该工作组今天包括许多公司,如高盛、Progress Software、微软、Novell、红帽、WS02、高盛、美国银行、巴克莱、思科、瑞士信贷、德意志交易所系统,当然还有VMware的SpringSource部门。SpringSource尤其开发了最流行的基于AMQP的消息代理实现RabbitMQ。
RabbitMQ是一个开源消息代理。它易于安装,特别是如果您运行的是许多系统中的一个,这些系统的包管理器已经提供了RabbitMQ。RabbitMQ是用Erlang语言编写的。通常,实现细节无关紧要,但这个特定细节很重要,因为RabbitMQ的速度很快。你看,Erlang是一种轻量级语言,最初部署在关键任务电话系统中。Erlang具有非常轻量级、直观的线程模型,这使得Erlang程序能够实现比JVM目前能够实现的并发性高得多的并发性。此外,Erlang的线程模型与其网络模型无缝融合。这意味着扩展到多个线程或多台机器的基本方式相同。所有这一切都是为了说明RabbitMQ速度很快。非常快,并且它对错误具有弹性,这是爱立信等公司能够获得九个九(99.9999999%)可用性的原因之一。
AMQP是一种线协议(如HTTP),而不是API。这使得它与语言无关(实际上,有几十个针对不同语言和平台的已知客户端),这意味着RabbitMQ在您通常不会期望关心消息代理的各种工具(如WireShark,一种网络流量监控工具)中得到了支持。从概念上讲,任何AMQP客户端都应该能够与任何其他AMQP实现通信。
AMQP规范指定客户端和服务器端的构造以及例行管理选项。在AMQP中,客户端创建到服务器的连接。客户端可以向交换机发送消息。交换机将消息路由到代理内的队列,或完全停止它们。交换机是无状态的守门人,而队列实际上是排队和存储消息的。
客户端可以从队列中使用消息。交换机和队列之间没有关系:您可以根据需要创建任意数量的队列,并将一个或多个队列绑定到交换机。交换机和队列之间的关系称为绑定。如果消息中的路由键与绑定匹配,则交换机最多将消息的一个副本传递到队列。这很重要,因为我之前说过,可以为单个队列指定多个交换机和绑定。多个匹配不会产生多条消息。交换机决定什么是匹配。有几个众所周知的交换机,它们指定不同的匹配算法。
javax.jms.Topic
最相似)javax.jms.Queue
最相似)。规范也允许添加特殊的交换机。例如,RabbitMQ添加了插件交换机,它基本上是第三方(或者RabbitMQ本身)提供额外功能的扩展点。这促进了不断增长的可安装插件列表,这些插件的功能无所不包,从发送XMPP消息、处理复制到显示用于管理的Web UI。
我们将研究使用Spring AMQP,这是一个Spring项目组合项目,它允许您使用RabbitMQ来完成规范规定的所有操作,还可以执行更高级的RabbitMQ特定操作。
让我们开始构建我们的示例——基本上与我们的JMS示例设计相同——使用RabbitMQ和Spring AMQP客户端。首先,您需要相应的依赖项。如果您使用Maven,请将以下依赖项添加到您的pom.xml
文件中。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${com.rabbitmq.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
请务必为${spring.amqp.version}
和${com.rabbitmq.version}
属性占位符创建Maven属性,或者只需直接用相应的版本替换它们。在撰写本文时,${spring.amqp.version}
是1.0.0.M2
,${com.rabbitmq.version}
是2.1.0
。与我们在前面的示例中所做的一样,我们安装了一个简单的Spring XML配置文件来引导其他所有内容。唯一不同的是从使用<tx:annotation-driven>
元素引用的事务管理器实现名称、扫描的包和加载的属性文件的名称!因此,让我们不要在设置上浪费太多时间,直接进入基于AMQP的示例的配置。
package org.springsource.greenbeans.examples.amqp.core;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.*
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;
@Configuration
@SuppressWarnings("unused")
public class AmqpConfiguration {
@Value("${broker.url}")
private String brokerUrl;
@Value("${broker.username}")
private String username;
@Value("${broker.password}")
private String password;
@Value("${amqp.customer.queue}")
private String customerQueueName;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(this.singleConnectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new JsonMessageConverter();
}
@Bean
public ConnectionFactory singleConnectionFactory() {
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(this.brokerUrl);
connectionFactory.setUsername(this.username);
connectionFactory.setPassword(this.password);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(this.rabbitTemplate());
}
@Bean
public Queue customerQueue() {
Queue q = new Queue(this.customerQueueName);
amqpAdmin().declareQueue(q);
return q;
}
@Bean
public DirectExchange customerExchange() {
DirectExchange directExchange = new DirectExchange(customerQueueName);
this.amqpAdmin().declareExchange(directExchange);
return directExchange ;
}
@Bean
public Binding marketDataBinding() {
return BindingBuilder.from(customerQueue()).to(customerExchange()).with(this.customerQueueName);
}
}
正如您所看到的,这里比我们的JmsTemplate
要复杂一些,但不要担心,大部分内容在形式和功能上与其JMS对应项相同。其余的只是细节。首先,我们配置通常的组件——TransactionManager
(RabbitTransactionManager
)、ConnectionFactory
实例和RabbitTemplate
。大部分内容应该不言自明。
让我们深入探讨一下不一致的地方。第一个细微之处是我们在RabbitTemplate
上配置了对JsonMessageConverter
的引用。记住:AMQP是语言和平台无关的。从Java发送到AMQP代理的消息很可能会被.NET、Python或PHP上的客户端使用。当消息打包并通过网络发送时,有效负载将作为字节流传递。消息的接收者需要能够将这些字节恢复为可以在接收者的平台上读取的内容。如果消息使用Java对象,则这些字节将是序列化后的Java对象,只有另一端具有相同类的Java客户端才能反序列化它。因此,就像在Spring JMS支持中一样,Spring AMQP提供了一个MessageConverter
层次结构。Spring AMQP层次结构具有MarshallingMessageConverter
和SimpleMessageConverter
,此外,它还具有JsonMessageConverter
(目前是Spring AMQP项目独有的),它可以将对象转换为JSON(JavaScript对象表示法格式,所有主要语言和平台都可以解析,并且比XML更简洁/更不容易出错)。在JMS中,智能序列化是效率和设计问题,但在AMQP中,它是一个更为紧迫的问题,因此请注意已配置的MessageConverter
。
您会在配置中找到四个在JMS示例中没有类似对象的组件。第一个是AmqpAdmin
。AMQP在协议级别定义了创建应用程序所需的所有内容的命令,包括交换机、队列和绑定。在Spring AMQP API中,AmqpAdmin
是这些命令的关键接口。
在customerQueue
方法中,我们配置了一个AMQP队列,并在customerExchange
方法中配置了一个DirectExchange
。最后,我们使用Spring AMQP 流畅的BindingBuilder
API将队列连接到我们的交换机。在我们的具体示例中——我们将带有路由键“customers”的消息发送到名为“customers”的队列。在我们的具体示例中,不需要声明除队列之外的任何内容,因为无名交换机将启动并根据路由键简单地路由消息。但是,即使有点冗余,了解如何操作也很有用。我们使用AmqpAdmin
实例来“声明”这些结构。这些对象是幂等的。您可以“声明”它们一百万次,如果它们已经存在,则除了其中一个声明之外,其他任何声明都不会发生任何事情,因此在应用程序启动时重复调用是无害的。此外,如果这些结构被设置为持久化,则甚至不需要每次都声明它们。
让我们看看如何发送消息。
package org.springsource.greenbeans.examples.amqp.core;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
@Component
public class Producer {
@Value("${amqp.customer.exchange}")
private String exchange;
@Value("${amqp.customer.queue}")
private String routingKey;
@Autowired
private RabbitTemplate rabbitTemplate;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void sendCustomerUpdate(Customer customer) {
log.info("sending customer update " + ToStringBuilder.reflectionToString(customer));
this.rabbitTemplate.convertAndSend(this.exchange , this.routingKey, customer);
}
}
在这个类中,我们使用RabbitTemplate
发送消息并将其转换为JSON。我们指定要使用的routingKey
和要使用的交换机(都是“customers”,与我们在配置中设置的绑定类型一致)。我们已将该类配置为使用@Transactional
,因此任何发送消息失败的行为都将与使用JMS失败时相同。
现在,让我们看看使用AMQP接收消息的选项。
package org.springsource.greenbeans.examples.amqp.amqptemplate;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
@Component
public class RawAmqpTemplatePollingMessageConsumer {
@Autowired
protected RabbitTemplate amqpTemplate;
@Value("${amqp.customer.queue}")
private String queue;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void receiveAndProcessCustomerUpdates() throws Exception {
Customer msg = (Customer)this.amqpTemplate.receiveAndConvert(this.queue);
log.info("receiving customer message: " + ToStringBuilder.reflectionToString( msg));
}
}
不出所料,这看起来几乎与第一个同步JMS示例相同(除了RabbitTemplate
)。我们避免了在第一个示例中必须处理的一些转换逻辑,但除此之外,它基本上是一样的。如果在消息接收时发生事务回滚,则消息将返回到队列的末尾,最终将被重新传递。
Spring AMQP也支持异步消息接收,就像Spring JMS支持一样。但是,由于Spring AMQP项目仍然是一个新兴项目,因此没有等效的命名空间支持。因此,我们需要自己配置对象。将以下内容添加到您的配置中。
@Autowired
private MessageListenerContainerConsumer messageListenerContainerConsumer;
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setTransactionManager(this.rabbitTransactionManager());
container.setConnectionFactory(singleConnectionFactory());
container.setQueueName(this.customerQueueName);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
this.messageListenerContainerConsumer, this.jsonMessageConverter());
container.setMessageListener(messageListenerAdapter);
return container;
}
此配置注入将执行处理的组件(如下所示,messageListenerContainerConsumer
实例通过组件扫描被拾取并自动注册到Spring,这就是我们在这里自动装配它的原因),然后配置一个SimpleMessageListenerContainer
实例,该实例将处理接收消息、管理事务以及在将消息传递给POJO之前转换传入的消息。
POJO本身如下所示
package org.springsource.greenbeans.examples.amqp.messagelistenercontainer;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;
@Component
public class MessageListenerContainerConsumer {
private Log log = LogFactory.getLog(getClass() );
public void handleMessage(Customer cu){
log.info("Received customer " + ToStringBuilder.reflectionToString(cu)) ;
}
}
此类比另一个类更能受益于消息转换器。在这里,我们可以声明一个采用Customer类型参数的方法,MessageListenerContainer
知道如何转换它然后将其传递给handleMessage
方法。但是,所有相同的规则都适用。异常将触发回滚等。
在这篇文章中,我们探讨了当今希望在其应用程序中结合企业消息传递的开发人员可用的两种选项,使用了Spring框架。我们介绍了用于处理企业消息的Java消息服务(JMS)API和高级消息队列协议(AMQP)。我们提供了使用核心Spring框架和Spring AMQP项目的同步和异步示例。我们讨论了消息传递如何帮助扩展应用程序以及它如何成为集成应用程序的一种便捷方式。我希望这将使您更容易理解使用企业消息传递时可能的选项,以及Spring如何使您更容易为您的应用程序做出正确的选择。像往常一样,这篇博文的代码可以在我们的Spring Samples存储库中找到。