RSocket 入门:Spring Boot 通道

工程 | Ben Wilcock | 2020年4月6日 | ...

阅读时间:约6分钟。实践时间:约20分钟。

如果你和我一样,刚刚开始你的 RSocket 之旅,不妨了解一下 RSocket 协议背后的动机。这份简短但富有洞察力的文档中有一句话让我深有感触——“抽象不匹配会增加系统开发成本”。

从软件设计的角度来看,RSocket 的四种交互模型提供了显著优势。这意味着我们可以针对每个用例,使用正确的交互模型来建模组件间的通信。这种更高效的模型可以为你节省大量的编码时间和精力!

到目前为止,在本系列中,我们已经探讨了请求-响应即发即忘以及请求-流消息传递。今天你将为客户端和服务器代码添加通道

什么是通道?

通道是双向管道,允许数据流在两个方向流动。使用通道,客户端到服务器的数据流可以与服务器到客户端的数据流共存。通道在现实世界中有许多用途。通道可以承载视频会议流、发送和接收双向聊天消息、使用增量和差异同步数据,或者提供报告、观察和监控系统的方式。

RSocket 中的通道并不比流或请求-响应复杂。话虽如此,下面将要实现的场景比你之前尝试的要稍微复杂一些,所以在开始之前最好先理解它。

在接下来的练习中,服务器将消息流式传输到客户端。客户端控制服务器流中消息的频率。它通过使用“延迟”设置流来实现这一点。客户端流中的设置告诉服务器发送每条消息之间应该暂停多长时间。可以将其视为一个消息频率调节盘。频率设置高时,暂停时间短,你会看到很多服务器发送的消息。频率设置低时,暂停时间长,你会看到较少的服务器发送的消息。考虑到这个结果,我们开始编码吧。

完整的代码示例可在 GitHub 上获取。RSocketController 位于 rsocket-server 文件夹的 io.pivotal.rsocketserver 包中。RSocketShellClient 位于 rsocket-client 文件夹的 io.pivotal.rsocketclient 包中。

步骤 1:将 Channel 方法添加到 RSocketController

在服务器端的 RSocketController 类中,添加一个名为 channel() 的方法,该方法接受一个 Flux<Duration> 并返回一个 Flux<Message>。这种方法签名——Flux 输入,Flux 输出——将其标识为 RSocket 通道方法。使用值 "channel" 为该方法添加 @MessageMapping() 注解。此方法的代码如下。

    @MessageMapping("channel")
    Flux<Message> channel(final Flux<Duration> settings) {
        return settings
                    .doOnNext(setting -> log.info("\nFrequency setting is {} second(s).\n", setting.getSeconds()))
                    .switchMap(setting -> Flux.interval(setting)
                                                   .map(index -> new Message(SERVER, CHANNEL, index)))
                                                   .log();
    }

在代码中,.doOnNext() 监听来自客户端的设置流。每当新的 delay 设置到达时,它会将消息写入日志。.switchMap() 为每个新设置创建一个新的 Flux。这个新的 Flux 基于 delay 设置中包含的 .interval() 延迟发出新的 Message 对象。服务器将这些新消息通过流发送回客户端。

步骤 2:将 Channel 方法添加到 RSocketShellClient

在客户端的 RSocketShellClient 类中,添加一个新的 channel() 方法,并使用 @ShellMethod() 注解对其进行标注。将方法用途的描述作为注解的值,如下例所示。

    @ShellMethod("Stream some settings to the server. Stream of responses will be printed.")
    public void channel(){

// Code goes here

}

接下来,在该方法中,添加创建设置流的代码。代码如下:

Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));

Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                                        .doOnNext(d -> log.info("\nSending setting for {}-second interval.\n", d.getSeconds()));

每个 Mono 包含一个 Duration 设置。每个持续时间控制来自服务器的每条消息之间的暂停。总共有 3 个 Mono。第一个包含 1 秒的短暂停设置。第二个有更宽松的 3 秒暂停设置,但使用 .delayElement() 方法指示此 Mono 延迟 5 秒再产生此持续时间。第三个 Mono 包含 5 秒的暂停设置,但在 15 秒过去之前不会发出其持续时间。这 3 个 Mono 使用 .concat() 方法连接成一个 Flux。使用 .doOnNext() 添加了日志记录语句,以便在代码运行时可以看到发生了什么。

注意:构建基于 Flux 的流有很多方法,但对于本教程来说,只是一个简单的示例。

现在,Flux 中有了设置流,在该方法中添加与服务器通信所需的代码。

disposable = this.rsocketRequester
                    .route("channel")
                    .data(settings)
                    .retrieveFlux(Message.class)
                    .subscribe(message -> log.info("Received: {} \n(Type 's' to stop.)", message));

如果你一直跟着本系列,现在应该对这段代码很熟悉了。rsocketRequester 是你在构造函数中创建的全局变量。它提供了你与服务器的 RSocket 通信链接。.route() 设置为 "channel",以匹配服务器端代码中的消息映射。.data() 是你在上面创建的 mono 流。.retrieveFlux() 指定你期望收到 Message 对象的流,而 .subscribe() 开始你的订阅,并确保收到的每条消息都打印到日志中,以便你看到发生了什么。订阅创建的 Disposable 对象被保留并用于控制通道。

你可以在这里查看该方法的完整代码。这就是我们需要的所有代码。让我们运行它!

步骤 3:构建并运行 RSocket 服务器

打开一个终端窗口,进入 rsocket-server 目录,然后按如下方式运行服务器:

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

服务器将在 localhost7000 端口启动。

步骤 4:构建并运行 RSocket 客户端

打开一个第二个终端窗口,进入 rsocket-client 目录。然后按如下方式构建并运行客户端:

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

启动后,Spring Shell 将为你提供一个新的提示符:

shell:>

在提示符下输入 channel 来向服务器请求一个通道。

客户端创建延迟计时器设置流并开始将其发送到服务器。出站流中的每个持续时间都会被客户端和服务器打印出来。服务器发送回消息流,客户端将其打印到日志中。客户端的终端看起来像这样:

shell:>channel
i.p.rsocketclient.RSocketShellClient :

Sending setting for 1-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304561)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 3-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304568)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 5-second interval.

2020-03-27 10:23:00.243 INFO 5680 --- [tor-tcp-epoll-1] i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=4, created=1585304580)
(Type 's' to stop.)

# removed log-lines

要停止通道传输,输入 s 然后按 Enter 键。

步骤 5:清理

你可以在 shell:> 提示符下输入 exit 来退出 rsocket-client,像这样:

shell:>exit

你可以在 rsocket-server 的终端窗口中按下 Ctrl-C 来停止其进程。

工作原理

客户端创建一个包含 3 个持续时间元素的序列。第一个持续时间设置立即发出,第二个在 5 秒后发出,第三个在 15 秒后发出。每次发出新的持续时间时,都会记录到控制台。这个设置流被发送到服务器,并控制服务器生成的消息流。

在服务器端,每当从客户端流中提取新的持续时间设置时,就会创建一个新的消息流并返回。客户端发送的最新的设置控制着此服务器发送流中每条消息之间的时间延迟。

当客户端释放订阅的 disposable 对象时,通道传输停止。

结语

如果你完整地阅读了本系列,你现在已经看到了 RSocket 的所有四种交互模型的实际应用:请求-响应即发即忘请求-流,以及现在的通道。

拥有这四种通信风格,你将大大降低遇到我们在开头讨论的那些恼人的“抽象不匹配”场景的可能性。将 RSocket 纳入你的工具箱,你将拥有一种灵活、低开销、高性能的消息协议,可以在你的软件中使用——一个专为微服务架构而构建的协议。

订阅 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助力你快速提升。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,仅需一份简单订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部