领先一步
VMware 提供培训和认证,助力您的进步。
了解更多阅读时间:约7分钟。 编码时间:约20分钟。
如果您一直在关注我关于 RSocket 的系列文章,您会多次听到我提及“客户端和服务器”。但是,对于 RSocket,客户端和服务器之间的界限是模糊的。使用 RSocket,服务器可以向客户端发送消息,并且客户端可以像服务器一样响应这些请求。
事实上,RSocket 文档不使用“客户端”或“服务器”这两个术语。文档使用的是“请求者”和“响应者”这两个术语。在 RSocket 中,任何组件都可以充当请求者,任何组件都可以充当响应者,甚至可以同时充当两者。在 RSocket 中,请求者和响应者之间所有这些双向通信都通过一个“双向”连接进行。
今天,您将利用这些特性,通过编程让您的 rsocket-client 响应来自服务器的请求。在服务器端代码中,您将监听客户端连接事件,当连接事件发生时,会将新客户端存储在已连接客户端列表中。您还将要求每个客户端在其连接处于活动状态期间,向服务器流式传输遥测消息。
首先,做一些整理工作。Spring Boot 和 RSocket Java 库最近都进行了更新。这些更新包括常规的错误修复、增强功能和弃用项,因此升级对我们有利。
在 Maven 的 pom.xml
文件的 <parent>
部分,将 spring-boot-starter-parent
的版本更改为 2.3.0.RELEASE
,如下所示。您需要这样做两次,因为您有两个单独的项目 — rsocket-client
和 rsocket-server
。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
要刷新您的代码,请在两个项目的根文件夹中运行以下命令
./mvnw clean compile
现在您可以继续编码任务了。
客户端需要能够响应来自服务器的消息。在 rsocket-client
项目的 RSocketShellClient.java
中,添加一个名为 ClientHandler
的新内部类,如下所示
@Slf4j
class ClientHandler {
@MessageMapping("client-status")
public Flux<String> statusUpdate(String status) {
log.info("Connection {}", status);
return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
}
}
这个类包含一个名为 statusUpdate()
的方法,该方法使用 @MessageMapping
注解修饰,就像 rsocket-server
项目中的注解一样。客户端使用这个类和这个方法来捕获和响应来自服务器的请求。响应本身是一个消息流。每隔 5 秒,客户端就会告诉服务器当前的空闲内存。可以将其视为客户端遥测数据。
为了使其工作,您必须将这个类“注册”到您的 RSocket 连接中。您将在下一节中进行此操作。
在客户端能够响应来自服务器的消息之前,它必须将 ClientHandler
注册到 RSocket 连接中。修订后的构造函数代码如下所示。请注意构造函数的方法签名发生了变化,添加了 RSocketStrategies
变量。Spring 会将此变量提供给您的构造函数。用下面列出的新代码替换您旧的构造函数代码。
public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
// (1)
String client = UUID.randomUUID().toString();
log.info("Connecting using client ID: {}", client);
// (2)
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
// (3)
this.rsocketRequester = rsocketRequesterBuilder
.setupRoute("shell-client")
.setupData(client)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
// (4)
this.rsocketRequester.rsocket()
.onClose()
.doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED"))
.subscribe();
}
在上面的代码中,您首先生成并存储一个唯一 ID,用于标识此客户端实例 (1)。接下来,使用 RSocket strategies
和一个新的 ClientHandler
实例创建一个新的 SocketAcceptor
(2)。然后使用 RSocketRequesterBuilder
注册新的 SocketAcceptor
(3)。最后,通过处理 RSocket 的 onClose()
事件来确保优雅地处理断开连接 (4)。
客户端代码就这些了。让我们继续服务器端的修改。
在 rsocket-server
项目中要做的第一件事是通过向 RSocketController.java
类添加一个类级别变量来创建 RSocketRequester
客户端集合,如下所示
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
接下来,通过添加一个新方法来添加一个连接处理器,如下所示
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
// The code for the method will go HERE
}
@ConnectMapping
注解允许您监听发生的客户端连接事件。使用此事件,您可以安排两项工作。第一项是将每个新客户端添加到 CLIENTS
列表中。第二项是调用每个客户端并启动其遥测流。
将以下代码添加到您刚刚创建的方法中
requester.rsocket()
.onClose() // (1)
.doFirst(() -> {
log.info("Client: {} CONNECTED.", client);
CLIENTS.add(requester); // (2)
})
.doOnError(error -> {
log.warn("Channel to client {} CLOSED", client); // (3)
})
.doFinally(consumer -> {
CLIENTS.remove(requester);
log.info("Client {} DISCONNECTED", client); // (4)
})
.subscribe();
首先要注意的是对 requester.rsocket().onClose()
方法的调用 (1)。此方法返回一个响应式的 Mono
对象,其中包含您需要的所有回调。
mono 的 doFirst()
方法在任何对 subscribe()
的调用之前被调用,但在 mono 初始创建之后。使用此方法将客户端的 requester
对象添加到 CLIENTS
列表 (2)。
这段代码可能感觉有点反直觉——在客户端连接时调用
onClose()
,然后使用返回的 mono 来存储新客户端的引用。有时,事件驱动的 API 可能会感觉有点奇怪。但可以将其视为此 RSocket 连接的 mono 向您发送了一个“我还活着”的事件。您正在使用该创建事件来触发在列表中存储每个客户端的引用。
当连接出现问题时,RSocket 会调用 mono 的 doOnError()
方法。这包括客户端选择关闭连接的情况。您可以使用提供的 error
变量来决定采取什么操作。在上面的代码中,错误简单地被记录为警告 (3)。
mono 的 doFinally()
方法在 RSocket 连接关闭时触发。此方法是在列表 CLIENTS
中移除客户端的理想位置 (4)。
最后,subscribe()
激活您已添加到 mono 的响应式代码,并发出信号表示您已准备好处理事件。
当每个客户端连接时,请求一个遥测读数流。为此,您需要在 connectShellClientAndAskForTelemetry()
方法中再次工作,向您之前添加的 client-status
消息处理器发送请求。代码如下所示
requester.route("client-status")
.data("OPEN")
.retrieveFlux(String.class)
.doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
.subscribe();
使用 requester
,定位路由 "client-status"
。发送字符串 "OPEN"
作为消息数据,并检索一个 String
类型的 Flux
。到达的每个字符串都包含客户端当前的空闲内存读数。将此读数记录到控制台。最后,不要忘记调用 subscribe()
来激活您的响应式代码。
是时候测试您的代码了。打开一个终端窗口,切换到 rsocket-server
目录,并按如下方式运行服务器
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
服务器在 localhost
的 7000
端口启动。
打开第二个终端窗口,切换到 rsocket-client
目录。然后,按如下方式构建并运行客户端
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
启动后,您会注意到客户端和服务器组件在控制台中都有新的日志条目。在 rsocket-client
窗口中,您会看到显示客户端唯一 ID(UUID 形式)和 "OPEN"
连接状态的日志条目,如下所示
Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>
至少等待 10 秒钟,然后在 shell:>
提示符下键入 exit
。rsocket-client 现在会断开与服务器的连接并关闭。
现在切换到 rsocket-server
窗口。日志条目看起来像这样
Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED
客户端连接时,会被添加到客户端列表中,控制台会打印其客户端 ID 和状态“CONNECTED”。然后,每隔 5 秒,日志会显示客户端当前的“空闲内存”读数。最后,当客户端断开连接时,其 RSocket 通道的状态变为“CLOSED”,客户端状态变为“DISCONNECTED”。
您可以通过在 rsocket-server
的终端窗口中按 Ctrl-C
来停止该进程。
调用客户端的能力非常强大。它适用于各种场景,包括移动设备、物联网或微服务。由于所有这些都可以通过 TCP 或 WebSockets 进行,您无需依赖消息代理等重量级解决方案,就已拥有所需的所有基础设施。
您在这里学习了很多内容。您学习了如何将服务器变成“请求者”,将客户端变成“响应者”。您了解了如何监听连接事件。您还学习了一点关于如何处理来自 rsocket 连接的错误和事件的知识。并且,尽管在此练习中您选择了“请求-流”作为您的服务器-客户端通信模式,但您可以根据您的需要使用四种 RSocket 交互模式中的任何一种。