RSocket入门:服务器调用客户端

工程 | Ben Wilcock | 2020年5月12日 | ...

阅读时间:约7分钟。 编码时间:约20分钟。

如果您一直在关注我关于RSocket系列文章,您已经多次听到我提到“客户端和服务器”。但是,在RSocket中,客户端和服务器之间的界限是模糊的。使用RSocket,服务器可以向客户端发送消息,客户端可以像服务器一样响应这些请求。

事实上,RSocket文档并没有使用“客户端”或“服务器”这两个术语。文档改用“请求者”和“响应者”这两个术语。在RSocket中,任何组件都可以充当请求者,任何组件都可以充当响应者,甚至可以同时充当两者。在RSocket中,请求者和响应者之间所有这些来回通信都通过单个“双向”连接进行。

今天,您将通过编程您的rsocket-client来响应来自服务器的请求,来利用这些特性。在服务器端代码中,您将监听客户端连接事件,当连接事件发生时,您将把新的客户端存储在已连接客户端列表中。您还将要求每个客户端在连接存活期间向服务器回传遥测消息流。

如果您一直在关注该系列文章,您可以按照以下说明进行编码。代码也可以从GitHub下载。

步骤1:更新Spring Boot和RSocket

首先,一些准备工作。Spring BootRSocket Java库最近都进行了更新。这些更新包括通常的错误修复、增强和弃用,因此升级符合我们的利益。

在Maven pom.xml<parent>部分中,将spring-boot-starter-parent更改为版本2.3.0.RELEASE,如下所示。您需要执行此操作两次,因为您有两个单独的项目——rsocket-clientrsocket-server

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

要刷新您的代码,请在两个项目的根文件夹中运行以下命令:

./mvnw clean compile

现在您可以继续进行编码任务。

步骤2:向客户端添加请求-响应消息处理器

客户端需要能够响应来自服务器的消息。在rsocket-client项目的RSocketShellClient.java中,添加一个名为ClientHandler的新内部类,如下所示:

@Slf4j
class ClientHandler {
 
 @MessageMapping("client-status")
 public Flux<String> statusUpdate(String status) {
   log.info("Connection {}", status);
   return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
 }
}

此类包含一个名为statusUpdate()的方法,该方法使用与rsocket-server项目中的方法类似的@MessageMapping注解进行修饰。客户端使用此类和此方法来捕获和响应来自服务器的请求。响应本身是消息流。每5秒,客户端都会告诉服务器其当前的可用内存。您可以将其视为客户端遥测数据。

要使此方法有效,您必须将此类“注册”到您的RSocket连接中。您将在下一节中执行此操作。

步骤3:在客户端的构造函数中注册ClientHandler

在客户端能够响应来自服务器的消息之前,它必须使用RSocket连接注册ClientHandler。修改后的构造函数代码如下所示。请注意构造函数方法签名中的更改,以便添加RSocketStrategies变量。Spring会将此变量提供给您的构造函数。使用下面列出的新代码替换旧的构造函数代码。

public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
 
 // (1)
 String client = UUID.randomUUID().toString();
 log.info("Connecting using client ID: {}", client);
  
 // (2)
 SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
  
 // (3) 
 this.rsocketRequester = rsocketRequesterBuilder
 .setupRoute("shell-client")
 .setupData(client)
 .rsocketStrategies(strategies)
 .rsocketConnector(connector -> connector.acceptor(responder))
 .connectTcp("localhost", 7000)
 .block();
  
 // (4)
 this.rsocketRequester.rsocket()
 .onClose()
 .doOnError(error -> log.warn("Connection CLOSED"))
 .doFinally(consumer -> log.info("Client DISCONNECTED"))
 .subscribe();
 }

在上面的代码中,您首先生成并存储一个唯一ID来标识此客户端实例(1)。接下来,使用RSocket strategies和一个新的ClientHandler实例创建一个新的SocketAcceptor(2)。然后使用RSocketRequesterBuilder注册新的SocketAcceptor(3)。最后,确保通过处理RSocket onClose()事件来优雅地处理断开连接(4)。

客户端代码就是这样。让我们继续进行服务器端的更改。

步骤4:记住服务器上的客户端

rsocket-server项目中,首先要做的是通过向RSocketController.java类添加一个类级变量来创建一个RSocketRequester客户端集合,如下所示:

private final List<RSocketRequester> CLIENTS = new ArrayList<>();

接下来,通过添加一个新的方法来添加一个连接处理器,如下所示:

  @ConnectMapping("shell-client")
 void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
 // The code for the method will go HERE
 }

@ConnectMapping注解允许您在客户端连接事件发生时监听它们。使用此事件,您可以安排两项工作。第一项是将每个新客户端添加到CLIENTS列表中。第二项是调用每个客户端并启动它们的遥测流。

将以下代码添加到您刚刚创建的方法中:

requester.rsocket()
        .onClose() // (1)
        .doFirst(() -> { 
            log.info("Client: {} CONNECTED.", client);
            CLIENTS.add(requester); // (2)
        })
        .doOnError(error -> { 
            log.warn("Channel to client {} CLOSED", client); // (3)
        })
        .doFinally(consumer -> { 
            CLIENTS.remove(requester);
            log.info("Client {} DISCONNECTED", client); // (4)
        })
        .subscribe();

首先要注意的是对requester.rsocket().onClose()方法的调用(1)。此方法返回一个反应式Mono对象,其中包含您需要的所有回调。

mono的doFirst()方法在任何对subscribe()的调用之前调用,但在mono的初始创建之后调用。使用此方法将客户端的requester对象添加到CLIENTS列表中(2)。

这段代码可能感觉有点违反直觉——在客户端连接时调用onClose(),然后使用生成的mono来存储对新客户端的引用。有时,事件驱动的API可能会感觉有点奇怪。但是,将其视为此RSocket连接的mono发送给您的“我还活着”事件。您正在使用该创建事件来触发在列表中存储每个客户端引用的操作。

每当连接出现问题时,RSocket都会调用mono的doOnError()方法。这包括客户端选择关闭连接的情况。您可以使用提供的error变量来决定采取什么操作。在上面的代码中,错误只是作为警告记录下来(3)。

当RSocket连接关闭时,会触发mono的doFinally()方法。此方法是在运行从CLIENTS列表中删除客户端的代码的理想位置(4)。

最后,subscribe()激活您添加到mono中的反应式代码,并发出您已准备好处理事件的信号。

步骤5:请求客户端的可用内存读数

当每个客户端连接时,请求一个遥测读数流。为此,再次在connectShellClientAndAskForTelemetry()方法中工作,您需要向前面添加的client-status消息处理器发送请求。此代码如下所示:

requester.route("client-status")
        .data("OPEN")
        .retrieveFlux(String.class)
        .doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
        .subscribe();

使用requester,目标路由为"client-status"。发送字符串"OPEN"作为消息数据,并检索类型为StringFlux。到达的每个字符串都包含客户端当前的可用内存读数。将此读数记录到控制台。最后,不要忘记subscribe()以激活您的反应式代码。

步骤 6:构建并运行 RSocket 服务器

现在是测试代码的时候了。打开一个终端窗口,移动到rsocket-server目录,并按如下方式运行服务器:

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

服务器在localhost端口7000启动。

步骤 7:构建并运行 RSocket 客户端

打开一个第二个终端窗口,并移动到rsocket-client目录。在此目录下,构建并运行客户端,如下所示:

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

工作原理

启动后,您会注意到客户端和服务器组件在控制台中都有新的日志条目。在rsocket-client窗口中,您将看到显示客户端唯一 ID(UUID 格式)和“OPEN”连接状态的日志条目,如下所示:

Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>

等待至少 10 秒,然后在shell:>提示符下键入exit。rsocket-client 现在与服务器断开连接并关闭。

现在切换到rsocket-server窗口。日志条目如下所示:

Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED

当客户端连接时,它会被添加到客户端列表中,控制台会打印其客户端 ID 和状态“CONNECTED”。然后,每 5 秒,日志会显示客户端当前的“可用内存”读数。最后,当客户端断开连接时,其 RSocket 通道的状态变为“CLOSED”,客户端变为“DISCONNECTED”。

您可以通过在其终端窗口中按下Ctrl-C来停止rsocket-server进程。

最终想法

能够调用客户端非常强大。它非常适合各种场景,包括移动设备、物联网或微服务。并且由于所有这些都可以通过 TCP 或 WebSockets 发生,因此您已经拥有所需的所有基础设施,无需求助于消息代理之类的重量级解决方案。

您在这里涵盖了很多内容。您学习了如何将服务器转换为“请求者”,将客户端转换为“响应者”。您了解了如何监听连接事件。您还了解了如何处理来自 rsocket 连接的错误和事件。并且,尽管在此练习中,您选择“request-stream”作为服务器-客户端通信,但您可以根据需要使用四种 RSocket 交互模式中的任何一种。

获取Spring新闻

关注Spring新闻

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部