RSocket 入门:Spring Boot Request-Stream

工程 | Ben Wilcock | 2020 年 3 月 23 日 | ...

阅读时间:约 15 分钟。

在本系列之前的文章中,您尝试了使用 Spring Boot 和 RSocket 进行请求-响应即发即忘的消息模式。本次,您将尝试 RSocket 的另一种全新的消息模型——request-stream (请求-流)。

在此练习中,您将学习如何使用传统的 'client-requests-a-server-stream'(客户端请求服务器流)方式进行数据流传输。

到目前为止我还没有提到的一点是,RSocket 允许您在两个方向上使用其消息模型。因此,如果您想使用不太常见的 'server-requests-a-client-stream'(服务器请求客户端流)模型,RSocket 也没有问题。此外,还有很多非 Java 的RSocket 实现可供选择,包括 Go、Javascript 和 .Net——如果您的架构包含 Java 可能不是最佳选择的平台,这些实现将是理想之选。

澄清这一点后,请按照以下步骤,将流数据功能添加到您之前文章中现有的 RSocket 客户端和服务器代码中。

如果您还没有阅读之前关于服务器端客户端请求-响应消息传递或即发即忘的文章,现在是您的机会!代码示例在GitHub 上

步骤 1:添加服务器端流方法

回到您的服务器端 RSocketController 类,添加一个名为 .stream() 的新方法,其签名符合 RSocket 对此消息模型的期望 — '接受一个对象,返回一个 flux'。使用 @MessageMapping 注解标记此新方法,并指定一个合适的映射名称 — 例如 "stream"。方法的示例代码如下

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("Received stream request: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(SERVER, STREAM, index))
                .log();
    }

RSocketController 位于 rsocket-server 文件夹中的 io.pivotal.rsocketserver 包内。

.stream() 方法的唯一参数 Message 来自之前讨论过的 io.pivotal.rsocketserver.data 包。此消息构成了客户端请求数据流的基础。上面的代码会在收到客户端请求时立即将其记录到控制台。

该方法返回的 Flux 对象是Project Reactor的一部分,并且也在 Spring Framework 的响应式支持中得到使用。

RSocket 使用 Flux,因为它极大地简化了响应式数据流的处理。Flux 是数据的一个“Publisher”(发布者)。它描述了 0 到 N 个元素的流,并提供了大量用于处理流数据的操作符 — 类似于Java 8 的 streaming APIs

在上面的代码中,每秒会有一个新的 Long 元素添加到 Flux 中 — 通过调用 .interval() 设置 — 从而实质上提供了一个持续的数据流。.map() 函数使用 Long 值作为索引创建了一个新的消息对象,而在最后一行,调用 .log() 方法会将流经 Flux 的所有元素(包括错误等)打印到控制台。

步骤 2:添加客户端流方法

在客户端项目的 RSocketShellClient 类中,首先添加一个 Disposable 对象的全局引用,如下所示

private static Disposable disposable;

RSocketShellClient 位于 rsocket-client 文件夹中的 io.pivotal.rsocketclient 包内。

一旦数据流开始,此 Disposable 对象允许您控制它。

接下来,向您的 RSocketShellClient 添加一个 .stream() 方法。使用 @ShellMethod 注解标记此方法。示例代码如下所示

    @ShellMethod("Send one request. Many responses (stream) will be printed.")
    public void stream() {
        log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
        this.disposable = this.rsocketRequester
                .route("stream")
                .data(new Message(CLIENT, STREAM))
                .retrieveFlux(Message.class)
                .subscribe(er -> log.info("Response received: {}", er));
    }

在上面的代码中,通过指定 "stream" 作为 .route()rsocketRequester 被告知将请求路由到服务器的 .stream() 方法。一个新的消息对象提供了请求的 .data()。因为您希望服务器返回一个流,所以您使用了 rsocketRequester 上的 .requestFlux() 方法,并指定返回的 Flux 包含类型为 Message 的元素。最后,您在 .subscribe() 方法中设置了一个日志函数作为流的订阅者。

注意 rsocketRequester 生成的 Disposable 是如何被保存的。您将需要它来停止流。

步骤 3:添加客户端停止流方法

通过保留对流的引用,您可以在想要停止流时将其处理掉(dispose)。要将流取消功能添加到您的 RSocketShellClient 中,请添加一个名为 .s() 的新方法,并使用 @ShellMethod 注解标记它,如下所示

    @ShellMethod("Stop streaming messages from the server.")
    public void s(){
        if(null != disposable){
            disposable.dispose();
        }
    }

在该方法内部,调用 disposable.dispose() 会取消流。有了此方法后,要停止流,请在 shell:> 提示符下输入 s,然后按 Enter 键。流就会停止。您的编码任务现已完成。接下来,测试客户端和服务器是否正常协同工作。

步骤 4:构建并运行 RSocket 服务器

打开终端窗口并进入 rsocket-server 目录。使用 Maven 和 Spring Boot 插件运行服务器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

服务器将在 localhost7000 端口启动。

步骤 5:构建并运行 RSocket 客户端

打开第二个终端窗口并进入 rsocket-client 目录。然后,如下构建并运行 RSocket 客户端应用程序

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

启动后,Spring Shell 会显示一个新的提示符

shell:>

您可以在提示符下输入 stream 来请求服务器的流。客户端发送一个 Message 作为其对流的请求。流中的每个 Message 在服务器发送时和客户端接收时都会被打印出来。客户端的控制台日志看起来像这样

shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)

要停止流,请在 shell:> 提示符下输入 s,然后按 Enter 键。

步骤 5:清理

您可以通过在 shell:> 提示符下输入 exit 来停止 rsocket-client,如下所示。

shell:>exit

您可以通过在其终端窗口中按下 Ctrl-C 来停止 rsocket-server 进程。

工作原理

RSocketShellClient 中的 .stream() 方法使用 RSocketRequester 向服务器发送单个请求消息。此请求会启动一个从服务器到客户端的数据流。然后客户端会将收到的每条消息记录到控制台。

服务器端的 RSocketController 检查请求消息的元数据以查找 route。此消息的路由设置为 "stream",因此服务器将消息传递给相应的 .stream(Message request) 方法。然后服务器开始每秒向客户端发送一条消息流,直到客户端要求其停止。

客户端可以随时停止流。在 .s() 方法中,这是通过调用原始流订阅返回的 Disposable 对象的 .dispose() 方法来实现的。

总结

在本文中,您学习了如何在 Spring Boot 中使用 RSocket 构建 request-stream (请求-流) 功能。在下一篇文章中,我们将添加 channel messaging (通道消息传递) 功能。下次见!

订阅 Spring 新闻通讯

订阅 Spring 新闻通讯,保持连接

订阅

领先一步

VMware 提供培训和认证,助力您的进步。

了解更多

获得支持

Tanzu Spring 通过一份简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

近期活动

查看 Spring 社区中的所有近期活动。

查看全部