SpringOne Platform 2018 的反应式革命 (第 1/N 部分)

工程 | Josh Long | 2018 年 9 月 27 日 | ...

嗨,Spring 粉丝们!Spring 这周真是太疯狂了!我正在参加 SpringOne Platform 2018,沉浸在所有激动人心的社区活动中,与来自世界各地热爱 Pivotal 和 Spring 的人们互动!我刚在离展会五英里的地方——一个我碰巧参加社区晚餐的当地购物中心——被一位越南女士要求和她自拍!Pivotal 将来自不同文化和地区的这么多人聚集在一起,真是太棒了。

今天,2018 年 9 月 26 日,星期三,真是令人难忘的一天!当然,本周有很多很棒的事情发生,但今天对我来说非常特别。今天,我们揭开了 Pivotal 在两个方面所做的惊人工作的面纱。我仅希望在这篇文章中简要提及这些主题。毫无疑问,在接下来的几周里,您将从我们这里听到更多关于这些内容的信息!

使用 R2DBC 进行反应式 SQL 数据访问

首先,我们宣布了我们关于尝试支持使用 R2DBC 进行反应式 SQL 数据访问的标准的工作。R2DBC 仍处于早期阶段,但非常令人兴奋。到目前为止,当我们谈论数据访问时,我总是很快提醒人们,虽然他们可以在反应式应用程序中使用 JDBC,但他们需要自行承担扩展这种交互的问题。他们需要自行将更多线程配置到分配给发生任何 SQL 数据访问的反应式流的 Scheduler。这是必要的,因为 JDBC 本质上是一个阻塞式和同步 API。它没有能力执行 IO 并在有活动时回调你;客户端线程会一直等待回复。R2DBC 提供了一种替代方案。它不是旨在作为 JDBC 的包装器,而是要支持构建在原生反应式 SQL 数据库驱动程序上的函数式反应式数据访问。我们有一个 SPI 层和一个支持 PostgreSQL 的实现。

让我们看一个例子。为了使它工作,我访问了 Spring Initializr 并选择了 Reactive WebLombok。我确保选择了一个 Spring Boot 的 SNAPSHOT 版本。您本身并不需要 Spring Boot 的 SNAPSHOT 版本,您需要 Spring Initializr 将 Spring 快照存储库添加到您的构建中,以便您可以解析 r2dbc-postgresql 依赖项。然后,我(手动!太可怕了!)编辑了 Maven 构建文件 pom.xml,并在构建中添加了对 io.r2dbc:r2dbc-postgresql:1.0.0.BUILD-SNAPSHOT 依赖项的依赖关系。

package s1p.r2dbc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlResult;

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.net.URI;

@Log4j2
@SpringBootApplication
public class PostgresqlApplication {

  public static void main(String args[]) {
    SpringApplication.run(PostgresqlApplication.class, args);
  }

  @Bean
  PostgresqlConnectionFactory connectionFactory(
      @Value("${spring.datasource.url}") String url) {

    URI uri = URI.create(url);
    String host = uri.getHost();
    String userInfo = uri.getUserInfo();
    String user = userInfo, pw = "";

    if (userInfo.contains(":")) {
      user = userInfo.split(":")[0];
      pw = userInfo.split(":")[1];
    }

    String name = uri.getPath().substring(1);
    PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration
        .builder() //
        .database(name) //
        .host(host) //
        .username(user) //
        .password(pw) //
        .build();
    return new PostgresqlConnectionFactory(configuration);
  }
}

@Log4j2
@Service
class CustomerService {

  private final ConnectionFactory connectionFactory;

  CustomerService(PostgresqlConnectionFactory pgc) {
    this.connectionFactory = pgc;
  }

  Flux<Result> delete(Long id) {
    return Mono.from(this.connectionFactory.create())
      .flatMapMany(connection -> connection
        .createStatement("DELETE FROM customers where id = $1")
        .bind("$1", id) //
        .execute());
  }

  Flux<Result> create(Long id, String email) {
    return Mono.from(this.connectionFactory.create())
      .flatMapMany(connection -> connection
        .createStatement("INSERT INTO customers(id,email) VALUES($1, $2)")
        .bind("$1", id) //
        .bind("$2", email) //
        .add().execute());
  }

  Flux<Customer> all() {

    return Mono
      .from(this.connectionFactory
        .create())
      .flatMapMany(connection -> Flux.from(
        connection.createStatement("select * from customers").execute())
        .flatMap(result -> result.map((row, rowMetadata) -> new Customer(row.get("id", Long.class),
          row.get("email", String.class)))));
  }

}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {

  private Long id;

  private String email;

} 

很酷吧?这些 API 本身就是反应式的,这意味着您可以利用这些 API 提供的重试和组合功能。

RSocket:反应式线协议

我们还推出了对 RSocket 的支持,RSocket 是一种由(除其他外)来自 Netflix 的人员开发的协议,这些人后来搬到了 Facebook。RSocket 是一种线协议,它将反应式处理的租户作为协议本身的一部分呈现出来。Facebook 开发了两个 RSocket 客户端:一个是用 C++ 编写的,另一个是用 Java 编写的。Java RSocket 客户端建立在 Reactor 项目之上!不过,RSocket 是一种二进制协议,因此理论上您也可以用其他语言构建客户端。

RSocket 是一种通用数据传送协议。它支持多种消息交换模式或样式,包括但不限于请求-响应、发送并忘记、发布-订阅和流式传输。一切皆有可能!这篇文章无法希望全面介绍所有选项,因此让我们来看一个简单的流式传输示例,它有两个组件,一个生产者和一个消费者。为了使它工作,我访问了 Spring Initializr,选择了 Lombok 并选择了 Spring Boot 的最新(稳定)版本。在构建文件 pom.xml 中,我添加了两个依赖项:io.rsocket:rsocket-transport-netty:0.11.5io.rsocket:rsocket-core:0.11.5

这里的目标是展示这些 API 非常简单且灵活。您可以直接使用它们,或者像 Netifi 所做的那样,将其作为支持更高级用例的整套基础设施的基础。

这个简单示例的关键在于,生产者将永远每秒发出一个新记录!只要宇宙还没有经历热寂,这个示例应该会一直运行下去!

首先,让我们看一个简单的生产者示例。我将其设置好,以便它将侦听 ApplicationReadyEvent,并且仅在然后开始服务请求。我想让 Java 进程保持运行(并因此侦听请求),因此,作为一种解决方法,我使用 System.in.read() 轮询控制台以获取输入。请记住,此 API 是非阻塞式和异步的!如果我们不帮助它,它不会使主线程保持活动状态。

package s1p.rsocket.producer;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

@SpringBootApplication
@Log4j2
public class ProducerApplication implements ApplicationListener<ApplicationReadyEvent> {

  public static void main(String[] args) throws IOException {
    SpringApplication.run(ProducerApplication.class, args);
    System.in.read();
  }

  @Override
  public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
    SocketAcceptor sa = (connectionSetupPayload, rSocket) ->
      Mono.just(new AbstractRSocket() {

        @Override
        public Flux<Payload> requestStream(Payload payload) { // produce a result when asked...
          return Flux
            .interval(Duration.ofMillis(1000))  // ...every one second
            .map(aLong -> DefaultPayload.create("interval: " + aLong));
        }
      });

    RSocketFactory
      .receive()
      .acceptor(sa)
      .transport(TcpServerTransport.create("localhost", 7000))
      .start()
      .onTerminateDetach()
      .subscribe(nettyContextCloseable -> log.info("started the server @ " + Instant.now().toString()));
  }
}

消费者也同样简单。它向服务发起请求,从服务获取反应式流 Publisher<T> 并遍历每个记录,将二进制有效负载解包为 String,并在其到达时记录它。在这里,我也使用 System.in.read() 作为一种使线程保持运行的方法。

package s1p.rsocket.consumer;

import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;

import java.io.IOException;

@Log4j2
@SpringBootApplication
public class ConsumerApplication implements ApplicationListener<ApplicationReadyEvent> {

  public static void main(String[] args) throws IOException {
    SpringApplication.run(ConsumerApplication.class, args);
    System.in.read();
  }

  @Override
  public void onApplicationEvent(ApplicationReadyEvent evt) {
    RSocketFactory
      .connect()
      .transport(TcpClientTransport.create("localhost", 7000))
      .start()
      .flatMapMany(socket ->
        socket
          .requestStream(DefaultPayload.create("Hello"))
          .map(Payload::getDataUtf8)
          .doFinally(signal -> socket.dispose())
      )
      .subscribe(name -> log.info("consuming " + name + "."));
  }
}

生产者和消费者都连接到 localhost:7000 并开始交互。RSocket 与 HTTP 一样,并不真正关心消息的有效负载是什么。毕竟,它是在网络上传输的二进制数据。

后续步骤

这里有很多关于这两个主题的演讲,这些演讲在 SpringOne Platform 上录制!它们很快就会上线。(你难道不希望你现在就在这里吗?)Spring 在这里发挥着重要作用,因为它具有端到端的反应性。想象一下!您可以构建使用 RSocket 进行服务间通信的反应式微服务。为什么不使用 @Tailable 反应式 Spring Data MongoDB 存储库方法对 MongoDB 数据集中的数据进行某种连续查询,并在数据到达 MongoDB 存储时通过 RSocket 流式传输这些结果?您也可以反过来做。您的 RSocket 客户端可以流式传输大量数据以写入 MongoDB,使用新发布的 Spring Data Lovelace 中 Spring Data MonogDB 版本中的新反应式事务支持!为什么不使用 R2DBC 并以流式方式通过 RSocket 返回大量数据。RSocket 也非常适合边缘!例如,您可以通过 WebSockets 进行 RSocket。想象一下:可以直接由 HTML5 客户端使用的基于 RSocket 的服务。或者,您可以使用 Spring WebFlux 创建反应式 HTTP 服务,这些服务依次调用基于 RSocket 的服务,这些服务又依次调用 R2DBC。正如他们所说,天空可能是无限的和异步的!

Reactor 团队 和 Spring 团队的 Ben Hale 以及其他人,在一年多的时间里一直勤奋地致力于这两个项目,因此我知道他们非常高兴能与您分享此消息,并回答您在参与反应式革命时可能遇到的任何问题!

获取 Spring Newsletter

与 Spring Newsletter 保持联系

订阅

领先一步

VMware 提供培训和认证,以助您加速发展。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部