RSocket 入门:Spring Boot 请求流

工程 | Ben Wilcock | 2020年3月23日 | ...

时间:大约 15 分钟。

在本系列之前的文章中,您尝试使用 Spring Boot 和 RSocket 进行 请求-响应发布订阅 消息传递。这次,您将尝试 RSocket 的另一种新的消息传递模型——请求流。

在本练习中,您将学习如何使用传统的“客户端请求服务器流”方法来流式传输数据。

我之前没有提到的一件事是,RSocket 允许您在任一方向使用其消息传递模型。因此,如果您想使用不太常见的“服务器请求客户端流”模型,这对 RSocket 来说没有任何问题。此外,还有许多非 Java RSocket 实现 可供选择,包括 Go、Javascript 和 .Net——如果您的架构包含 Java 可能不是最佳选择的平台,则非常理想。

明确这一点后,请按照以下步骤将流数据功能添加到您之前文章中现有的 RSocket 客户端和服务器代码。

如果您没有阅读之前关于 服务器端客户端 请求-响应消息传递或 发布订阅 的文章,现在是您的机会!代码示例位于 GitHub 上。

步骤 1:添加服务器端流方法

再次在您的服务器端RSocketController类中,添加一个名为.stream()的新方法,其签名为“接受一个对象,返回一个 Flux”,这是 RSocket 对此消息传递模型的期望。使用@MessageMapping注解并指定合适的映射名称(例如"stream")来注解此新方法。该方法的示例代码如下所示

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("Received stream request: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(SERVER, STREAM, index))
                .log();
    }

RSocketController位于rsocket-server文件夹中的io.pivotal.rsocketserver包中。

.stream()方法的唯一参数Message来自之前讨论过的io.pivotal.rsocketserver.data包。此消息构成了客户端请求数据流的基础。上面的代码在接收到客户端请求后立即将其记录到控制台。

该方法返回的Flux对象是Project Reactor的一部分,也用于Spring Framework的响应式支持

RSocket 使用Flux是因为它极大地简化了响应式数据流的处理。Flux 是数据的“发布者”。它描述了 0 到 N 个元素的流,并提供了许多用于处理流数据的运算符——类似于Java 8 的流式 API

在上面的代码中,每秒都会向 Flux 添加一个新的Long元素——通过.interval()调用设置——基本上提供了一个恒定的数据流。.map()函数使用Long作为索引值创建一个新的消息对象,在最后一行,对.log()方法的调用将流经 Flux 的所有元素打印到控制台,包括错误等。

步骤 2:添加客户端流方法

在客户端项目的RSocketShellClient类中,首先,添加对Disposable对象的全局引用,如下所示

private static Disposable disposable;

RSocketShellClient位于rsocket-client文件夹中的io.pivotal.rsocketclient包中。

Disposable对象允许您在数据流开始后控制它。

接下来,向您的RSocketShellClient添加一个.stream()方法。使用@ShellMethod注解此方法。此方法的示例代码如下所示

    @ShellMethod("Send one request. Many responses (stream) will be printed.")
    public void stream() {
        log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
        this.disposable = this.rsocketRequester
                .route("stream")
                .data(new Message(CLIENT, STREAM))
                .retrieveFlux(Message.class)
                .subscribe(er -> log.info("Response received: {}", er));
    }

在上面的代码中,通过将"stream"指定为.route()rsocketRequester被告知将请求路由到服务器的.stream()方法。一个新的消息对象提供您的请求的.data()。因为您希望服务器返回一个流,所以您在rsocketRequester上使用了.requestFlux()方法,指定返回的Flux包含Message类型的元素。最后,您设置了一个日志函数作为流的订阅者在.subscribe()方法中。

请注意如何保留rsocketRequester生成的Disposable。您需要它来停止流。

步骤 3:添加客户端停止流方法

通过保留对流的引用,您可以根据需要处置它以停止流式传输。要将流取消功能添加到您的RSocketShellClient,请添加一个名为.s()的新方法,并使用@ShellMethod进行如下注解

    @ShellMethod("Stop streaming messages from the server.")
    public void s(){
        if(null != disposable){
            disposable.dispose();
        }
    }

在方法内部,调用disposable.dispose()会取消流。使用此方法后,要停止流,请在shell:>提示符下键入s,然后按Enter键。然后流将停止。您的编码任务现在已完成。接下来,测试客户端和服务器是否一起工作。

步骤 4:构建并运行 RSocket 服务器

打开一个终端窗口并移动到rsocket-server目录。使用 Maven 和 Spring Boot 插件运行服务器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

服务器在localhost端口7000上启动。

步骤 5:构建并运行 RSocket 客户端

打开**第二个**终端窗口并移动到rsocket-client目录。从那里构建并运行 RSocket 客户端应用程序,如下所示

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

启动后,Spring Shell 会向您显示一个新的提示符

shell:>

您可以通过在提示符下键入stream来请求服务器上的流。客户端发送一个Message作为其对流的请求。流中的每个Message都在服务器发送时以及客户端接收时都被打印出来。客户端上的控制台日志如下所示

shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)

要停止流,请在shell:>提示符下键入s,然后按Enter键。

步骤 5:清理

您可以通过在shell:>提示符下键入exit来停止rsocket-client

shell:>exit

您可以通过在其终端窗口中按Ctrl-C来停止rsocket-server进程。

工作原理

RSocketShellClient 中的 .stream() 方法使用 RSocketRequester 向服务器发送单个请求消息。此请求启动从服务器到客户端的数据流。然后,客户端将其接收到的每条消息记录到控制台。

服务器端的 RSocketController 检查请求消息的元数据中的 route。此消息的路由设置为 "stream",因此服务器将消息传递给相应的 .stream(Message request) 方法。然后,服务器每秒向客户端发送一系列消息,直到客户端请求其停止。

客户端可以随时停止流。在 .s() 方法中,这是通过对原始流订阅返回的 Disposable 调用 .dispose() 来完成的。

最终想法

在这篇文章中,您学习了如何在 Spring Boot 中使用 RSocket 构建请求流功能。在下一篇文章中,我们将添加通道消息传递。下次再见!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

抢先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部