领先一步
VMware 提供培训和认证,助你加速进步。
了解更多各位 Spring 粉丝们大家好!这是 Spring 疯狂的一周!我正在参加 SpringOne Platform 2018,沉浸在所有令人兴奋的社区活动中,与来自世界各地热爱 Pivotal 和 Spring 的人们交流!就在离会场五英里远的一个当地商场里(我恰好在那里参加社区晚宴),我被一位来自越南的女士要求合影自拍!Pivotal 能将如此多不同文化和地方的人们聚集在一起,真是太棒了。
今天,2018年9月26日,星期三,真是个**重磅日子**!这周当然有很多精彩的事情发生,但今天对我来说非常特别。今天我们揭开了 Pivotal 在两个方面所做出的惊人工作的面纱。在这篇文章中,我只想简要地提及这些话题。毫无疑问,未来几周你们会从我们这里听到更多关于它们的消息!
首先,我们宣布了我们正在努力支持使用 R2DBC 实现响应式 SQL 数据访问的标准。R2DBC 尚处于早期阶段,但非常令人兴奋。到目前为止,当我们谈论数据访问时,我一直迅速提醒人们,虽然他们可以在响应式应用程序中使用 JDBC,但扩展该交互的责任需要他们自己承担。他们必须配置更多线程到分配给发生任何 SQL 数据访问的响应式流的 `Scheduler`。这是必要的,因为 JDBC 本质上是一个阻塞和同步的 API。它无法执行 IO 然后在有活动时回调你;客户端线程必须等待响应。R2DBC 提供了一种替代方案。它**不是**旨在封装 JDBC,而是支持基于原生响应式 SQL 数据库驱动程序的函数式响应式数据访问。我们有一个 SPI 层和一个支持 PostgreSQL 的实现。
让我们来看一个例子。为了实现这一点,我去了 Spring Initializr 并选择了 `Reactive Web` 和 `Lombok`。我确保选择一个 Spring Boot 的 `SNAPSHOT` 版本。你不需要 Spring Boot 本身的 SNAPSHOT 版本,**严格来说**,但你会希望 Spring Initializr 将 Spring SNAPSHOT 仓库添加到你的构建中,以便你可以解析 `r2dbc-postgresql` 依赖。然后,我(手动!太可怕了!)编辑了 Maven 构建文件 `pom.xml`,并在构建中添加了对 `io.r2dbc:r2dbc-postgresql:1.0.0.BUILD-SNAPSHOT` 依赖的引用。
package s1p.r2dbc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlResult;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.net.URI;
@Log4j2
@SpringBootApplication
public class PostgresqlApplication {
public static void main(String args[]) {
SpringApplication.run(PostgresqlApplication.class, args);
}
@Bean
PostgresqlConnectionFactory connectionFactory(
@Value("${spring.datasource.url}") String url) {
URI uri = URI.create(url);
String host = uri.getHost();
String userInfo = uri.getUserInfo();
String user = userInfo, pw = "";
if (userInfo.contains(":")) {
user = userInfo.split(":")[0];
pw = userInfo.split(":")[1];
}
String name = uri.getPath().substring(1);
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration
.builder() //
.database(name) //
.host(host) //
.username(user) //
.password(pw) //
.build();
return new PostgresqlConnectionFactory(configuration);
}
}
@Log4j2
@Service
class CustomerService {
private final ConnectionFactory connectionFactory;
CustomerService(PostgresqlConnectionFactory pgc) {
this.connectionFactory = pgc;
}
Flux<Result> delete(Long id) {
return Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("DELETE FROM customers where id = $1")
.bind("$1", id) //
.execute());
}
Flux<Result> create(Long id, String email) {
return Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("INSERT INTO customers(id,email) VALUES($1, $2)")
.bind("$1", id) //
.bind("$2", email) //
.add().execute());
}
Flux<Customer> all() {
return Mono
.from(this.connectionFactory
.create())
.flatMapMany(connection -> Flux.from(
connection.createStatement("select * from customers").execute())
.flatMap(result -> result.map((row, rowMetadata) -> new Customer(row.get("id", Long.class),
row.get("email", String.class)))));
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {
private Long id;
private String email;
}
相当酷,对吧?这些 API 是原生响应式的,这意味着你可以利用这些 API 提供的重试和组合功能。
我们还首次展示了对 RSocket 的支持。RSocket 是由 Netflix 的一些人员(后来转到 Facebook)以及其他人开发的协议。RSocket 是一种线协议,将响应式处理的原则作为协议本身的一部分呈现出来。Facebook 开发了两个 RSocket 客户端:一个用 C++,另一个用 Java。Java RSocket 客户端是构建在 Reactor 项目之上的!不过,RSocket 是一种二进制协议,因此理论上你也可以使用其他语言构建客户端。
RSocket 是一种通用的数据传输协议。它支持多种消息交换模式或风格,包括但不限于请求-响应、即发即弃、发布-订阅和流式传输。可能性无限!这篇文章无法彻底介绍所有选项,所以让我们看一个简单的流式传输示例,它有两个组件:生产者和消费者。为了实现这一点,我去了 Spring Initializr,选择了 `Lombok` 并选择了最新的(稳定)Spring Boot 版本。在构建文件 `pom.xml` 中,我添加了两个依赖:`io.rsocket:rsocket-transport-netty:0.11.5` 和 `io.rsocket:rsocket-core:0.11.5`。
这里的目标是展示这些 API 的简单性和灵活性。你可以直接使用它们,或者像 Netifi 所做的那样,将它们作为支持更高级用例的一整套基础设施的基础。
这个简单示例的核心是生产者将每秒发射一条新记录,永远!只要宇宙还没有经历热寂,这个例子就会一直运行下去!
首先,我们来看一个简单的生产者示例。我将其设置为监听 `ApplicationReadyEvent`,然后才开始提供请求。我想保持 Java 进程运行(从而监听请求),所以作为一个小技巧,我使用 `System.in.read()` 来轮询控制台输入。记住,这个 API 是非阻塞和异步的!如果我们不帮助它,它不会保持主线程活跃。
package s1p.rsocket.producer;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
@SpringBootApplication
@Log4j2
public class ProducerApplication implements ApplicationListener<ApplicationReadyEvent> {
public static void main(String[] args) throws IOException {
SpringApplication.run(ProducerApplication.class, args);
System.in.read();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
SocketAcceptor sa = (connectionSetupPayload, rSocket) ->
Mono.just(new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) { // produce a result when asked...
return Flux
.interval(Duration.ofMillis(1000)) // ...every one second
.map(aLong -> DefaultPayload.create("interval: " + aLong));
}
});
RSocketFactory
.receive()
.acceptor(sa)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.onTerminateDetach()
.subscribe(nettyContextCloseable -> log.info("started the server @ " + Instant.now().toString()));
}
}
消费者同样也很直接。它向服务发起请求,从服务获取响应式流 `Publisher
package s1p.rsocket.consumer;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import java.io.IOException;
@Log4j2
@SpringBootApplication
public class ConsumerApplication implements ApplicationListener<ApplicationReadyEvent> {
public static void main(String[] args) throws IOException {
SpringApplication.run(ConsumerApplication.class, args);
System.in.read();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent evt) {
RSocketFactory
.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.flatMapMany(socket ->
socket
.requestStream(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.doFinally(signal -> socket.dispose())
)
.subscribe(name -> log.info("consuming " + name + "."));
}
}
生产者和消费者都连接到 `localhost:7000` 并开始交互。RSocket,就像 HTTP 一样,并不真正关心消息的负载是什么。毕竟,它在网络上是二进制数据。
SpringOne Platform 录制了大量详细阐述这两个主题的演讲!它们很快就会上线。(同时,你是不是很希望能亲临现场?)Spring 在这里发挥着重要作用,因为它实现了端到端的响应式。想象一下!你可以构建响应式微服务,使用 RSocket 进行服务间通信。为什么不使用 `@Tailable` 响应式 Spring Data MongoDB 仓库方法对 MongoDB 数据集进行连续查询,并通过 RSocket 流式传输那些数据到达 MongoDB 存储时产生的结果呢?你也可以反过来做。你的 RSocket 客户端可以流式传输大量数据写入 MongoDB,利用刚刚发布的 Spring Data Lovelace 版本中新增的**响应式**事务支持!为什么不使用 R2DBC 并通过 RSocket 流式传输大量数据呢?RSocket 对边缘应用也非常有利!你可以通过例如 websockets 进行 RSocket 通信。想象一下:基于 RSocket 的服务可以直接被 HTML5 客户端消费。或者,你可以使用 Spring WebFlux 创建响应式 HTTP 服务,这些服务反过来调用基于 RSocket 的服务,而这些服务又调用 R2DBC。正如他们所说,天空可能是无限且异步的!
Reactor 团队和 Spring 团队的 Ben Hale 等人一年多来一直勤奋地致力于这两个项目,所以我知道他们非常兴奋能与你们分享这个消息,并在你们投身响应式革命时回答你们提出的任何问题!