RSocket入门:服务器调用客户端

工程 | Ben Wilcock | 2020年5月12日 | ...

阅读时间:约7分钟。 编码时间:约20分钟。

如果您一直在关注我关于 RSocket系列文章,您会多次听到我提及“客户端和服务器”。但是,对于 RSocket,客户端和服务器之间的界限是模糊的。使用 RSocket,服务器可以向客户端发送消息,并且客户端可以像服务器一样响应这些请求。

事实上,RSocket 文档不使用“客户端”或“服务器”这两个术语。文档使用的是“请求者”和“响应者”这两个术语。在 RSocket 中,任何组件都可以充当请求者,任何组件都可以充当响应者,甚至可以同时充当两者。在 RSocket 中,请求者和响应者之间所有这些双向通信都通过一个“双向”连接进行。

今天,您将利用这些特性,通过编程让您的 rsocket-client 响应来自服务器的请求。在服务器端代码中,您将监听客户端连接事件,当连接事件发生时,会将新客户端存储在已连接客户端列表中。您还将要求每个客户端在其连接处于活动状态期间,向服务器流式传输遥测消息。

如果您一直在关注这个系列文章,您可以按照下面的说明一起编码。代码也可以从 GitHub 下载。

步骤 1:更新 Spring Boot 和 RSocket

首先,做一些整理工作。Spring BootRSocket Java 库最近都进行了更新。这些更新包括常规的错误修复、增强功能和弃用项,因此升级对我们有利。

在 Maven 的 pom.xml 文件的 <parent> 部分,将 spring-boot-starter-parent 的版本更改为 2.3.0.RELEASE,如下所示。您需要这样做两次,因为您有两个单独的项目 — rsocket-clientrsocket-server

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

要刷新您的代码,请在两个项目的根文件夹中运行以下命令

./mvnw clean compile

现在您可以继续编码任务了。

步骤 2:向客户端添加一个请求-响应消息处理器

客户端需要能够响应来自服务器的消息。在 rsocket-client 项目的 RSocketShellClient.java 中,添加一个名为 ClientHandler 的新内部类,如下所示

@Slf4j
class ClientHandler {
 
 @MessageMapping("client-status")
 public Flux<String> statusUpdate(String status) {
   log.info("Connection {}", status);
   return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
 }
}

这个类包含一个名为 statusUpdate() 的方法,该方法使用 @MessageMapping 注解修饰,就像 rsocket-server 项目中的注解一样。客户端使用这个类和这个方法来捕获和响应来自服务器的请求。响应本身是一个消息流。每隔 5 秒,客户端就会告诉服务器当前的空闲内存。可以将其视为客户端遥测数据。

为了使其工作,您必须将这个类“注册”到您的 RSocket 连接中。您将在下一节中进行此操作。

步骤 3:在客户端的构造函数中注册 ClientHandler

在客户端能够响应来自服务器的消息之前,它必须将 ClientHandler 注册到 RSocket 连接中。修订后的构造函数代码如下所示。请注意构造函数的方法签名发生了变化,添加了 RSocketStrategies 变量。Spring 会将此变量提供给您的构造函数。用下面列出的新代码替换您旧的构造函数代码。

public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
 
 // (1)
 String client = UUID.randomUUID().toString();
 log.info("Connecting using client ID: {}", client);
  
 // (2)
 SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
  
 // (3) 
 this.rsocketRequester = rsocketRequesterBuilder
 .setupRoute("shell-client")
 .setupData(client)
 .rsocketStrategies(strategies)
 .rsocketConnector(connector -> connector.acceptor(responder))
 .connectTcp("localhost", 7000)
 .block();
  
 // (4)
 this.rsocketRequester.rsocket()
 .onClose()
 .doOnError(error -> log.warn("Connection CLOSED"))
 .doFinally(consumer -> log.info("Client DISCONNECTED"))
 .subscribe();
 }

在上面的代码中,您首先生成并存储一个唯一 ID,用于标识此客户端实例 (1)。接下来,使用 RSocket strategies 和一个新的 ClientHandler 实例创建一个新的 SocketAcceptor (2)。然后使用 RSocketRequesterBuilder 注册新的 SocketAcceptor (3)。最后,通过处理 RSocket 的 onClose() 事件来确保优雅地处理断开连接 (4)。

客户端代码就这些了。让我们继续服务器端的修改。

步骤 4:在服务器端记住客户端

rsocket-server 项目中要做的第一件事是通过向 RSocketController.java 类添加一个类级别变量来创建 RSocketRequester 客户端集合,如下所示

private final List<RSocketRequester> CLIENTS = new ArrayList<>();

接下来,通过添加一个新方法来添加一个连接处理器,如下所示

  @ConnectMapping("shell-client")
 void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
 // The code for the method will go HERE
 }

@ConnectMapping 注解允许您监听发生的客户端连接事件。使用此事件,您可以安排两项工作。第一项是将每个新客户端添加到 CLIENTS 列表中。第二项是调用每个客户端并启动其遥测流。

将以下代码添加到您刚刚创建的方法中

requester.rsocket()
        .onClose() // (1)
        .doFirst(() -> { 
            log.info("Client: {} CONNECTED.", client);
            CLIENTS.add(requester); // (2)
        })
        .doOnError(error -> { 
            log.warn("Channel to client {} CLOSED", client); // (3)
        })
        .doFinally(consumer -> { 
            CLIENTS.remove(requester);
            log.info("Client {} DISCONNECTED", client); // (4)
        })
        .subscribe();

首先要注意的是对 requester.rsocket().onClose() 方法的调用 (1)。此方法返回一个响应式的 Mono 对象,其中包含您需要的所有回调。

mono 的 doFirst() 方法在任何对 subscribe() 的调用之前被调用,但在 mono 初始创建之后。使用此方法将客户端的 requester 对象添加到 CLIENTS 列表 (2)。

这段代码可能感觉有点反直觉——在客户端连接时调用 onClose(),然后使用返回的 mono 来存储新客户端的引用。有时,事件驱动的 API 可能会感觉有点奇怪。但可以将其视为此 RSocket 连接的 mono 向您发送了一个“我还活着”的事件。您正在使用该创建事件来触发在列表中存储每个客户端的引用。

当连接出现问题时,RSocket 会调用 mono 的 doOnError() 方法。这包括客户端选择关闭连接的情况。您可以使用提供的 error 变量来决定采取什么操作。在上面的代码中,错误简单地被记录为警告 (3)。

mono 的 doFinally() 方法在 RSocket 连接关闭时触发。此方法是在列表 CLIENTS 中移除客户端的理想位置 (4)。

最后,subscribe() 激活您已添加到 mono 的响应式代码,并发出信号表示您已准备好处理事件。

步骤 5:从客户端获取空闲内存读数

当每个客户端连接时,请求一个遥测读数流。为此,您需要在 connectShellClientAndAskForTelemetry() 方法中再次工作,向您之前添加的 client-status 消息处理器发送请求。代码如下所示

requester.route("client-status")
        .data("OPEN")
        .retrieveFlux(String.class)
        .doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
        .subscribe();

使用 requester,定位路由 "client-status"。发送字符串 "OPEN" 作为消息数据,并检索一个 String 类型的 Flux。到达的每个字符串都包含客户端当前的空闲内存读数。将此读数记录到控制台。最后,不要忘记调用 subscribe() 来激活您的响应式代码。

步骤 6:构建并运行 RSocket 服务器

是时候测试您的代码了。打开一个终端窗口,切换到 rsocket-server 目录,并按如下方式运行服务器

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

服务器在 localhost7000 端口启动。

步骤 7:构建并运行 RSocket 客户端

打开第二个终端窗口,切换到 rsocket-client 目录。然后,按如下方式构建并运行客户端

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

工作原理

启动后,您会注意到客户端和服务器组件在控制台中都有新的日志条目。在 rsocket-client 窗口中,您会看到显示客户端唯一 ID(UUID 形式)和 "OPEN" 连接状态的日志条目,如下所示

Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>

至少等待 10 秒钟,然后在 shell:> 提示符下键入 exit。rsocket-client 现在会断开与服务器的连接并关闭。

现在切换到 rsocket-server 窗口。日志条目看起来像这样

Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED

客户端连接时,会被添加到客户端列表中,控制台会打印其客户端 ID 和状态“CONNECTED”。然后,每隔 5 秒,日志会显示客户端当前的“空闲内存”读数。最后,当客户端断开连接时,其 RSocket 通道的状态变为“CLOSED”,客户端状态变为“DISCONNECTED”。

您可以通过在 rsocket-server 的终端窗口中按 Ctrl-C 来停止该进程。

总结

调用客户端的能力非常强大。它适用于各种场景,包括移动设备、物联网或微服务。由于所有这些都可以通过 TCP 或 WebSockets 进行,您无需依赖消息代理等重量级解决方案,就已拥有所需的所有基础设施。

您在这里学习了很多内容。您学习了如何将服务器变成“请求者”,将客户端变成“响应者”。您了解了如何监听连接事件。您还学习了一点关于如何处理来自 rsocket 连接的错误和事件的知识。并且,尽管在此练习中您选择了“请求-流”作为您的服务器-客户端通信模式,但您可以根据您的需要使用四种 RSocket 交互模式中的任何一种。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助力您的进步。

了解更多

获得支持

Tanzu Spring 通过一个简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部