SpringOne Platform 2018 上的反应式革命(第一部分/N)

工程 | Josh Long | 2018 年 9 月 27 日 | ...

您好,Spring 的粉丝们!Spring 这个星期过得太疯狂了!我正在 SpringOne Platform 2018 上,尽情享受激动人心的社区活动,与来自世界各地热爱 Pivotal 和 Spring 的人们交流!就在刚才,我在离会场五英里外的一家本地商场里,一边参加社区晚餐,一边被一位来自越南的女士拉着合影留念!Pivotal 能够汇聚来自如此不同文化和地域的人们,真是太棒了。

今天,也就是 2018 年 9 月 26 日,真是不平凡的一天!当然,本周有很多精彩的事情发生,但今天对我来说尤其特别。今天,我们揭开了 Pivotal 在两个方面所做的令人惊叹的工作。在这篇文章中,我只想简要地提及这些话题。毫无疑问,您将在接下来的几周内听到我们更多地介绍这些内容!

使用 R2DBC 进行反应式 SQL 数据访问

首先,我们宣布了我们在支持使用 R2DBC 进行反应式 SQL 数据访问标准方面的工作。R2DBC 仍处于早期阶段,但非常令人兴奋。到目前为止,当我们谈论数据访问时,我总是会提醒人们,虽然他们可以在反应式应用程序中使用 JDBC,但他们需要自己负责扩展该交互的规模。他们需要为任何 SQL 数据访问发生的反应式流分配的 Scheduler 配置更多线程。这是必要的,因为 JDBC 本质上是一个阻塞的同步 API。它无法执行 IO 然后在有活动时回调您;客户端线程会一直等待回复。R2DBC 提供了一个替代方案。它不是用来包装 JDBC 的,而是支持基于原生反应式 SQL 数据库驱动程序构建的功能性反应式数据访问。我们有一个 SPI 层和一个支持 PostgreSQL 的实现。

让我们来看一个例子。为了实现这一点,我去了 Spring Initializr,选择了 Reactive WebLombok。我确保选择了一个 Spring Boot 的 SNAPSHOT 版本。您并不需要 Spring Boot 本身是 SNAPSHOT 版本本身,但您希望 Spring Initializr 将 Spring snapshot 仓库添加到您的构建中,以便您可以解析 r2dbc-postgresql 依赖项。然后,我(手动!太恐怖了!)编辑了 Maven 构建 pom.xml,并在构建中添加了对 io.r2dbc:r2dbc-postgresql:1.0.0.BUILD-SNAPSHOT 依赖项的引用。

package s1p.r2dbc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlResult;

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.net.URI;

@Log4j2
@SpringBootApplication
public class PostgresqlApplication {

  public static void main(String args[]) {
    SpringApplication.run(PostgresqlApplication.class, args);
  }

  @Bean
  PostgresqlConnectionFactory connectionFactory(
      @Value("${spring.datasource.url}") String url) {

    URI uri = URI.create(url);
    String host = uri.getHost();
    String userInfo = uri.getUserInfo();
    String user = userInfo, pw = "";

    if (userInfo.contains(":")) {
      user = userInfo.split(":")[0];
      pw = userInfo.split(":")[1];
    }

    String name = uri.getPath().substring(1);
    PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration
        .builder() //
        .database(name) //
        .host(host) //
        .username(user) //
        .password(pw) //
        .build();
    return new PostgresqlConnectionFactory(configuration);
  }
}

@Log4j2
@Service
class CustomerService {

  private final ConnectionFactory connectionFactory;

  CustomerService(PostgresqlConnectionFactory pgc) {
    this.connectionFactory = pgc;
  }

  Flux<Result> delete(Long id) {
    return Mono.from(this.connectionFactory.create())
      .flatMapMany(connection -> connection
        .createStatement("DELETE FROM customers where id = $1")
        .bind("$1", id) //
        .execute());
  }

  Flux<Result> create(Long id, String email) {
    return Mono.from(this.connectionFactory.create())
      .flatMapMany(connection -> connection
        .createStatement("INSERT INTO customers(id,email) VALUES($1, $2)")
        .bind("$1", id) //
        .bind("$2", email) //
        .add().execute());
  }

  Flux<Customer> all() {

    return Mono
      .from(this.connectionFactory
        .create())
      .flatMapMany(connection -> Flux.from(
        connection.createStatement("select * from customers").execute())
        .flatMap(result -> result.map((row, rowMetadata) -> new Customer(row.get("id", Long.class),
          row.get("email", String.class)))));
  }

}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {

  private Long id;

  private String email;

} 

很酷吧?API 是原生反应式的,这意味着您可以利用这些 API 提供的重试和组合功能。

RSocket:反应式线协议

我们还推出了对 RSocket 的支持,RSocket 是由 Netflix 等公司开发的一种协议,后来他们加入了 Facebook。RSocket 是一种线协议,它将反应式处理的租约作为协议本身的一部分暴露出来。Facebook 开发了两个 RSocket 客户端:一个是用 C++ 编写的,另一个是用 Java 编写的。Java RSocket 客户端建立在 Reactor 项目之上!RSocket 是一个二进制协议,因此理论上您也可以用其他语言构建客户端。

RSocket 是一种通用数据传输协议。它支持多种消息交换模式或风格,包括但不限于请求-响应、单向发送(fire-and-forget)、发布-订阅和流式传输。可能性无限!本文无法全面介绍所有选项,因此让我们来看一个简单的流式传输示例,该示例包含两个组件:生产者和消费者。为了实现这一点,我去了 Spring Initializr,选择了 Lombok,并选择了最新(稳定)版本的 Spring Boot。在构建文件 pom.xml 中,我添加了两个依赖项:io.rsocket:rsocket-transport-netty:0.11.5io.rsocket:rsocket-core:0.11.5

目标是演示这些 API 的简单性和灵活性。您可以直接使用它们,或者像 Netifi 那样,将其作为整个基础设施的基础,以支持更高级的用例。

这个简单示例的核心是,生产者将每秒发出一个新记录,永远如此!只要宇宙还没有经历热寂,这个示例就应该会继续运行!

首先,让我们看一个简单的生产者示例。我将其设置为监听 ApplicationReadyEvent,然后才开始处理请求。我想保持 Java 进程运行(因此监听请求),所以,作为一种临时解决方案,我使用 System.in.read() 来轮询控制台输入。请记住,这个 API 是非阻塞和异步的!如果我们不帮助它,它就不会保持主线程存活。

package s1p.rsocket.producer;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

@SpringBootApplication
@Log4j2
public class ProducerApplication implements ApplicationListener<ApplicationReadyEvent> {

  public static void main(String[] args) throws IOException {
    SpringApplication.run(ProducerApplication.class, args);
    System.in.read();
  }

  @Override
  public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
    SocketAcceptor sa = (connectionSetupPayload, rSocket) ->
      Mono.just(new AbstractRSocket() {

        @Override
        public Flux<Payload> requestStream(Payload payload) { // produce a result when asked...
          return Flux
            .interval(Duration.ofMillis(1000))  // ...every one second
            .map(aLong -> DefaultPayload.create("interval: " + aLong));
        }
      });

    RSocketFactory
      .receive()
      .acceptor(sa)
      .transport(TcpServerTransport.create("localhost", 7000))
      .start()
      .onTerminateDetach()
      .subscribe(nettyContextCloseable -> log.info("started the server @ " + Instant.now().toString()));
  }
}

消费者也同样简单。它向服务发起请求,从服务获取反应式流 Publisher<T>,并遍历每个记录,将二进制有效负载解包为 String,并在到达时将其记录下来。在这里,我也使用 System.in.read() 来保持线程运行。

package s1p.rsocket.consumer;

import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;

import java.io.IOException;

@Log4j2
@SpringBootApplication
public class ConsumerApplication implements ApplicationListener<ApplicationReadyEvent> {

  public static void main(String[] args) throws IOException {
    SpringApplication.run(ConsumerApplication.class, args);
    System.in.read();
  }

  @Override
  public void onApplicationEvent(ApplicationReadyEvent evt) {
    RSocketFactory
      .connect()
      .transport(TcpClientTransport.create("localhost", 7000))
      .start()
      .flatMapMany(socket ->
        socket
          .requestStream(DefaultPayload.create("Hello"))
          .map(Payload::getDataUtf8)
          .doFinally(signal -> socket.dispose())
      )
      .subscribe(name -> log.info("consuming " + name + "."));
  }
}

生产者和消费者都连接到 localhost:7000 并开始交互。RSocket,就像 HTTP 一样,并不真正关心消息的有效负载是什么。毕竟,它在网络上传输的是二进制数据。

下一步

在这里的 SpringOne Platform 上,有大量的演讲深入探讨了这两个主题。它们很快就会在线上提供。(在此期间,您难道不想亲临现场吗?)Spring 在此扮演着重要角色,因为它实现了端到端的反应式处理。想象一下!您可以构建反应式微服务,使用 RSocket 进行服务到服务的通信。为什么不使用 @Tailable 反应式 Spring Data MongoDB 存储库方法,对 MongoDB 数据集中的数据进行某种连续查询,并通过 RSocket 流式传输那些到达 MongoDB 存储的结果呢?您也可以反过来操作。您的 RSocket 客户端可以流式传输大量数据到 MongoDB 中进行写入,使用 Spring Data MongoDB 最新发布的 Spring Data Lovelace 版本中新增的反应式事务支持!为什么不使用 R2DBC 并以流式传输的方式通过 RSocket 返回大量数据呢?RSocket 在边缘计算方面也是最佳选择!您可以通过 WebSocket(例如)来传输 RSocket。想象一下:基于 RSocket 的服务可以直接被 HTML5 客户端消费。或者,您可以创建使用 Spring WebFlux 的反应式 HTTP 服务,这些服务又会调用基于 RSocket 的服务,而这些服务又会调用 R2DBC。正如他们所说,天空潜在地是无限的和异步的!

Reactor 团队和 Spring 团队的 Ben Hale 等人,一年多来一直在努力推进这两个项目,所以我知道他们非常乐意与您分享这个消息,并在您拿起反应式革命的武器时,为您解答任何疑问!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有