用于构建 RabbitMQ AMQP 应用程序的简单 Groovy DSL

工程 | Jon Brisbin | 2011年6月1日 | ...

异步应用程序在开发过程中有时会带来挑战,因为您通常需要两个独立的组件才能查看完整的报文发布和消费生命周期。您经常会编写一个能够将报文转储到 System.out 或日志文件的消费者,只是为了确保您的发布者正在执行正确的操作。如果您能够在一个组件中模拟报文发布和消费交互,以便实际查看正在发生的情况,那就非常方便了。

RabbitMQ Groovy DSL 旨在通过提供一种非常简洁且易于使用的 DSL 语言来创建报文消费者和生产者来帮助解决这个问题,以便您可以快速模拟组件之间的报文交互,而无需编写任何样板代码。

使用交换机

RabbitMQ DSL 中的顶级节点是exchange节点。除了设置其作用域内节点继承的名称外,它还在您的代理中声明交换机。


mq.exchange(name: "myexchange") {
  
}

默认情况下,它将声明一个direct交换机。其他交换机类型通过type属性支持。


mq.exchange(name: "myexchange", type: "topic") {
  
}

每当您在exchange节点的作用域内使用queueconsumepublish节点(我们将在稍后讨论)时,您的交换机的名称将从该节点“继承”,因此您无需重复它。

使用队列

在模拟应用程序中发送和接收报文的下一步是声明一个队列,您的报文将被传递到该队列。您可以使用queue节点执行此操作。


mq.exchange(name: "myexchange") {

  queue(name: "myqueue", routingKey: "test") {
    
  }
  
}

exchange节点的作用域内声明此队列也将使其绑定到封闭的交换机。routingKey属性的值将用于声明此绑定。

此示例使用命名队列,但您也可以通过将name属性设置为 null 来获得匿名的、服务器生成的队列。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
  }
  
}

此匿名队列的名称在内部跟踪,因此,只要您在此节点的作用域内声明消费者和发布者,您就不需要知道它是什么。但是,如果您想编写一些需要匿名队列名称的辅助函数,只需将您的节点设置为变量。此变量的值将是一个 Spring AMQP Queue 对象,该对象上有一个名为(具有讽刺意味的是)name的属性。


mq.exchange(name: "myexchange") {

  Q = queue(name: null, routingKey: "test") {
    
  }
  
  println "queue name is: ${Q.name}"
  
}

创建队列消费者

为了处理传入的报文,您需要声明一个消费者。消费者的 DSL 在您将代码附加到接收到报文时执行的方式上非常灵活。在幕后,consume 只是一个 Spring AMQP SimpleMessageListenerContainer),并且表示它的 consume 节点采用几种不同的形式。

使用 Groovy Closure

声明消费者的最简单方法是只使用 Closure 作为方法,该方法在接收到报文时执行。此 closure 的唯一参数将是一个 Spring AMQP Message 对象。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
    }
    
  }
  
}

使用事件

RabbitMQ Groovy DSL 实际上功能齐全,足以编写完整的生产应用程序,尽管我们在这篇文章中只关注模拟应用程序。DSL 的一个特性是事件的概念。事件在报文生命周期的某些固定时间(在发布报文之前和之后以及发生错误时)调度,并且可以像报文消费者一样处理自定义事件。

要声明事件处理程序,您可以使用on节点(按照约定,您可能希望将其放在源文件的顶部)。


mq.on   error: { err -> err.printStackTrace() },	
      myevent: { msg -> println "myevent: ${new String(msg.body)}" }

这声明了两个事件处理程序:一个用于发生的任何异常,另一个在我们收到报文时可以委托给它。由于在这种情况下我们只将报文打印到 System.out,因此我们可以轻松地在消费者之间共享代码。

为了告诉我们的消费者在收到报文时使用此事件处理程序,我们使用consume节点的onmessage属性。


mq.on   error: { err -> err.printStackTrace() },	
      myevent: { msg -> println "myevent: ${new String(msg.body)}" }

mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume onmessage: "myevent"
    
  }
  
}

使用 Closure、MessageListener 或 POJO

但是,您可以将 onmessage 属性设置为不仅仅是一个字符串。为了灵活,您可以将其设置为以下之一:

继续侦听报文

除非您从 Closure 或事件处理程序返回false或 null 值,否则您的消费者将继续侦听报文。要使您的消费者保持活动状态并等待报文,只需返回true或非 null 值即可。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
      // Keep listening for messages and don't exit
      return true
    }
    
  }
  
}

如果从 Closure 返回false或 null,消费者将退出。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
      // I'm done with you, please exit
      return false
    }
    
  }
  
}

命令行执行程序使用引用计数系统来确定任何消费者是否仍然处于活动状态。从标准 Groovy Closure 返回false或 null 将告诉调用者停止消费者的内部MessageListenerContainer。但是,在使用MessageListener实现时需要注意的一点是,您必须自行关闭消费者。

consume DSL 节点将返回一个特殊的Consumer对象,该对象公开了一个名为shutdown的方法,该方法负责关闭MessageListenerContainer。如果您自己实现MessageListener,则必须在您希望消费者退出时调用此方法,否则系统将不知道您何时完成,并且永远不会为您关闭消费者。

consume节点设置为变量,并在其上调用shutdown方法。


mq.exchange(name: "myexchange") {

	def consumer
	def listener = [
    onMessage: { msg ->
      println "Invoked from a standard MessageListener"
      consumer?.shutdown()
    }
  ] as MessageListener

	queue(name: null, routingKey: "test.key") {
		consumer = consume onmessage: listener
	}
  
}

发布报文

使用 DSL 发布报文与使用 DSL 消费报文一样容易。publish基本上有两种变体。


mq.exchange(name: "myexchange") {

  // Return a String, a byte array, or an instance of a Spring AMQP Message
	publish(routingKey: "test.key") {
		"this is from a publish"
	}

  // Write raw bytes to a ByteArrayOutputStream
	publish(routingKey: "test2.key", myHeaderValue: "customHeader", contentType: "text/plain") { out ->
		out.write("these are test bytes".bytes)
	}
  
}

在第一个示例中,我们返回一个用作报文正文的字符串(我们也可以使用byte[])。在第二个示例中,我们设置标准报文标头(在本例中为contentType)以及自定义应用程序标头,并且我们可以写入传递给 Closure 的ByteArrayOutputStream

将所有内容整合在一起

请注意,您不必将发布和消费拆分为两个单独的源文件。您可以将这两个函数一起包含,以便很好地了解您首先尝试使用报文实现的目标。


mq.on error: { err -> err.printStackTrace() }

mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
    }    
  }

	publish(routingKey: "test") {
		"this is from a publish"
	}
  
}

尽管我们专注于模拟以后可能使用纯 Java(甚至完全使用其他语言)更强大地构建的应用程序,但 RabbitMQ DSL 也可用于编写简单的维护应用程序,或任何需要报文消费和发布但又不想花费大量精力编写完整报文应用程序的报文应用程序。

在哪里可以获取它?

RabbitMQ DSL 在 GitHub 上可用,并采用 Apache 许可证。安装说明在自述文件中。

并且感谢 Joris Kuipers 最近的贡献,您可以在 Eclipse 和 STS 中获得 一些 IDE 代码完成支持,方法是使用包含的 rabbitmq.dsld

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

获取支持

Tanzu Spring在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多信息

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部