领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多为高度可扩展的系统设计良好的路由拓扑就像绘制图表一样。需要考虑很多因素,例如问题、环境约束、消息传递实现的约束以及性能策略。我们经常遇到的问题是,路由的灵活性与表达能力不足以满足我们的需求。RabbitMQ 在这里脱颖而出。
api.agents.agent-{id}.operations.{operationName}
在更复杂的情况下,路由键可以与消息头字段和/或其内容上的路由相结合。交换机检查消息的属性、头字段、正文内容以及可能来自其他来源的数据,然后决定如何路由消息。从上述路由键思想派生的绑定模式可能如下所示: api.agents..operations.
,其中我们将交换机 E1
绑定到队列 Q1
,绑定模式为 api.agents..operations.
,以便发送到 E1
的任何消息,如果其路由键与绑定模式匹配,则路由到 Q1
。
Rabbit 代理的结构与 JMS 代理不同。每个 RabbitMQ 服务器至少包含一个节点(代理),或者更常见的是,集群中的多个节点。每个节点都有一个默认的虚拟主机“/”,并且可以创建其他虚拟主机,例如“/develoment”。Rabbit 虚拟主机类似于 Tomcat 虚拟主机,并将代理数据划分为子集。在这些虚拟主机中是交换机和队列。当用户使用其凭据连接时,它正在连接到 Rabbit 节点上的虚拟主机。
在这里,我们连接到一个 Rabbit 节点,声明要发布到的交换机、要从中消费的队列、绑定模式,然后发布一些消息,使用 RabbitMQ Java 客户端 API
package org.demo.simple.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public final class RocketSender {
public void sendRockets() throws IOException {
List<String> rocketsWithRoutings = new RocketRouter().build();
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
String rocketExchange = "rockets.launched";
channel.exchangeDeclare(rocketExchange, "topic");
String rocketQueue = channel.queueDeclare().getQueue();
channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");
for (String rocketTo : rocketsWithRoutings) {
channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
}
channel.close();
connection.close();
}
}
对“着陆”火箭的简单消费可能如下所示
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);
int landed = 0;
while (landed < launched) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
String rocketLanded = new String(delivery.getBody());
if (rocketLanded.equalsIgnoreCase("Alderaan")) {
System.out.println("That's no moon, that's a space station.");
}
landed++;
}
为了简单起见,让我们考虑两种策略
请注意绑定抖动。在策略 2 中,如果您创建了许多新的队列及其绑定,那么每当消费者连接时,您可能会遇到问题。例如,给定正在发布许多消息的交换机 E1...En
,每当消费者 Cm
连接时,它都会从其自己的队列到所有 E1...En
创建绑定,这可能会导致问题,具体取决于连接速率。
为了缓解绑定抖动,请考虑从版本 2.3.1 开始的新功能:交换机到交换机的绑定。每个消费者都可以拥有自己的辅助交换机 Ym
,该交换机不得自动删除。然后将所有 E1...En
绑定到 Ym
。这样,这些绑定始终存在。在这种情况下,每当消费者 Cm
连接时,它只需要声明其队列并将该队列绑定到 Ym
即可。如果 Ym 是一个扇出交换机,它将非常快,并将绑定抖动率降低到每个连接 1 次,而不是可能每个连接 n 次。
现在考虑创建共享主题交换机:一个用于代理到服务器路径的交换机,另一个用于服务器到代理路径的交换机,以及一个用于处理未经身份验证的代理的交换机,它仅路由到不需要安全性的那些队列。现在我们使用绑定模式、消息路由键进行分区,并为每个服务器设置一组这些模式,以便与连接到它的所有代理共享。然后,在最简单的形式中,当每个代理上线时,它都会声明一个私有交换机和队列,并将它的交换机绑定到共享主题交换机。
我们现在通过交换机到交换机的映射来表达我们的关系,这减少了抖动率,并使代理与“了解”服务器队列脱钩。使用此模式,系统干净、解耦且可扩展。
经过身份验证的客户端交换路由代理到服务器的消息。它处理所有将消息发布到单一消费者队列的操作,包括那些产生最高消息频率的操作。在当前拓扑结构下,这可能是一个瓶颈,对于 10,000 个客户端来说,每分钟大约有 60,000 条消息,每天有 86,400,000 条消息。这个问题很容易解决,RabbitMQ 每天可以处理超过 10 亿条消息,具体取决于您的配置,例如您是否持久化消息。
我们的服务器应用程序正在运行一个 RabbitMQ 集群。请记住,在集群中,声明交换导致它出现在所有节点上,而声明队列仅在一个节点上创建它,因此我们必须配置一个解决方案。
RabbitMQ 会动态地将消息推送到磁盘以释放 RAM,因此队列的内存占用量不依赖于其内容。队列空闲 10 秒或更长时间后,它将“休眠”,这会导致该队列上的 GC。结果,队列所需的内存量可以大幅缩小。例如,1000 个空闲的空队列可能占用 10MB 的 RAM。当它们都处于活动状态时(即使是空的),它们当然也可能会根据内存碎片消耗更多的内存。强制它们回到休眠状态以测试行为非常困难,因为 Erlang VM 不会立即将内存还给操作系统。
但是,您可以观察到一个休眠且内存非常碎片化的巨大进程,因为回收的内存量可能足以迫使 VM 将内存还给操作系统。如果运行一个稳步增加 Rabbit 内存占用的测试,您可以观察到休眠对空闲进程的影响,因为它降低了内存使用率的增长速度。
Erlang 是一个多线程 VM,它利用了多个内核。它向开发人员提供绿色线程,称为“进程”,因为与线程不同,它们在概念上不共享地址空间。这里有一个关于Erlang VM 和进程的有趣自白。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>