使用 RabbitMQ 和 Riak 进行事件数据处理

工程 | Jon Brisbin | 2011年4月21日 | ...

随着新的应用程序利用像 RabbitMQ 这样的消息代理和像 Riak 这样的云规模数据存储的扩展优势,这两个组件不可避免地会成为亲密伙伴(那种真正面对面交流的伙伴,而不是只通过 Facebook 联系的伙伴)。

我们现在编写的许多应用程序都在同一个应用程序中包含了这两个功能。很多时候,我们希望根据消息更新数据,或者根据更新的数据发送消息。两个新的实用程序促进了 RabbitMQ 和 Riak 集成,允许您直接在各自的服务器中执行这些操作。

RabbitMQ 自定义交换机

实验性的 RabbitMQ Riak 自定义交换机的目的是将 AMQP 消息从代理发送到 Riak 集群。当然,您可以简单地将消费者绑定到特定的交换机并自行完成此操作。如果您正在执行任何类型的消息转换,那么您可能仍然需要在特殊的消费者中执行此操作。但是,为此目的使用专用的交换机类型使您可以灵活地控制消息最终到达的位置,而无需进行最少的配置或开销。您可以在传递给交换机声明操作的参数中指定 Riak 服务器的主机和 Protocol Buffers 端口,或者可以将这些信息作为 AMQP 消息头传递——或者您可以同时执行这两项操作。您可以在 AMQP 消息头中指定桶和键信息,或者您可以让交换机名称和路由键分别推断这些信息——或者您可以通过覆盖例如消息将存储在其中的桶名称来组合使用这两种方法。

消息传递到 Riak 后,交换机调用主题交换机路由逻辑,这意味着此交换机的工作方式与普通主题交换机相同——除了它会将接收到的所有消息存储在 Riak 中。在不久的将来,将添加对 RabbitMQ 支持的所有交换机类型的支持,而不仅仅是主题交换机类型。

在内部,交换机使用连接池将消息发送到 Riak。要将池中可用客户端的数量扩展到默认的五个之外,只需在声明交换机时设置“maxclients”参数。

RabbitMQ Riak postcommit Hook

另一方面是您可以安装到 Riak 服务器中的 postcommit hook,每当修改该条目时,它都会将任何更新的 Riak 对象发送到 RabbitMQ 服务器。

要指定发送此消息的位置,您可以在条目的“X-Riak-Meta-”类型的各种元数据头中包含许多不同的内容。以下是可以在每个单独的条目上或具有“AMQP-Meta”键的该桶中的特殊文档上设置的所有可能选项的完整列表。如果实际条目上不存在任何元数据头,这将提供一组默认的元数据头,系统将检查这些头。

识别的标头完整列表为

  • X-Riak-Meta-Amqp-Exchange
  • X-Riak-Meta-Amqp-Routing-Key
  • X-Riak-Meta-Amqp-Host
  • X-Riak-Meta-Amqp-Port
  • X-Riak-Meta-Amqp-Vhost
  • X-Riak-Meta-Amqp-User
  • X-Riak-Meta-Amqp-Password
  • X-Riak-Meta-Amqp-Ignore

大多数这些选项都是自记录的。需要注意的一个选项是“X-Riak-Meta-Amqp-Ignore”标头。通过将其设置为“true”值,RabbitMQ postcommit hook 将忽略对该条目的任何更新,并且不会像通常那样发送消息。

我可以用它做什么?

当然,这个概念很简单,但其影响却很深远。

处理 RabbitMQ 的集群时遇到的问题之一是它在幕后使用 mnesia。在许多分布式设置中,这并不总是理想的。特别是偶尔连接的 WAN 节点可能会因为没有与其他代理建立稳定的连接而受到影响。

通过指定发送 Riak 更新的 RabbitMQ 服务器,实际上可以设置如下所示的场景

Riak Shovel Diagram

此图中的两个 RabbitMQ 服务器没有集群。通过结合 RabbitMQ 的 Riak 交换机类型和 Riak 的 RabbitMQ postcommit hook,这两个服务器上的消费者将以类似于 RabbitMQ 的 shovel 插件 的方式接收消息。

请记住,此自定义交换机类型不会解决跨 WAN 通信(或在节点随意进出任何场景,例如动态扩展情况)的根本问题。当然,每件事都有权衡,所以您的里程可能会有所不同。目前,这两个实用程序都没有处理重试或重新发送。还没有官方路线图,但如果有的话,重试将位列首位。

使用 Riak 支持的消息交换机的一个好处是所有消息都已存储。由于 Riak 是一个云规模的数据存储,因此您可以保留交换机接收的每条消息,而无需担心存储开销(只需添加更多 Riak 服务器即可获得更大的容量)。这也意味着您可以编写一个简单的 Web 界面来显示这些消息,并且只需更新消息,您就可以重新发送任何(或所有!)消息。这可能是一种很好的方法,可以重播一组消息以进行测试,或者如果您想通过更改一个元数据头以指向不同的代理来将所有消息重新发送到其他位置。

这只是对一个全新的但可能功能强大的工具的快速介绍。我相信您已经对如何将此方法应用于您自己的问题领域有了想法。我很乐意听听您打算如何做到这一点!发送一条推文 (@j_brisbin) 告诉我您正在做什么。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部