RSocket 入门:Spring Boot Channels

工程 | Ben Wilcock | 2020年4月6日 | ...

阅读时间:约6分钟。实践时间:约20分钟。

如果您像我一样,仍在RSocket学习的初期,请查看RSocket协议背后的动机。这份简短但富有洞见的文档包含一条与我强烈共鸣的信息——“不匹配的抽象增加了系统开发的成本”。

从软件设计的角度来看,RSocket的四种交互模型提供了显著的优势。这意味着我们可以使用每个用例正确的交互模型来模拟组件间的通信。这种更高效的模型可以节省您大量的编码时间和精力!

到目前为止,在本系列文章中,我们已经探讨了请求-响应单向发送请求-流消息传递。今天,您将向客户端和服务器代码添加 *channels*。

什么是 Channels?

Channels 是双向管道,允许数据流在任一方向流动。使用 Channels,客户端到服务器的数据流可以与服务器到客户端的数据流共存。Channels 有许多实际用途。Channels 可以传输视频会议流,发送和接收双向聊天消息,使用增量和差异同步数据,或提供一种报告、观察和监控系统的方法。

RSocket 中的 Channels 与流或请求-响应一样简单。也就是说,您将在下面实现的场景比您之前尝试的稍微复杂一些,因此最好在开始之前理解它。

在接下来的练习中,服务器将消息流式传输到客户端。客户端使用“延迟”设置流来控制服务器流中消息的频率。客户端流中的设置告诉服务器在发送每条消息之间应暂停多长时间。把它想象成一个消息频率调节器。频率设置越高,暂停时间越短,您将看到许多服务器发送的消息。频率设置越低,暂停时间越长,您将看到较少的服务器发送的消息。考虑到这个结果,让我们开始编码。

完整的代码示例可在GitHub上找到。RSocketController位于rsocket-server文件夹中的io.pivotal.rsocketserver包中。RSocketShellClient位于rsocket-client文件夹中的io.pivotal.rsocketclient包中。

步骤1:向RSocketController添加Channel方法

在服务器端的RSocketController类中,添加一个名为channel()的方法,该方法接受一个Flux<Duration>并返回一个Flux<Message>。此方法签名(输入flux,输出flux)将此方法标识为RSocket channel方法。使用值为“channel”的@MessageMapping()注解该方法。此方法的代码如下所示。

    @MessageMapping("channel")
    Flux<Message> channel(final Flux<Duration> settings) {
        return settings
                    .doOnNext(setting -> log.info("\nFrequency setting is {} second(s).\n", setting.getSeconds()))
                    .switchMap(setting -> Flux.interval(setting)
                                                   .map(index -> new Message(SERVER, CHANNEL, index)))
                                                   .log();
    }

在代码中,.doOnNext()正在监听来自客户端的设置流。每次收到新的delay设置时,它都会向日志写入一条消息。.switchMap()为每个新设置创建一个新的Flux。这个新的flux根据delay设置中包含的.interval()延迟发出新的Message对象。服务器将这些新消息通过流发送回客户端。

步骤2:向RSocketShellClient添加Channel方法

在客户端的RSocketShellClient类中,添加一个新的channel()方法并使用@ShellMethod()注解它。将方法目的的描述作为注解值添加,如下例所示。

    @ShellMethod("Stream some settings to the server. Stream of responses will be printed.")
    public void channel(){

// Code goes here

}

接下来,在方法中,添加创建设置流的代码。代码如下所示

Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));

Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                                        .doOnNext(d -> log.info("\nSending setting for {}-second interval.\n", d.getSeconds()));

每个Mono包含单个Duration设置。每个持续时间控制服务器发出的每条消息之间的暂停。总共有3个Mono。第一个包含1秒的短暂停设置。第二个具有3秒的更长的暂停设置,但此mono被告知使用.delayElement()方法将此持续时间的产生延迟5秒。第三个mono包含5秒的暂停设置,但在15秒过去之前不会发出其持续时间。这3个mono使用.concat()方法连接成单个Flux。使用.doOnNext()添加日志语句,以便您可以在代码运行时查看发生了什么。

注意:有很多方法可以构建基于Flux的流,但对于本教程,只是一个简单的示例。

现在您已经在flux中有了设置流,请向方法中添加与服务器通信所需的代码

disposable = this.rsocketRequester
                    .route("channel")
                    .data(settings)
                    .retrieveFlux(Message.class)
                    .subscribe(message -> log.info("Received: {} \n(Type 's' to stop.)", message));

如果您一直在关注本系列,那么这段代码现在看起来很熟悉了。rsocketRequester是您在构造函数中构建的全局变量。它提供您与服务器的RSocket通信链路。.route()设置为“channel”以匹配服务器端代码中的消息映射。.data()是您上面创建的mono流。.retrieveFlux()指定您期望的是Message对象的流,而.subscribe()启动您的订阅并确保接收到的每条消息都打印到日志中,以便您可以看到发生了什么。订阅创建的Disposable对象被保留并用于控制channel。

您可以在这里查看该方法的完整代码here。这就是我们需要的所有代码。让我们运行它!

步骤3:构建并运行RSocket服务器

打开一个终端窗口,移动到rsocket-server目录,然后按如下方式运行服务器

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

服务器在localhost端口7000上启动。

步骤4:构建并运行RSocket客户端

打开 *第二个* 终端窗口,并移动到rsocket-client目录。在那里,构建并运行客户端,如下所示

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

启动后,Spring Shell 将为您提供一个新的提示符

shell:>

您可以通过在提示符下键入channel来请求服务器上的channel。

客户端创建其延迟计时器设置流并开始将其发送到服务器。客户端和服务器都会打印出出站流中的每个持续时间。服务器发送回消息流,客户端将其打印到日志中。客户端的终端看起来像这样

shell:>channel
i.p.rsocketclient.RSocketShellClient :

Sending setting for 1-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304561)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 3-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304568)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 5-second interval.

2020-03-27 10:23:00.243 INFO 5680 --- [tor-tcp-epoll-1] i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=4, created=1585304580)
(Type 's' to stop.)

# removed log-lines

要停止通道,请键入s,然后按Enter

步骤5:清理

您可以通过在shell:>提示符下键入exit来退出rsocket-client,如下所示。

shell:>exit

您可以通过在其终端窗口中按Ctrl-C来停止rsocket-server进程。

工作原理

客户端创建一系列3个持续时间元素。第一个持续时间设置立即发出,第二个在5秒后发出,第三个在15秒后发出。每次发出新的持续时间时,都会将其记录到控制台。此设置流被发送到服务器,并控制服务器生成的的消息流。

在服务器端,每次从客户端的流中提取新的持续时间设置时,都会创建一个新的消息流并返回。客户端发送的最新设置控制此服务器发送的流中每条消息之间的时间延迟。

当客户端处理订阅的可释放对象时,信道传输停止。

最终想法

如果您关注了整个系列,那么您现在已经看到了RSocket 的所有四种交互模型:请求-响应单向请求请求-流,以及现在的通道。

有了这四种通信方式,您就不太可能遇到我们在开头讨论的那些恼人的“不匹配抽象”场景。使用RSocket,您将拥有一个灵活、低摩擦、高性能的消息传递协议,可用于您的软件——该协议专为微服务架构而设计。

获取Spring通讯

通过Spring通讯保持联系

订阅

领先一步

VMware提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部