抢先一步
VMware 提供培训和认证,以加速您的进步。
了解更多时间:大约 15 分钟。
在本系列之前的文章中,您尝试使用 Spring Boot 和 RSocket 进行 请求-响应 和 发布订阅 消息传递。这次,您将尝试 RSocket 的另一种新的消息传递模型——请求流。
在本练习中,您将学习如何使用传统的“客户端请求服务器流”方法来流式传输数据。
我之前没有提到的一件事是,RSocket 允许您在任一方向使用其消息传递模型。因此,如果您想使用不太常见的“服务器请求客户端流”模型,这对 RSocket 来说没有任何问题。此外,还有许多非 Java RSocket 实现 可供选择,包括 Go、Javascript 和 .Net——如果您的架构包含 Java 可能不是最佳选择的平台,则非常理想。
明确这一点后,请按照以下步骤将流数据功能添加到您之前文章中现有的 RSocket 客户端和服务器代码。
如果您没有阅读之前关于 服务器端、客户端 请求-响应消息传递或 发布订阅 的文章,现在是您的机会!代码示例位于 GitHub 上。
再次在您的服务器端RSocketController
类中,添加一个名为.stream()
的新方法,其签名为“接受一个对象,返回一个 Flux”,这是 RSocket 对此消息传递模型的期望。使用@MessageMapping
注解并指定合适的映射名称(例如"stream"
)来注解此新方法。该方法的示例代码如下所示
@MessageMapping("stream")
Flux<Message> stream(Message request) {
log.info("Received stream request: {}", request);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message(SERVER, STREAM, index))
.log();
}
RSocketController
位于rsocket-server
文件夹中的io.pivotal.rsocketserver
包中。
.stream()
方法的唯一参数Message
来自之前讨论过的io.pivotal.rsocketserver.data
包。此消息构成了客户端请求数据流的基础。上面的代码在接收到客户端请求后立即将其记录到控制台。
该方法返回的Flux
对象是Project Reactor的一部分,也用于Spring Framework的响应式支持。
RSocket 使用Flux
是因为它极大地简化了响应式数据流的处理。Flux 是数据的“发布者”。它描述了 0 到 N 个元素的流,并提供了许多用于处理流数据的运算符——类似于Java 8 的流式 API。
在上面的代码中,每秒都会向 Flux 添加一个新的Long
元素——通过.interval()
调用设置——基本上提供了一个恒定的数据流。.map()
函数使用Long
作为索引值创建一个新的消息对象,在最后一行,对.log()
方法的调用将流经 Flux 的所有元素打印到控制台,包括错误等。
在客户端项目的RSocketShellClient
类中,首先,添加对Disposable
对象的全局引用,如下所示
private static Disposable disposable;
RSocketShellClient
位于rsocket-client
文件夹中的io.pivotal.rsocketclient
包中。
此Disposable
对象允许您在数据流开始后控制它。
接下来,向您的RSocketShellClient
添加一个.stream()
方法。使用@ShellMethod
注解此方法。此方法的示例代码如下所示
@ShellMethod("Send one request. Many responses (stream) will be printed.")
public void stream() {
log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
this.disposable = this.rsocketRequester
.route("stream")
.data(new Message(CLIENT, STREAM))
.retrieveFlux(Message.class)
.subscribe(er -> log.info("Response received: {}", er));
}
在上面的代码中,通过将"stream"
指定为.route()
,rsocketRequester
被告知将请求路由到服务器的.stream()
方法。一个新的消息对象提供您的请求的.data()
。因为您希望服务器返回一个流,所以您在rsocketRequester
上使用了.requestFlux()
方法,指定返回的Flux
包含Message
类型的元素。最后,您设置了一个日志函数作为流的订阅者在.subscribe()
方法中。
请注意如何保留
rsocketRequester
生成的Disposable
。您需要它来停止流。
通过保留对流的引用,您可以根据需要处置它以停止流式传输。要将流取消功能添加到您的RSocketShellClient
,请添加一个名为.s()
的新方法,并使用@ShellMethod
进行如下注解
@ShellMethod("Stop streaming messages from the server.")
public void s(){
if(null != disposable){
disposable.dispose();
}
}
在方法内部,调用disposable.dispose()
会取消流。使用此方法后,要停止流,请在shell:>
提示符下键入s
,然后按Enter
键。然后流将停止。您的编码任务现在已完成。接下来,测试客户端和服务器是否一起工作。
打开一个终端窗口并移动到rsocket-server
目录。使用 Maven 和 Spring Boot 插件运行服务器,如下所示
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
服务器在localhost
端口7000
上启动。
打开**第二个**终端窗口并移动到rsocket-client
目录。从那里构建并运行 RSocket 客户端应用程序,如下所示
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
启动后,Spring Shell 会向您显示一个新的提示符
shell:>
您可以通过在提示符下键入stream
来请求服务器上的流。客户端发送一个Message
作为其对流的请求。流中的每个Message
都在服务器发送时以及客户端接收时都被打印出来。客户端上的控制台日志如下所示
shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)
要停止流,请在shell:>
提示符下键入s
,然后按Enter
键。
您可以通过在shell:>
提示符下键入exit
来停止rsocket-client
。
shell:>exit
您可以通过在其终端窗口中按Ctrl-C
来停止rsocket-server
进程。
RSocketShellClient
中的 .stream()
方法使用 RSocketRequester
向服务器发送单个请求消息。此请求启动从服务器到客户端的数据流。然后,客户端将其接收到的每条消息记录到控制台。
服务器端的 RSocketController
检查请求消息的元数据中的 route
。此消息的路由设置为 "stream"
,因此服务器将消息传递给相应的 .stream(Message request)
方法。然后,服务器每秒向客户端发送一系列消息,直到客户端请求其停止。
客户端可以随时停止流。在 .s()
方法中,这是通过对原始流订阅返回的 Disposable
调用 .dispose()
来完成的。
在这篇文章中,您学习了如何在 Spring Boot 中使用 RSocket 构建请求流功能。在下一篇文章中,我们将添加通道消息传递。下次再见!