SpringOne Platform 2019 上的响应式革命 (第 2/N 部分)

工程 | Josh Long | 2019 年 10 月 22 日 | ...

去年,在 SpringOne Platform 2018 大会期间,我发布了本系列的第一篇博文,《SpringOne Platform 2018 上的响应式革命 (第 1/N 部分)》,回顾了我们在 SpringOne Platform 2018 上刚刚发布的一些重要特性。我想继续更新并重温那篇博文以及我去年介绍的概念,并展示我们在过去一年中取得了多大的进步。TL;DR:事情变得容易得多

R2DBC

去年,我们宣布致力于支持响应式 SQL 数据访问标准的工作,推出一个名为 R2DBC 的新项目。JVM 上传统的 SQL 数据访问方法,如 JDBC,是阻塞式 API。它们会独占线程来完成工作,从而抵消了 Spring 这样响应式平台带来的可伸缩性优势。我们希望为开发者改善这种情况,因此构建了 R2DBC。R2DBC 现已公开发布一年多,在此之前还在内部开发了大约同样长的时间。我们刚刚发布了 0.0.8 版本。我们正接近正式发布 (GA)。我不确定具体时间,但我预计在可能正式发布但尚未达到 1.0 的版本中,我们将拥有大多数开发者所需的大部分特性。存储过程等功能可能会稍后发布。

自去年以来,我们取得了巨大的进展。现在有许多不同的实现(有些已可用,有些正在积极开发中),来自各种供应商,支持基于 R2DBC 的数据访问,包括 PostgresSQL、MySQL、Google Cloud Spanner、H2、Microsoft SQL Server 等等。

入门也容易得多。只需访问我在互联网上第二喜欢的地方,仅次于生产环境,也就是Spring Initializr,然后选择 R2DBCPostgresSQL。点击 Generate。生成的压缩包几乎可以直接运行。你需要指定连接信息,但与所有 Spring Boot 自动配置一样,这些细节最好在 Java 代码之外处理。你可以在环境变量、-D 参数以及属性文件中处理它。对我来说,在 src/main/resources/application.properties 文件中是这样的。

spring.r2dbc.url=r2dbc:postgres://localhost/orders
spring.r2dbc.username=orders
spring.r2dbc.password=orders

这是比去年 Java 代码更强大的等效示例

package com.example.r2dbc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

@SpringBootApplication
public class R2dbcApplication {

	public static void main(String[] args) {
		SpringApplication.run(R2dbcApplication.class, args);
	}
}

interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {

	private Long id;

	private String email;

}

很酷吧?这些 API 本身就是响应式的,这意味着你可以利用它们提供的重试和组合功能。我们可以更进一步。现在 Spring Framework 5.2 中引入了新的 ReactiveTransactionManager 抽象,因此支持响应式事务管理。目前已经有该接口的实现,类似于 Spring Framework 中的 PlatformTransactionManager,用于 Spring Data Neo4j RX(新的响应式优先实现)、Spring Data MongoDB 以及当然还有 R2DBC。这里是一个基于我们 R2DBC 代码的响应式事务服务。


@Service
@RequiredArgsConstructor
class CustomerService {

	private final TransactionalOperator transactionalOperator;

	@Transactional
	Flux<Customer> saveAllWithTransactionalAnnotation(String... emails) {
		return this.validCustomersFromEmails(emails);
	}

	Flux<Customer> saveAllWithTransactionalOperator(String... emails) {
		return this.transactionalOperator.transactional(
			this.validCustomersFromEmails(emails));
	}

	private Flux<Customer> validCustomersFromEmails(String... emails) {
		return Flux.fromArray(emails)
			.map(email -> new Customer(null, email))
			.doOnNext(c -> Assert.isTrue(c.getEmail().contains("@"), "the email must contain a '@'"));
	}
}

你需要明确配置事务。方法如下

@Configuration
@EnableTransactionManagement
class TransactionConfiguration {

	@Bean
	ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory cf) {
		return new R2dbcTransactionManager(cf);
	}

	@Bean
	TransactionalOperator transactionalOperator(ReactiveTransactionManager txm) {
		return TransactionalOperator.create(txm);
	}
}

在文章评论区,有人问是否有人与 Jakarta EE 讨论过这一切。我提到了(现已废弃的)ADBA 项目。R2DBC 现在是实现响应式 SQL 数据访问的最佳选择。希望你能尝试一下,亲自体验,并给予反馈!

RSocket:响应式网络协议

去年,我们还首次亮相了对 RSocket 的支持。RSocket 是一种协议,由 Netflix(后来已转至 Facebook)和 Netifi 等公司的工程师开发。RSocket 是一种网络协议,它将响应式处理的原则作为协议本身的一部分体现出来。Facebook 开发了不同的 RSocket 客户端,例如支持 C++ 和 Java。Java RSocket 客户端构建在Reactor 项目之上!不过,RSocket 是一种二进制协议,因此理论上你也可以用其他语言构建客户端。

RSocket 是一种通用的数据传输协议。它与负载无关。它不关心你在网络上传输什么。它也是为操作构建的!它甚至在协议中有一个专门的消息帧来传递服务健康状况和正常运行时间等信息。它支持多种消息交换模式或风格,包括但不限于请求-响应、即发即弃、发布-订阅和流式传输。潜力无限!这篇博文无法全面介绍所有选项,所以我们来看一个简单的流式传输示例,它包含两个组件:生产者和消费者。为了让它工作,我去了 Spring Initializr,选择了 LombokRSocket。我做了两次,一个用于生产者,一个用于消费者。我们来看看。

和之前一样,你需要在 src/main/resources/application.properties 文件中提供一些配置。

spring.rsocket.server.port=7777

这将启动一个 RSocket 服务。这是实际的代码。

package com.example.rsocketservice;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

import java.time.Duration;

@SpringBootApplication
public class RsocketServiceApplication {

	public static void main(String[] args) {
		SpringApplication.run(RsocketServiceApplication.class, args);
	}
}

@Controller
class GreetingsController {

	@MessageMapping("intervals")
	Flux<GreetingResponse> interval(GreetingRequest request) {
		return Flux
			.interval(Duration.ofMillis(1000))
			.map(interval -> new GreetingResponse("Hello (#" + interval + ") " + request.getName() + "!"));
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingRequest {
	private String name;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingResponse {
	private String message;
}

比去年的代码好多了!它的功能是等效的,但做了一些更有趣的事情,比如封送一个对象而不是简单的 String。记住,RSocket 连接是多路复用的、有状态的、长连接。你可以打开一个连接并重复使用它来处理多个请求。它们可能会断开连接,你需要自己处理重试逻辑。这对速度有深远的影响;你不会相信在典型的 HTTP 中连接和断开连接会花费多少时间!

让我们看看客户端,它将消费 Greetings 服务。前往 Spring Initializer,启动一个新项目,选择与之前相同的依赖项,只更改项目名称。

package com.example.rsocketclient;

import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;

@SpringBootApplication
@RequiredArgsConstructor
public class RsocketClientApplication {

	@SneakyThrows
	public static void main(String[] args) {
		SpringApplication.run(RsocketClientApplication.class, args);
		System.in.read();
	}

	@Bean
	RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
		return builder.connectTcp("localhost", 7777).block();
	}
}

@Log4j2
@Component
@RequiredArgsConstructor
class Client {

	private final RSocketRequester rSocketRequester;

	@EventListener(ApplicationReadyEvent.class)
	public void ready() {
		this.rSocketRequester
			.route("intervals")
			.data(new GreetingRequest("World"))
			.retrieveFlux(GreetingResponse.class)
			.subscribe(im -> log.info("consuming " + im.getMessage() + "."));
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingRequest {
	private String name;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingResponse {
	private String message;
}

不错吧?我们可以更进一步,使用 Spring Security 对 RSocket 的新支持来保护它,并且我们还可以使用 Spring Cloud Gateway 新增的 RSocket 支持来作为我们服务的前端。

我迫不及待地想在一年后用最新的内容更新这个系列!

订阅 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

快人一步

VMware 提供培训和认证,助你加速前进。

了解更多

获得支持

Tanzu Spring 通过一个简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区所有近期活动。

查看全部