Spring 提示:Apache RocketMQ

工程 | Josh Long | 2020 年 2 月 25 日 | ...

嗨,Spring 粉丝!在本期 Spring 提示中,我们将了解阿里巴巴的 Apache RocketMQ。我们之前在 Spring 提示中讨论过一些关于阿里巴巴的内容。查看之前 Spring 提示的版本,其中我们探讨了一些 Spring Cloud Alibaba

运行 Apache RocketMQ

为了使用 Apache RocketMQ,您需要按照RocketMQ 快速入门中的步骤操作。本期 Spring 提示介绍了 Apache RocketMQ,它最初是由阿里巴巴内部开发和使用的技术,并在 11/11(著名的中国购物节,有点像美国的“网络星期一”或“黑色星期五”)的考验中得到了验证。有点像那样,但规模要大得多。2019 年,阿里巴巴(仅阿里巴巴一家,没有其他电子商务引擎参与)在 24 小时内创造了近 400 亿美元的销售额。这需要通过能够扩展以满足需求的某种东西发送数万亿条消息。RocketMQ 是他们唯一可以信任的东西。

运行 Apache RocketMQ 时,您需要使用 Java 8。(当然,在编写连接到 Apache RocketMQ 的 Spring 应用程序时,您可以使用任何版本的 Java。)我使用 SDK Manager(“SDKman” - sdk)切换到适当版本的 Java。

sdk use java 8.0.242.hs-adpt

如果尚未安装,这将安装一个可用的版本。完成后,您需要运行 NameServer。

${ROCKETMQ_HOME}/bin/mqnamesrv 

然后,您需要运行 Broker 本身。

${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876

如果要使用基于 SQL 的过滤,则需要向代理的配置$ROCKETMQ_HOME/conf/broker.conf中添加一个属性,然后告诉 RocketMQ 使用该配置。

enablePropertyFilter = true

我使用类似这样的脚本启动所有内容。

export JAVA_HOME=$HOME/.sdkman/candidates/java/8.0.242.hs-adpt
${ROCKETMQ_HOME}/bin/mqnamesrv &  
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c ${ROCKETMQ_HOME}/conf/broker.conf

从 Java 代码中使用 Apache RocketMQ

让我们来看一个简单的生产者类,它使用 Spring Boot 自动配置和RocketMQTemplate

为了使用它,您需要在Spring Initializr上创建一个新项目。我使用最新版本的 Java 生成了一个新项目,然后确保包含了Lombok。我们还需要 Apache RocketMQ 客户端和相应的 Spring Boot 自动配置

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.0.4</version>
</dependency>

自动配置将根据某些属性创建一个到正在运行的 Apache RocketMQ 代理的连接。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=greetings-producer-group

第一个属性name-server告诉应用程序 Apache RocketMQ nameserver 在哪里。然后,nameserver 知道代理在哪里。您还需要为生产者和消费者指定一个组。这里,我们使用greetings-producer-group


package com.example.producer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;

import java.time.Instant;

@RequiredArgsConstructor
@SpringBootApplication
public class ProducerApplication {

	@Bean
	ApplicationListener<ApplicationReadyEvent> ready(RocketMQTemplate template) {
		return event -> {

			var now = Instant.now();
			var destination = "greetings-topic";

			for (var name : "Tammie,Kimly,Josh,Rob,Mario,Mia".split(",")) {

				var payload = new Greeting("Hello @ " + name + " @ " + now.toString());
				var messagePostProcessor = new MessagePostProcessor() {

					@Override
					public Message<?> postProcessMessage(Message<?> message) {
						var headerValue = Character.toString(name.toLowerCase().charAt(0));
						return MessageBuilder
							.fromMessage(message)
							.setHeader("letter", headerValue)
							.build();
					}
				};
				template.convertAndSend(destination, payload, messagePostProcessor);
			}
		};
	}

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
	private String message;
}

我不知道是否还能比这更简单了!这是一个简单的 for 循环,处理每个名称,创建一个新的Greeting对象,然后使用RocketMQTemplate将有效负载发送到 Apache RocketMQ 主题greetings-topic。这里,我们使用了RocketMQTemplate对象的重载,该对象接受一个MessagePostProcessorMessagePostProcessor是一个回调,我们可以在其中转换将要发送的 Spring Framework Message对象。在此示例中,我们贡献了一个标题值letter,其中包含名称的第一个字母。我们将在消费者中使用它。

让我们看看消费者。从 Spring Initializr 生成一个新的 Spring Boot 应用程序,并确保添加了 Apache RocketMQ 自动配置。您还需要在客户端的application.properties中指定 nameserver。

自动配置支持定义实现RocketMQListener<T>的 bean,其中T是消费者将接收的有效负载的类型。在这种情况下,有效负载是Greeting

package com.example.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

import static org.apache.rocketmq.spring.annotation.SelectorType.SQL92;

@SpringBootApplication
public class ConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class, args);
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
	private String message;
}

@Log4j2
@Service
@RocketMQMessageListener(
	topic = "greetings-topic",
	consumerGroup = "simple-group"
)
class SimpleConsumer implements RocketMQListener<Greeting> {

	@Override
	public void onMessage(Greeting greeting) {
		log.info(greeting.toString());
	}
}

在此示例中,SimpleConsumer仅记录来自 Apache RocketMQ 中greetings-topic主题的所有传入消息。在这里,消费者将处理主题上的每条消息。让我们看看另一个不错的功能——选择器——它让我们可以选择性地处理传入的消息。让我们用两个新的替换现有的 RocketMQ 监听器。每个监听器都将使用一个兼容 SQL92 的谓词来确定是否应处理传入的消息。一个监听器仅处理标题lettermkt匹配的消息。另一个仅匹配标题letterj匹配的消息。


@Log4j2
@Service
@RocketMQMessageListener(
	topic = "greetings-topic",
	selectorExpression = " letter = 'm' or letter = 'k' or letter = 't' ",
	selectorType = SQL92,
	consumerGroup = "sql-consumer-group-mkt"
)
class MktSqlSelectorConsumer implements RocketMQListener<Greeting> {

	@Override
	public void onMessage(Greeting greeting) {
		log.info("'m', 'k', 't': " + greeting.toString());
	}
}


@Log4j2
@Service
@RocketMQMessageListener(
	topic = "greetings-topic",
	selectorExpression = " letter = 'j' ",
	selectorType = SQL92,
	consumerGroup = "sql-consumer-group-j"
)
class JSqlSelectorConsumer implements RocketMQListener<Greeting> {

	@Override
	public void onMessage(Greeting greeting) {
		log.info("'j': " + greeting.toString());
	}
}

不错吧?Apache RocketMQ 支持很多其他功能(除了在 24 小时内处理数万亿条消息之外!)。它可以将长尾消息存储在磁盘上,而不会降低性能。它支持消息的序列化(排序)、事务、批处理等。它甚至支持计划消息——仅在特定时间间隔后才传递的消息。不用说,我是 Apache RocketMQ 的忠实粉丝。

获取 Spring 电子邮件简报

通过 Spring 电子邮件简报保持联系

订阅

领先一步

VMware 提供培训和认证,以助您快速提升。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部