领先一步
VMware 提供培训和认证,助力您的进步。
了解更多阅读时间:约 15 分钟。
在本系列之前的文章中,您尝试了使用 Spring Boot 和 RSocket 进行请求-响应和即发即忘的消息模式。本次,您将尝试 RSocket 的另一种全新的消息模型——request-stream (请求-流)。
在此练习中,您将学习如何使用传统的 'client-requests-a-server-stream'(客户端请求服务器流)方式进行数据流传输。
到目前为止我还没有提到的一点是,RSocket 允许您在两个方向上使用其消息模型。因此,如果您想使用不太常见的 'server-requests-a-client-stream'(服务器请求客户端流)模型,RSocket 也没有问题。此外,还有很多非 Java 的RSocket 实现可供选择,包括 Go、Javascript 和 .Net——如果您的架构包含 Java 可能不是最佳选择的平台,这些实现将是理想之选。
澄清这一点后,请按照以下步骤,将流数据功能添加到您之前文章中现有的 RSocket 客户端和服务器代码中。
如果您还没有阅读之前关于服务器端、客户端请求-响应消息传递或即发即忘的文章,现在是您的机会!代码示例在GitHub 上。
回到您的服务器端 RSocketController
类,添加一个名为 .stream()
的新方法,其签名符合 RSocket 对此消息模型的期望 — '接受一个对象,返回一个 flux'。使用 @MessageMapping
注解标记此新方法,并指定一个合适的映射名称 — 例如 "stream"
。方法的示例代码如下
@MessageMapping("stream")
Flux<Message> stream(Message request) {
log.info("Received stream request: {}", request);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message(SERVER, STREAM, index))
.log();
}
RSocketController
位于rsocket-server
文件夹中的io.pivotal.rsocketserver
包内。
.stream()
方法的唯一参数 Message
来自之前讨论过的 io.pivotal.rsocketserver.data
包。此消息构成了客户端请求数据流的基础。上面的代码会在收到客户端请求时立即将其记录到控制台。
该方法返回的 Flux
对象是Project Reactor的一部分,并且也在 Spring Framework 的响应式支持中得到使用。
RSocket 使用 Flux
,因为它极大地简化了响应式数据流的处理。Flux
是数据的一个“Publisher”(发布者)。它描述了 0 到 N 个元素的流,并提供了大量用于处理流数据的操作符 — 类似于Java 8 的 streaming APIs。
在上面的代码中,每秒会有一个新的 Long
元素添加到 Flux 中 — 通过调用 .interval()
设置 — 从而实质上提供了一个持续的数据流。.map()
函数使用 Long
值作为索引创建了一个新的消息对象,而在最后一行,调用 .log()
方法会将流经 Flux 的所有元素(包括错误等)打印到控制台。
在客户端项目的 RSocketShellClient
类中,首先添加一个 Disposable
对象的全局引用,如下所示
private static Disposable disposable;
RSocketShellClient
位于rsocket-client
文件夹中的io.pivotal.rsocketclient
包内。
一旦数据流开始,此 Disposable
对象允许您控制它。
接下来,向您的 RSocketShellClient
添加一个 .stream()
方法。使用 @ShellMethod
注解标记此方法。示例代码如下所示
@ShellMethod("Send one request. Many responses (stream) will be printed.")
public void stream() {
log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
this.disposable = this.rsocketRequester
.route("stream")
.data(new Message(CLIENT, STREAM))
.retrieveFlux(Message.class)
.subscribe(er -> log.info("Response received: {}", er));
}
在上面的代码中,通过指定 "stream"
作为 .route()
,rsocketRequester
被告知将请求路由到服务器的 .stream()
方法。一个新的消息对象提供了请求的 .data()
。因为您希望服务器返回一个流,所以您使用了 rsocketRequester
上的 .requestFlux()
方法,并指定返回的 Flux
包含类型为 Message
的元素。最后,您在 .subscribe()
方法中设置了一个日志函数作为流的订阅者。
注意
rsocketRequester
生成的Disposable
是如何被保存的。您将需要它来停止流。
通过保留对流的引用,您可以在想要停止流时将其处理掉(dispose)。要将流取消功能添加到您的 RSocketShellClient
中,请添加一个名为 .s()
的新方法,并使用 @ShellMethod
注解标记它,如下所示
@ShellMethod("Stop streaming messages from the server.")
public void s(){
if(null != disposable){
disposable.dispose();
}
}
在该方法内部,调用 disposable.dispose()
会取消流。有了此方法后,要停止流,请在 shell:>
提示符下输入 s
,然后按 Enter
键。流就会停止。您的编码任务现已完成。接下来,测试客户端和服务器是否正常协同工作。
打开终端窗口并进入 rsocket-server
目录。使用 Maven 和 Spring Boot 插件运行服务器,如下所示
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
服务器将在 localhost
的 7000
端口启动。
打开第二个终端窗口并进入 rsocket-client
目录。然后,如下构建并运行 RSocket 客户端应用程序
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
启动后,Spring Shell 会显示一个新的提示符
shell:>
您可以在提示符下输入 stream
来请求服务器的流。客户端发送一个 Message
作为其对流的请求。流中的每个 Message
在服务器发送时和客户端接收时都会被打印出来。客户端的控制台日志看起来像这样
shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)
要停止流,请在 shell:>
提示符下输入 s
,然后按 Enter
键。
您可以通过在 shell:>
提示符下输入 exit
来停止 rsocket-client
,如下所示。
shell:>exit
您可以通过在其终端窗口中按下 Ctrl-C
来停止 rsocket-server
进程。
RSocketShellClient
中的 .stream()
方法使用 RSocketRequester
向服务器发送单个请求消息。此请求会启动一个从服务器到客户端的数据流。然后客户端会将收到的每条消息记录到控制台。
服务器端的 RSocketController
检查请求消息的元数据以查找 route
。此消息的路由设置为 "stream"
,因此服务器将消息传递给相应的 .stream(Message request)
方法。然后服务器开始每秒向客户端发送一条消息流,直到客户端要求其停止。
客户端可以随时停止流。在 .s()
方法中,这是通过调用原始流订阅返回的 Disposable
对象的 .dispose()
方法来实现的。
在本文中,您学习了如何在 Spring Boot 中使用 RSocket 构建 request-stream (请求-流) 功能。在下一篇文章中,我们将添加 channel messaging (通道消息传递) 功能。下次见!