领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多阅读时间:约7分钟。 编码时间:约20分钟。
如果您一直在关注我关于RSocket的系列文章,您已经多次听到我提到“客户端和服务器”。但是,在RSocket中,客户端和服务器之间的界限是模糊的。使用RSocket,服务器可以向客户端发送消息,客户端可以像服务器一样响应这些请求。
事实上,RSocket文档并没有使用“客户端”或“服务器”这两个术语。文档改用“请求者”和“响应者”这两个术语。在RSocket中,任何组件都可以充当请求者,任何组件都可以充当响应者,甚至可以同时充当两者。在RSocket中,请求者和响应者之间所有这些来回通信都通过单个“双向”连接进行。
今天,您将通过编程您的rsocket-client来响应来自服务器的请求,来利用这些特性。在服务器端代码中,您将监听客户端连接事件,当连接事件发生时,您将把新的客户端存储在已连接客户端列表中。您还将要求每个客户端在连接存活期间向服务器回传遥测消息流。
首先,一些准备工作。Spring Boot和RSocket Java库最近都进行了更新。这些更新包括通常的错误修复、增强和弃用,因此升级符合我们的利益。
在Maven pom.xml
的<parent>
部分中,将spring-boot-starter-parent
更改为版本2.3.0.RELEASE
,如下所示。您需要执行此操作两次,因为您有两个单独的项目——rsocket-client
和rsocket-server
。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
要刷新您的代码,请在两个项目的根文件夹中运行以下命令:
./mvnw clean compile
现在您可以继续进行编码任务。
客户端需要能够响应来自服务器的消息。在rsocket-client
项目的RSocketShellClient.java
中,添加一个名为ClientHandler
的新内部类,如下所示:
@Slf4j
class ClientHandler {
@MessageMapping("client-status")
public Flux<String> statusUpdate(String status) {
log.info("Connection {}", status);
return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
}
}
此类包含一个名为statusUpdate()
的方法,该方法使用与rsocket-server
项目中的方法类似的@MessageMapping
注解进行修饰。客户端使用此类和此方法来捕获和响应来自服务器的请求。响应本身是消息流。每5秒,客户端都会告诉服务器其当前的可用内存。您可以将其视为客户端遥测数据。
要使此方法有效,您必须将此类“注册”到您的RSocket连接中。您将在下一节中执行此操作。
在客户端能够响应来自服务器的消息之前,它必须使用RSocket连接注册ClientHandler
。修改后的构造函数代码如下所示。请注意构造函数方法签名中的更改,以便添加RSocketStrategies
变量。Spring会将此变量提供给您的构造函数。使用下面列出的新代码替换旧的构造函数代码。
public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
// (1)
String client = UUID.randomUUID().toString();
log.info("Connecting using client ID: {}", client);
// (2)
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
// (3)
this.rsocketRequester = rsocketRequesterBuilder
.setupRoute("shell-client")
.setupData(client)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
// (4)
this.rsocketRequester.rsocket()
.onClose()
.doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED"))
.subscribe();
}
在上面的代码中,您首先生成并存储一个唯一ID来标识此客户端实例(1)。接下来,使用RSocket strategies
和一个新的ClientHandler
实例创建一个新的SocketAcceptor
(2)。然后使用RSocketRequesterBuilder
注册新的SocketAcceptor
(3)。最后,确保通过处理RSocket onClose()
事件来优雅地处理断开连接(4)。
客户端代码就是这样。让我们继续进行服务器端的更改。
在rsocket-server
项目中,首先要做的是通过向RSocketController.java
类添加一个类级变量来创建一个RSocketRequester
客户端集合,如下所示:
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
接下来,通过添加一个新的方法来添加一个连接处理器,如下所示:
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
// The code for the method will go HERE
}
@ConnectMapping
注解允许您在客户端连接事件发生时监听它们。使用此事件,您可以安排两项工作。第一项是将每个新客户端添加到CLIENTS
列表中。第二项是调用每个客户端并启动它们的遥测流。
将以下代码添加到您刚刚创建的方法中:
requester.rsocket()
.onClose() // (1)
.doFirst(() -> {
log.info("Client: {} CONNECTED.", client);
CLIENTS.add(requester); // (2)
})
.doOnError(error -> {
log.warn("Channel to client {} CLOSED", client); // (3)
})
.doFinally(consumer -> {
CLIENTS.remove(requester);
log.info("Client {} DISCONNECTED", client); // (4)
})
.subscribe();
首先要注意的是对requester.rsocket().onClose()
方法的调用(1)。此方法返回一个反应式Mono
对象,其中包含您需要的所有回调。
mono的doFirst()
方法在任何对subscribe()
的调用之前调用,但在mono的初始创建之后调用。使用此方法将客户端的requester
对象添加到CLIENTS
列表中(2)。
这段代码可能感觉有点违反直觉——在客户端连接时调用
onClose()
,然后使用生成的mono来存储对新客户端的引用。有时,事件驱动的API可能会感觉有点奇怪。但是,将其视为此RSocket连接的mono发送给您的“我还活着”事件。您正在使用该创建事件来触发在列表中存储每个客户端引用的操作。
每当连接出现问题时,RSocket都会调用mono的doOnError()
方法。这包括客户端选择关闭连接的情况。您可以使用提供的error
变量来决定采取什么操作。在上面的代码中,错误只是作为警告记录下来(3)。
当RSocket连接关闭时,会触发mono的doFinally()
方法。此方法是在运行从CLIENTS
列表中删除客户端的代码的理想位置(4)。
最后,subscribe()
激活您添加到mono中的反应式代码,并发出您已准备好处理事件的信号。
当每个客户端连接时,请求一个遥测读数流。为此,再次在connectShellClientAndAskForTelemetry()
方法中工作,您需要向前面添加的client-status
消息处理器发送请求。此代码如下所示:
requester.route("client-status")
.data("OPEN")
.retrieveFlux(String.class)
.doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
.subscribe();
使用requester
,目标路由为"client-status"
。发送字符串"OPEN"
作为消息数据,并检索类型为String
的Flux
。到达的每个字符串都包含客户端当前的可用内存读数。将此读数记录到控制台。最后,不要忘记subscribe()
以激活您的反应式代码。
现在是测试代码的时候了。打开一个终端窗口,移动到rsocket-server
目录,并按如下方式运行服务器:
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
服务器在localhost
端口7000
启动。
打开一个第二个终端窗口,并移动到rsocket-client
目录。在此目录下,构建并运行客户端,如下所示:
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
启动后,您会注意到客户端和服务器组件在控制台中都有新的日志条目。在rsocket-client
窗口中,您将看到显示客户端唯一 ID(UUID 格式)和“OPEN”
连接状态的日志条目,如下所示:
Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>
等待至少 10 秒,然后在shell:>
提示符下键入exit
。rsocket-client 现在与服务器断开连接并关闭。
现在切换到rsocket-server
窗口。日志条目如下所示:
Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED
当客户端连接时,它会被添加到客户端列表中,控制台会打印其客户端 ID 和状态“CONNECTED”。然后,每 5 秒,日志会显示客户端当前的“可用内存”读数。最后,当客户端断开连接时,其 RSocket 通道的状态变为“CLOSED”,客户端变为“DISCONNECTED”。
您可以通过在其终端窗口中按下Ctrl-C
来停止rsocket-server
进程。
能够调用客户端非常强大。它非常适合各种场景,包括移动设备、物联网或微服务。并且由于所有这些都可以通过 TCP 或 WebSockets 发生,因此您已经拥有所需的所有基础设施,无需求助于消息代理之类的重量级解决方案。
您在这里涵盖了很多内容。您学习了如何将服务器转换为“请求者”,将客户端转换为“响应者”。您了解了如何监听连接事件。您还了解了如何处理来自 rsocket 连接的错误和事件。并且,尽管在此练习中,您选择“request-stream”作为服务器-客户端通信,但您可以根据需要使用四种 RSocket 交互模式中的任何一种。