领先一步
VMware 提供培训和认证,助您加速进步。
了解更多您好,Spring 爱好者!在本期 Spring Tips 中,我们将探讨阿里巴巴的 Apache RocketMQ。我们之前在 Spring Tips 中讨论过一些关于阿里巴巴的内容。请查看早期 Spring Tips 的内容,其中我们探讨了 部分 Spring Cloud Alibaba 内容。
要使用Apache RocketMQ,您需要按照RocketMQ快速入门指南中的步骤进行操作。本期Spring Tips介绍了Apache RocketMQ,它最初是阿里巴巴内部开发和使用的技术,并在11/11(著名的中国购物节,类似于美国的“网络星期一”或“黑色星期五”)的考验下得到了验证。但比那要大得多。2019年,阿里巴巴(仅凭自身,不涉及其他电子商务引擎)在24小时内创造了近400亿美元的收入。这需要数万亿条消息通过能够满足需求的系统发送。RocketMQ是他们唯一信赖的东西。
运行Apache RocketMQ时,您需要使用Java 8。(当然,在编写连接到Apache RocketMQ的Spring应用程序时,您可以使用任何Java版本。)我使用SDK Manager(“SDKman”-sdk)来切换到适当的Java版本。
sdk use java 8.0.242.hs-adpt
这将安装一个可用的版本,如果尚未安装的话。完成后,您将需要运行NameServer。
${ROCKETMQ_HOME}/bin/mqnamesrv
然后您需要运行Broker本身。
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876
如果您想使用基于SQL的过滤,您需要在Broker的配置文件$ROCKETMQ_HOME/conf/broker.conf中添加一个属性,然后告诉RocketMQ使用该配置。
enablePropertyFilter = true
我使用一个像这样的脚本来启动所有东西。
export JAVA_HOME=$HOME/.sdkman/candidates/java/8.0.242.hs-adpt
${ROCKETMQ_HOME}/bin/mqnamesrv &
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c ${ROCKETMQ_HOME}/conf/broker.conf
让我们来看一个使用Spring Boot自动配置和RocketMQTemplate的简单生产者类。
为了使用这个,您需要创建一个新的项目在Spring Initializr上。我生成了一个包含最新Java版本的新项目,然后确保包含了Lombok。我们还需要Apache RocketMQ客户端和相应的Spring Boot自动配置。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
自动配置将创建一个连接到正在运行的Apache RocketMQ Broker的连接,该连接由某些属性告知。
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=greetings-producer-group
第一个属性name-server告诉应用程序Apache RocketMQ NameServer的位置。然后NameServer知道Broker的位置。您还需要为生产者和消费者指定一个组。在这里,我们使用greetings-producer-group。
package com.example.producer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import java.time.Instant;
@RequiredArgsConstructor
@SpringBootApplication
public class ProducerApplication {
@Bean
ApplicationListener<ApplicationReadyEvent> ready(RocketMQTemplate template) {
return event -> {
var now = Instant.now();
var destination = "greetings-topic";
for (var name : "Tammie,Kimly,Josh,Rob,Mario,Mia".split(",")) {
var payload = new Greeting("Hello @ " + name + " @ " + now.toString());
var messagePostProcessor = new MessagePostProcessor() {
@Override
public Message<?> postProcessMessage(Message<?> message) {
var headerValue = Character.toString(name.toLowerCase().charAt(0));
return MessageBuilder
.fromMessage(message)
.setHeader("letter", headerValue)
.build();
}
};
template.convertAndSend(destination, payload, messagePostProcessor);
}
};
}
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
private String message;
}
我不知道它是否能变得更简单!它是一个简单的for循环,处理每个名字,创建一个新的Greeting对象,然后使用RocketMQTemplate将有效负载发送到Apache RocketMQ主题greetings-topic。在这里,我们使用了接受MessagePostProcessor的RocketMQTemplate对象的重载。MessagePostProcessor是一个回调,我们可以在其中转换将要发送的Spring Framework Message对象。在此示例中,我们贡献了一个头部值letter,其中包含名字的首字母。我们将在消费者中使用它。
让我们来看看消费者。从Spring Initializr生成一个新的Spring Boot应用程序,并确保也添加Apache RocketMQ自动配置。您还需要在application.properties中为客户端指定NameServer。
自动配置支持定义实现RocketMQListener<T>的Bean,其中T是消费者将接收的有效负载的类型。在这种情况下,有效负载是Greeting。
package com.example.consumer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.spring.annotation.SelectorType.SQL92;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
private String message;
}
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
consumerGroup = "simple-group"
)
class SimpleConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info(greeting.toString());
}
}
在此示例中,SimpleConsumer只是记录来自Apache RocketMQ的greetings-topic主题的所有传入消息。在这里,消费者将处理主题上的*每条*消息。让我们看看另一个不错的特性——选择器——它允许我们选择性地处理传入的消息。让我们用两个新的监听器替换现有的RocketMQ监听器。每个监听器都将使用一个SQL92兼容的谓词来确定是否应该处理传入的消息。一个监听器只处理letter头部与m、k或t匹配的消息。另一个只匹配其letter头部与j匹配的消息。
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
selectorExpression = " letter = 'm' or letter = 'k' or letter = 't' ",
selectorType = SQL92,
consumerGroup = "sql-consumer-group-mkt"
)
class MktSqlSelectorConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info("'m', 'k', 't': " + greeting.toString());
}
}
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
selectorExpression = " letter = 'j' ",
selectorType = SQL92,
consumerGroup = "sql-consumer-group-j"
)
class JSqlSelectorConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info("'j': " + greeting.toString());
}
}
不错吧?Apache RocketMQ还有很多其他支持(除了在24小时内处理数万亿条消息!)它可以在不降低性能的情况下将长尾消息存储在磁盘上。它支持消息的序列化、事务、批量处理等。它甚至支持计划消息——在特定间隔后才传递的消息。不用说,我是Apache RocketMQ的忠实粉丝。