使用 RabbitMQ 实现高性能和可扩展性的路由拓扑

工程 | Helena Edelson | 2011年4月1日 | ...

为高度可扩展的系统设计良好的路由拓扑就像绘制图表一样。需要考虑很多因素,例如问题、环境约束、消息传递实现的约束以及性能策略。我们经常遇到的问题是,路由的灵活性与表达能力不足以满足我们的需求。RabbitMQ 在这里脱颖而出。

基本概念

任何熟悉消息传递的人都知道将消息从 A 路由到 B 的概念。路由可以很简单,也可以非常复杂,在为可扩展的复杂系统设计路由拓扑时,它必须优雅。保持组件的干净和解耦,可以使它们在不同的负载下很好地进行节流。这可以用简单的映射或复杂的图形来表示。在最简单的形式中,路由拓扑可以表示为节点,例如分层节点。

Hierarchical nodes in message routing topology

对于 RabbitMQ 或 AMQP(请注意,Rabbit 支持许多协议,包括 STOMP、HTTP、HTTPS、XMPP 和 SMTP)的新手,以下是一些基本组件描述
  • 交换机(Exchange) 服务器内部的实体,它接收来自生产者应用程序的消息,并选择性地将这些消息路由到服务器内的消息队列。
  • 交换机类型(Exchange type) 特定交换机模型的算法和实现。与“交换机实例”相反,“交换机实例”是在服务器内接收和路由消息的实体。
  • 消息队列(Message queue) 一个命名的实体,用于保存消息并将消息转发给消费者应用程序。
  • 绑定(Binding) 创建消息队列和交换机之间关系的实体。
  • 路由键(Routing key) 交换机可能用来决定如何路由特定消息的虚拟地址。
对于点对点路由,路由键通常是消息队列的名称。对于主题发布-订阅路由,路由键通常具有分层性质。

api.agents.agent-{id}.operations.{operationName}

在更复杂的情况下,路由键可以与消息头字段和/或其内容上的路由相结合。交换机检查消息的属性、头字段、正文内容以及可能来自其他来源的数据,然后决定如何路由消息。从上述路由键思想派生的绑定模式可能如下所示: api.agents..operations.,其中我们将交换机 E1 绑定到队列 Q1,绑定模式为 api.agents..operations.,以便发送到 E1 的任何消息,如果其路由键与绑定模式匹配,则路由到 Q1

Rabbit 代理的结构与 JMS 代理不同。每个 RabbitMQ 服务器至少包含一个节点(代理),或者更常见的是,集群中的多个节点。每个节点都有一个默认的虚拟主机“/”,并且可以创建其他虚拟主机,例如“/develoment”。Rabbit 虚拟主机类似于 Tomcat 虚拟主机,并将代理数据划分为子集。在这些虚拟主机中是交换机和队列。当用户使用其凭据连接时,它正在连接到 Rabbit 节点上的虚拟主机。

在这里,我们连接到一个 Rabbit 节点,声明要发布到的交换机、要从中消费的队列、绑定模式,然后发布一些消息,使用 RabbitMQ Java 客户端 API

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class RocketSender {

 public void sendRockets() throws IOException {
     List<String> rocketsWithRoutings = new RocketRouter().build();

     Connection connection = new ConnectionFactory().newConnection();
     Channel channel = connection.createChannel();

     String rocketExchange = "rockets.launched";
     channel.exchangeDeclare(rocketExchange, "topic");
     String rocketQueue = channel.queueDeclare().getQueue();
     channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

     for (String rocketTo : rocketsWithRoutings) {
         channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
     }

     channel.close();
     connection.close();
 }
}

对“着陆”火箭的简单消费可能如下所示


 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
 channel.basicConsume(rocketQueue, false, queueingConsumer);

 int landed = 0;
 while (landed < launched) {
     QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     String rocketLanded = new String(delivery.getBody());

     if (rocketLanded.equalsIgnoreCase("Alderaan")) {
         System.out.println("That's no moon, that's a space station.");
     }
     landed++;
 }

问题

在考虑哪些路由策略在可扩展的环境中性能最佳(性能本身也可以得到改进)时,有很多选择。消息传递的一个优点是可用的配置种类繁多,并且可以找出解决当前和未来需求的正确配置。

为了简单起见,让我们考虑两种策略

  1. 使用分层路由键的高度分区路由,较少的主题交换机
  2. 大量直接交换机和队列,路由分区更少
每个场景都遵循以下用例:每个必须扩展的应用程序既是生产者又是消费者

从哪里开始

在深入研究随着时间推移可以干净高效地扩展的路由解决方案之前,最好先了解您的环境及其组件。例如,什么有助于扩展?通常,解耦、分布、异步、并行、抽象级别和间接级别等等。然后考虑哪些元素是当前或潜在的瓶颈。一个基本原则是,高流量/高容量的路径需要更高效的吞吐量,否则您将面临分布瓶颈的风险。一项练习是根据流量或热图对这些路径进行排名。接下来,您可以对您的流量进行分类——是否存在总体模式、主题或类似的消息类型,以及它们之间的关系是什么?现在开始考虑整合,如何在何处提高效率,并应用经过测试的模式来解决这些热点问题,为了扩展而解耦,并提高性能。

一般路由注意事项

所有交换机类型行为都不同。以下是一些通用规则
  • 如果应用程序图中路由键的域是有限的,那么许多扇出交换机可能是正确的选择(每个路由键的 1:1 映射)
  • 如果路由键的数量可能无限,请考虑主题交换机
  • 对于主题路由,绑定数量增加会导致性能下降
  • 扇出交换机速度非常快,因为它们没有要处理的路由,但如果绑定到大量队列,情况就会发生变化
  • 直接交换机是主题交换机的一种更快形式,前提是您不需要通配符
  • 跨 100,000 多个队列排除故障可能很繁琐,而具有更多绑定、更少交换机和队列的拓扑则不然
  • 数量非常多的交换机和队列会占用更多内存,这可能很重要,但这确实取决于具体情况
从 2011 年 3 月 23 日发布的 RabbitMQ 2.4.0 版本开始,提供了一种新的主题路由算法优化,在峰值时速度比以前的主题算法快 60 倍。因此,建议使用更少的交换机和队列,以及更多的路由,因为现在时间增加最少。

性能

什么是廉价的?

就内存成本而言,交换机和绑定。在 RabbitMQ 所构建的 Erlang 中,每个节点(代理)都是一个进程,每个队列也是如此。默认情况下,Erlang VM 进程限制设置为 1M,可以提高该限制。但是,交换机不是为了可扩展性而设计的进程,它只是 RabbitMQ 内置的 Mnesia 数据库中的一行。在集群中,声明交换机会导致它出现在集群的所有节点上,而声明队列则仅在一个节点上创建它。这解释了为什么交换机可以在节点重新启动或在集群中创建节点后继续存在,而队列却不能。

请注意绑定抖动。在策略 2 中,如果您创建了许多新的队列及其绑定,那么每当消费者连接时,您可能会遇到问题。例如,给定正在发布许多消息的交换机 E1...En,每当消费者 Cm 连接时,它都会从其自己的队列到所有 E1...En 创建绑定,这可能会导致问题,具体取决于连接速率。

为了缓解绑定抖动,请考虑从版本 2.3.1 开始的新功能:交换机到交换机的绑定。每个消费者都可以拥有自己的辅助交换机 Ym,该交换机不得自动删除。然后将所有 E1...En 绑定到 Ym。这样,这些绑定始终存在。在这种情况下,每当消费者 Cm 连接时,它只需要声明其队列并将该队列绑定到 Ym 即可。如果 Ym 是一个扇出交换机,它将非常快,并将绑定抖动率降低到每个连接 1 次,而不是可能每个连接 n 次。

Exchange-to-Exchange Binding

用例

交换机到交换机可扩展用例

考虑一个具有自治代理的服务器应用程序。每个代理都在一个虚拟机上,该虚拟机是弹性扩展系统的一部分。当每个代理启动时,它会向服务器发送一条消息,表明它已联机,然后发送许多其他消息,例如身份验证和数据传输。如果我们有 1,000 个代理,每个代理都声明 50 个直接交换机、队列和绑定,那么每个代理必须知道服务器的队列才能在 queue.declare 操作上履行绑定约定。这不是一个可扩展的解决方案。

现在考虑创建共享主题交换机:一个用于代理到服务器路径的交换机,另一个用于服务器到代理路径的交换机,以及一个用于处理未经身份验证的代理的交换机,它仅路由到不需要安全性的那些队列。现在我们使用绑定模式、消息路由键进行分区,并为每个服务器设置一组这些模式,以便与连接到它的所有代理共享。然后,在最简单的形式中,当每个代理上线时,它都会声明一个私有交换机和队列,并将它的交换机绑定到共享主题交换机。

我们现在通过交换机到交换机的映射来表达我们的关系,这减少了抖动率,并使代理与“了解”服务器队列脱钩。使用此模式,系统干净、解耦且可扩展。

弹性扩展用例

让我们将前面的场景更进一步。我们已经在方案 2 中使用主题发布-订阅路由:许多直接路由。现在假设系统需求增加到扩展数据中心中服务器应用程序的集群,其中有 50,000 个或更多代理。我们如何控制不同的负载?

经过身份验证的客户端交换路由代理到服务器的消息。它处理所有将消息发布到单一消费者队列的操作,包括那些产生最高消息频率的操作。在当前拓扑结构下,这可能是一个瓶颈,对于 10,000 个客户端来说,每分钟大约有 60,000 条消息,每天有 86,400,000 条消息。这个问题很容易解决,RabbitMQ 每天可以处理超过 10 亿条消息,具体取决于您的配置,例如您是否持久化消息。

我们的服务器应用程序正在运行一个 RabbitMQ 集群。请记住,在集群中,声明交换导致它出现在所有节点上,而声明队列仅在一个节点上创建它,因此我们必须配置一个解决方案。

生产者和消费者之间的负载均衡

为了有效地处理这些可能非常高的负载(随着更多客户端应用程序(代理)上线),我们可以通过多种方式修改此拓扑结构。首先,从上述配置到跨 Rabbit 集群负载均衡消息的优化。我们可以为 Rabbit 集群中的每个节点创建一个队列。如果我们有四个节点,对于每个高流量队列,我们为该操作创建 hfq.{0,1,2,3}。现在,每个代理可以通过 0 到 3 之间的数字随机选择一个节点,或者使用更复杂的循环轮询实现,来发布消息。在 RabbitMQ 中,存在 RPC 调用,或者您可以使用Rabbit 管理插件获取节点数量,这可以在您的循环轮询算法中使用。

带有循环轮询调度的 Worker 队列

Worker 队列或任务队列通常用于在多个工作进程之间有效地分配耗时的任务,并轻松地并行化工作。此外,此拓扑结构适用于消除执行资源密集型任务和必须阻塞直到它们完成的需要。运行多个工作进程队列允许这些任务在它们之间分配。

使用 Worker 队列,默认情况下,Rabbit 使用循环轮询分配方法,将每条消息发送到下一个消费者。每个消费者接收大致相同数量的消息。如果声明一个队列并启动 3 个竞争的消费者,将它们绑定到交换机,并发送 20,000 条消息,消息 0 将路由到第一个消费者,消息 1 到第二个,消息 2 到第三个,依此类推。如果我们开始积压任务,我们可以简单地添加更多工作进程,从而使系统能够轻松扩展。

性能

内存

以上两种选项都不一定会导致 RabbitMQ 中的高负载。交换机和队列的数量没有硬性限制,可以创建,在一个代理上运行 100,000 个队列是可以的。通过正确的调整和足够的 RAM,您可以运行超过一百万个队列。

RabbitMQ 会动态地将消息推送到磁盘以释放 RAM,因此队列的内存占用量不依赖于其内容。队列空闲 10 秒或更长时间后,它将“休眠”,这会导致该队列上的 GC。结果,队列所需的内存量可以大幅缩小。例如,1000 个空闲的空队列可能占用 10MB 的 RAM。当它们都处于活动状态时(即使是空的),它们当然也可能会根据内存碎片消耗更多的内存。强制它们回到休眠状态以测试行为非常困难,因为 Erlang VM 不会立即将内存还给操作系统。

但是,您可以观察到一个休眠且内存非常碎片化的巨大进程,因为回收的内存量可能足以迫使 VM 将内存还给操作系统。如果运行一个稳步增加 Rabbit 内存占用的测试,您可以观察到休眠对空闲进程的影响,因为它降低了内存使用率的增长速度。

Erlang 是一个多线程 VM,它利用了多个内核。它向开发人员提供绿色线程,称为“进程”,因为与线程不同,它们在概念上不共享地址空间。这里有一个关于Erlang VM 和进程的有趣自白。

事务

10,000 条消息的事务可能需要长达四分钟才能发布。一个名为发布者确认的新 RabbitMQ 功能比相同但具有事务性的代码快 100 多倍。如果您没有明确要求实现事务但确实需要验证,则可以考虑此选项。

要点

以下是一些最终要点,可帮助您从实现中获得最大的性能提升
  • 新的主题路由算法优化在峰值时快了 60 倍
  • 使用通配符“*”(匹配单个单词)的主题绑定模式比“#”(匹配零个或多个单词)快得多。通配符“#”在路由表中的处理时间比“*”长
  • 交换机到交换机的绑定提高了解耦,增加了拓扑结构的灵活性,减少了绑定变化,并有助于提高性能
  • 交换机和绑定非常轻量级
  • RabbitMQ 发布者确认比 AMQP 事务快 100 多倍
  • 队列空闲 >= 10 秒后,它将“休眠”,导致队列上的 GC,从而导致该队列所需的内存量大幅减少
  • Worker 队列有助于并行化和分配工作负载
  • 在 Rabbit 集群中分配工作进程队列有助于扩展
  • 负载均衡您的拓扑结构
这绝不是关于该主题的论文,实际上还有许多其他模式、拓扑结构和性能细节需要考虑。策略,一如既往,取决于许多因素,但我希望这足以提供帮助,或者至少让大家朝着正确的方向思考。

获取

GitHub 上的 RabbitMQ 源代码 RabbitMQ 二进制下载和插件 Erlang 下载 Spring AMQP Java 和 .NET 中 RabbitMQ 的 API Hyperic 监控 RabbitMQ Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>

获取 Spring 电子报

与 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部