SpringOne Platform 2019 的反应式革命(第 2/N 部分)

工程 | Josh Long | 2019 年 10 月 22 日 | ...

去年,在 SpringOne Platform 2018 的中间,我发布了本系列中的第一篇文章,_SpringOne Platform 2018 的反应式革命(第 1/N 部分)_,其中探讨了我们在 SpringOne Platform 2018 上发布的一些重要功能。我想跟进并重新审视该博客和我去年介绍的概念,并展示我们在过去一年中取得了多大的进步。简而言之:事情变得 _容易_ 多了!

R2DBC

去年,我们宣布了我们正在努力支持一个用于反应式 SQL 数据访问的标准,并启动了一个名为R2DBC的新项目。JVM 上传统的 SQL 数据访问方法(如 JDBC)是阻塞式 API。它们独占线程来完成工作,并抵消了像 Spring 这样的反应式平台的可扩展性优势。我们希望改善开发人员的这种情况,因此我们构建了 R2DBC。R2DBC 现在已经公开发布了一年多,在此之前,内部开发了大约同样长的时间。我们刚刚发布了 0.0.8 版本。我们即将发布 GA 版本。我不确定是什么时候,但我认为我们将拥有大多数开发人员在可能为 GA 但尚未为 1.0 的版本中需要的功能。像存储过程这样的功能可能会晚些时候发布。

自去年以来,我们取得了巨大进步。现在有许多不同的实现(有些可用,有些正在积极开发中)来自各个供应商支持基于 R2DBC 的数据访问,包括 PostgresSQL、MySQL、Google Cloud Spanner、H2、Microsoft SQL Server 等。

入门也 _容易_ 多了。只需访问我在互联网上第二喜欢的地方(仅次于生产环境),Spring Initializr,然后选择R2DBC并选择PostgresSQL。点击生成。生成的归档文件几乎可以立即运行。您需要指定连接信息,但与所有 Spring Boot 自动配置一样,此类详细信息最好在 Java 代码之外进行处理。您可以通过环境、-D 参数和属性文件来处理它。以下是我在src/main/resources/application.properties中使用的示例。

spring.r2dbc.url=r2dbc:postgres://127.0.0.1/orders
spring.r2dbc.username=orders
spring.r2dbc.password=orders

以下是以往年份 Java 代码的等效代码

package com.example.r2dbc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

@SpringBootApplication
public class R2dbcApplication {

	public static void main(String[] args) {
		SpringApplication.run(R2dbcApplication.class, args);
	}
}

interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {

	private Long id;

	private String email;

}

很酷,对吧?这些 API 本身就是反应式的,这意味着您可以利用这些 API 提供的重试和组合功能。我们可以更进一步。由于 Spring Framework 5.2 中引入的新ReactiveTransactionManager抽象,现在支持反应式事务管理。此接口的实现类似于 Spring Framework 中的PlatformTransactionManager,适用于 Spring Data Neo4j RX(新的、反应式优先实现)、Spring Data MongoDB,当然还有 R2DBC。以下是一个基于我们的 R2DBC 代码的反应式事务服务。


@Service
@RequiredArgsConstructor
class CustomerService {

	private final TransactionalOperator transactionalOperator;

	@Transactional
	Flux<Customer> saveAllWithTransactionalAnnotation(String... emails) {
		return this.validCustomersFromEmails(emails);
	}

	Flux<Customer> saveAllWithTransactionalOperator(String... emails) {
		return this.transactionalOperator.transactional(
			this.validCustomersFromEmails(emails));
	}

	private Flux<Customer> validCustomersFromEmails(String... emails) {
		return Flux.fromArray(emails)
			.map(email -> new Customer(null, email))
			.doOnNext(c -> Assert.isTrue(c.getEmail().contains("@"), "the email must contain a '@'"));
	}
}

您需要显式配置事务。操作方法如下

@Configuration
@EnableTransactionManagement
class TransactionConfiguration {

	@Bean
	ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory cf) {
		return new R2dbcTransactionManager(cf);
	}

	@Bean
	TransactionalOperator transactionalOperator(ReactiveTransactionManager txm) {
		return TransactionalOperator.create(txm);
	}
}

在文章的评论中,有人询问是否有人在与 Jakarta EE 讨论所有这些。我提到了(现已失效的)ADBA 项目。R2DBC 现在是实现反应式 SQL 数据访问的最佳选择。我希望您能尝试一下,试用一下并提供反馈!

RSocket:反应式线协议

去年,我们还推出了对 RSocket 的支持,RSocket 是一种由 Netflix(他们后来转到 Facebook)和 Netifi 的人员(其中包括)开发的协议。RSocket 是一种线协议,它将反应式处理的租户作为协议本身的一部分呈现出来。Facebook 开发了不同的 RSocket 客户端,例如支持 C++ 和 Java。Java RSocket 客户端构建在Reactor 项目之上!RSocket 是一种二进制协议,因此理论上您也可以使用其他语言构建客户端。

RSocket 是一种通用数据传送协议。它是与有效负载无关的。它不关心您通过网络发送什么内容。它也是为操作而构建的!它甚至在协议中有一个专门的消息帧来传送服务健康状况和正常运行时间等信息。它支持多种消息交换模式或样式,包括但不限于请求-响应、触发-忘记、发布-订阅和流式传输。天空才是极限!本文无法希望全面介绍所有选项,因此让我们来看一个简单的流式传输示例,该示例有两个组件:生产者和消费者。为了使它工作,我转到 Spring Initializr,选择了LombokRSocket。我做了两次,一次用于生产者,另一次用于消费者。让我们看看。

您需要像以前一样在src/main/resources/application.properties中提供一些配置。

spring.rsocket.server.port=7777

这将启动一个 RSocket 服务。以下是实际代码。

package com.example.rsocketservice;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

import java.time.Duration;

@SpringBootApplication
public class RsocketServiceApplication {

	public static void main(String[] args) {
		SpringApplication.run(RsocketServiceApplication.class, args);
	}
}

@Controller
class GreetingsController {

	@MessageMapping("intervals")
	Flux<GreetingResponse> interval(GreetingRequest request) {
		return Flux
			.interval(Duration.ofMillis(1000))
			.map(interval -> new GreetingResponse("Hello (#" + interval + ") " + request.getName() + "!"));
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingRequest {
	private String name;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingResponse {
	private String message;
}

比去年的代码好多了!它在功能上是等效的,但它还做了一些更有趣的事情,例如编组对象而不是仅仅String。请记住,RSocket 连接是多路复用、有状态、长期存在的连接。您可以打开连接并重复使用它来处理多个请求。它们可能会断开连接,您需要在那里加入重试逻辑。这对速度有深远的影响;您不会相信在典型的 HTTP 中花费了多少时间进行连接和断开连接!

让我们看看客户端,即用于使用 Greetings 服务的客户端。转到 Spring Initializer,启动一个新项目,选择与之前相同的依赖项,仅更改项目名称。

package com.example.rsocketclient;

import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;

@SpringBootApplication
@RequiredArgsConstructor
public class RsocketClientApplication {

	@SneakyThrows
	public static void main(String[] args) {
		SpringApplication.run(RsocketClientApplication.class, args);
		System.in.read();
	}

	@Bean
	RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
		return builder.connectTcp("localhost", 7777).block();
	}
}

@Log4j2
@Component
@RequiredArgsConstructor
class Client {

	private final RSocketRequester rSocketRequester;

	@EventListener(ApplicationReadyEvent.class)
	public void ready() {
		this.rSocketRequester
			.route("intervals")
			.data(new GreetingRequest("World"))
			.retrieveFlux(GreetingResponse.class)
			.subscribe(im -> log.info("consuming " + im.getMessage() + "."));
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingRequest {
	private String name;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingResponse {
	private String message;
}

不错吧?我们可以更进一步,使用 Spring Security 中对 RSocket 的新支持对其进行保护,我们还可以使用 Spring Cloud Gateway 的新 RSocket 支持来为我们的服务提供前端。

我迫不及待地想用一年后的最新和最棒的功能更新这个系列!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多信息

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部