领先一步
VMware 提供培训和认证,以助您加速发展。
了解更多嗨,Spring 粉丝们!Spring 这周真是太疯狂了!我正在参加 SpringOne Platform 2018,沉浸在所有激动人心的社区活动中,与来自世界各地热爱 Pivotal 和 Spring 的人们互动!我刚在离展会五英里的地方——一个我碰巧参加社区晚餐的当地购物中心——被一位越南女士要求和她自拍!Pivotal 将来自不同文化和地区的这么多人聚集在一起,真是太棒了。
今天,2018 年 9 月 26 日,星期三,真是令人难忘的一天!当然,本周有很多很棒的事情发生,但今天对我来说非常特别。今天,我们揭开了 Pivotal 在两个方面所做的惊人工作的面纱。我仅希望在这篇文章中简要提及这些主题。毫无疑问,在接下来的几周里,您将从我们这里听到更多关于这些内容的信息!
首先,我们宣布了我们关于尝试支持使用 R2DBC 进行反应式 SQL 数据访问的标准的工作。R2DBC 仍处于早期阶段,但非常令人兴奋。到目前为止,当我们谈论数据访问时,我总是很快提醒人们,虽然他们可以在反应式应用程序中使用 JDBC,但他们需要自行承担扩展这种交互的问题。他们需要自行将更多线程配置到分配给发生任何 SQL 数据访问的反应式流的 Scheduler
。这是必要的,因为 JDBC 本质上是一个阻塞式和同步 API。它没有能力执行 IO 并在有活动时回调你;客户端线程会一直等待回复。R2DBC 提供了一种替代方案。它不是旨在作为 JDBC 的包装器,而是要支持构建在原生反应式 SQL 数据库驱动程序上的函数式反应式数据访问。我们有一个 SPI 层和一个支持 PostgreSQL 的实现。
让我们看一个例子。为了使它工作,我访问了 Spring Initializr 并选择了 Reactive Web
和 Lombok
。我确保选择了一个 Spring Boot 的 SNAPSHOT
版本。您本身并不需要 Spring Boot 的 SNAPSHOT 版本,但您需要 Spring Initializr 将 Spring 快照存储库添加到您的构建中,以便您可以解析 r2dbc-postgresql
依赖项。然后,我(手动!太可怕了!)编辑了 Maven 构建文件 pom.xml
,并在构建中添加了对 io.r2dbc:r2dbc-postgresql:1.0.0.BUILD-SNAPSHOT
依赖项的依赖关系。
package s1p.r2dbc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlResult;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.net.URI;
@Log4j2
@SpringBootApplication
public class PostgresqlApplication {
public static void main(String args[]) {
SpringApplication.run(PostgresqlApplication.class, args);
}
@Bean
PostgresqlConnectionFactory connectionFactory(
@Value("${spring.datasource.url}") String url) {
URI uri = URI.create(url);
String host = uri.getHost();
String userInfo = uri.getUserInfo();
String user = userInfo, pw = "";
if (userInfo.contains(":")) {
user = userInfo.split(":")[0];
pw = userInfo.split(":")[1];
}
String name = uri.getPath().substring(1);
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration
.builder() //
.database(name) //
.host(host) //
.username(user) //
.password(pw) //
.build();
return new PostgresqlConnectionFactory(configuration);
}
}
@Log4j2
@Service
class CustomerService {
private final ConnectionFactory connectionFactory;
CustomerService(PostgresqlConnectionFactory pgc) {
this.connectionFactory = pgc;
}
Flux<Result> delete(Long id) {
return Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("DELETE FROM customers where id = $1")
.bind("$1", id) //
.execute());
}
Flux<Result> create(Long id, String email) {
return Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("INSERT INTO customers(id,email) VALUES($1, $2)")
.bind("$1", id) //
.bind("$2", email) //
.add().execute());
}
Flux<Customer> all() {
return Mono
.from(this.connectionFactory
.create())
.flatMapMany(connection -> Flux.from(
connection.createStatement("select * from customers").execute())
.flatMap(result -> result.map((row, rowMetadata) -> new Customer(row.get("id", Long.class),
row.get("email", String.class)))));
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {
private Long id;
private String email;
}
很酷吧?这些 API 本身就是反应式的,这意味着您可以利用这些 API 提供的重试和组合功能。
我们还推出了对 RSocket 的支持,RSocket 是一种由(除其他外)来自 Netflix 的人员开发的协议,这些人后来搬到了 Facebook。RSocket 是一种线协议,它将反应式处理的租户作为协议本身的一部分呈现出来。Facebook 开发了两个 RSocket 客户端:一个是用 C++ 编写的,另一个是用 Java 编写的。Java RSocket 客户端建立在 Reactor 项目之上!不过,RSocket 是一种二进制协议,因此理论上您也可以用其他语言构建客户端。
RSocket 是一种通用数据传送协议。它支持多种消息交换模式或样式,包括但不限于请求-响应、发送并忘记、发布-订阅和流式传输。一切皆有可能!这篇文章无法希望全面介绍所有选项,因此让我们来看一个简单的流式传输示例,它有两个组件,一个生产者和一个消费者。为了使它工作,我访问了 Spring Initializr,选择了 Lombok
并选择了 Spring Boot 的最新(稳定)版本。在构建文件 pom.xml
中,我添加了两个依赖项:io.rsocket:rsocket-transport-netty:0.11.5
和 io.rsocket:rsocket-core:0.11.5
。
这里的目标是展示这些 API 非常简单且灵活。您可以直接使用它们,或者像 Netifi 所做的那样,将其作为支持更高级用例的整套基础设施的基础。
这个简单示例的关键在于,生产者将永远每秒发出一个新记录!只要宇宙还没有经历热寂,这个示例应该会一直运行下去!
首先,让我们看一个简单的生产者示例。我将其设置好,以便它将侦听 ApplicationReadyEvent
,并且仅在然后开始服务请求。我想让 Java 进程保持运行(并因此侦听请求),因此,作为一种解决方法,我使用 System.in.read()
轮询控制台以获取输入。请记住,此 API 是非阻塞式和异步的!如果我们不帮助它,它不会使主线程保持活动状态。
package s1p.rsocket.producer;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
@SpringBootApplication
@Log4j2
public class ProducerApplication implements ApplicationListener<ApplicationReadyEvent> {
public static void main(String[] args) throws IOException {
SpringApplication.run(ProducerApplication.class, args);
System.in.read();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
SocketAcceptor sa = (connectionSetupPayload, rSocket) ->
Mono.just(new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) { // produce a result when asked...
return Flux
.interval(Duration.ofMillis(1000)) // ...every one second
.map(aLong -> DefaultPayload.create("interval: " + aLong));
}
});
RSocketFactory
.receive()
.acceptor(sa)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.onTerminateDetach()
.subscribe(nettyContextCloseable -> log.info("started the server @ " + Instant.now().toString()));
}
}
消费者也同样简单。它向服务发起请求,从服务获取反应式流 Publisher<T>
并遍历每个记录,将二进制有效负载解包为 String
,并在其到达时记录它。在这里,我也使用 System.in.read()
作为一种使线程保持运行的方法。
package s1p.rsocket.consumer;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import java.io.IOException;
@Log4j2
@SpringBootApplication
public class ConsumerApplication implements ApplicationListener<ApplicationReadyEvent> {
public static void main(String[] args) throws IOException {
SpringApplication.run(ConsumerApplication.class, args);
System.in.read();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent evt) {
RSocketFactory
.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.flatMapMany(socket ->
socket
.requestStream(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.doFinally(signal -> socket.dispose())
)
.subscribe(name -> log.info("consuming " + name + "."));
}
}
生产者和消费者都连接到 localhost:7000
并开始交互。RSocket 与 HTTP 一样,并不真正关心消息的有效负载是什么。毕竟,它是在网络上传输的二进制数据。
这里有很多关于这两个主题的演讲,这些演讲在 SpringOne Platform 上录制!它们很快就会上线。(你难道不希望你现在就在这里吗?)Spring 在这里发挥着重要作用,因为它具有端到端的反应性。想象一下!您可以构建使用 RSocket 进行服务间通信的反应式微服务。为什么不使用 @Tailable
反应式 Spring Data MongoDB 存储库方法对 MongoDB 数据集中的数据进行某种连续查询,并在数据到达 MongoDB 存储时通过 RSocket 流式传输这些结果?您也可以反过来做。您的 RSocket 客户端可以流式传输大量数据以写入 MongoDB,使用新发布的 Spring Data Lovelace 中 Spring Data MonogDB 版本中的新反应式事务支持!为什么不使用 R2DBC 并以流式方式通过 RSocket 返回大量数据。RSocket 也非常适合边缘!例如,您可以通过 WebSockets 进行 RSocket。想象一下:可以直接由 HTML5 客户端使用的基于 RSocket 的服务。或者,您可以使用 Spring WebFlux 创建反应式 HTTP 服务,这些服务依次调用基于 RSocket 的服务,这些服务又依次调用 R2DBC。正如他们所说,天空可能是无限的和异步的!
Reactor 团队 和 Spring 团队的 Ben Hale 以及其他人,在一年多的时间里一直勤奋地致力于这两个项目,因此我知道他们非常高兴能与您分享此消息,并回答您在参与反应式革命时可能遇到的任何问题!