响应式 Spring Cloud 之路

工程 | Josh Long | 2018 年 6 月 20 日 | ...

新的Spring Cloud Finchley GA 版本内容丰富,是通往响应式微服务之路上的一个重要里程碑。我不可能列出所有内容,因此请参考 Spencer Gibb 最新发布的版本公告。在此,我想重点介绍我们通往响应式 Spring Cloud 的历程。

我们于 2017 年 9 月发布了 Spring Framework 5。这是第一个引入新的响应式编程支持的版本,旨在构建更健壮、可扩展的服务。它建立在 Pivotal Reactor 项目(我们兼容响应式流的响应式运行时)之上。Spring Framework 5 还包含了大量的新功能,我在这里不会一一列举,而是专注于响应式支持。什么是响应式编程?它为什么重要?嗯,它对于构建网络服务非常重要。

简而言之,Spring 中服务集成的基础已经更新,以全面拥抱响应式编程。那么,“响应式编程”是什么?响应式编程的认识是,一旦你开始通过网络传输更多数据,用 API 调用填满你的 IO 缓冲区,那么你在任何给定请求中花费的时间将更多地用于 IO。

IO 本身不是问题。IO 传统上是阻塞的——线程必须等待 InputStream 产生新的 byte。(通常在一个 while 循环中通过 read() 读取 byte 缓冲区)。当一个线程等待时,它就无法被重新用于其他任何事情。线程是昂贵的!

想想传统的服务器(用 Java 或任何具有相同线程方法论的平台实现)是如何工作的。如果你的 Web 服务器线程池中有 100 个线程,而收到了 101 个请求,那么最后一个额外的请求将无法得到服务,直到其中一个请求完成处理。如果其他请求能够完成(从而释放它们所占用的线程)并且早于第 101 个请求到达,那很好!可能不需要响应式编程。如果你能够比新请求到达更快地释放线程,并且这些线程花费的时间主要是由于输入/输出,那么就没有必要进行响应式编程。

当你转向微服务、大数据和长连接会话(例如在 WebSocket 和服务器发送事件以及任何其他长连接的服务器端状态中)的世界时,你将开始通过网络传输更多数据。

线程与 IO 的这种耦合是不必要的。你的操作系统几十年来一直支持“后台”IO,并在你需要参与时通知你。事实上,Java 1.4(2000 年代初)支持 NIO(Channels),它为我们提供了这种异步 IO 机制。

在这个世界里,有什么东西会管理 IO,并在你需要参与时回调你的代码。如果有任何延迟,该线程可以自由地继续处理其他请求。它不是阻塞的。你的代码不再是从 InputStream拉取字节,而是异步地将字节推送给你。你有效地颠倒了与数据源的交互方式。

许多项目,如 @NetflixOSS 的 RxJava、@Pivotal 的 @ProjectReactor、Eclipse 的 @vertx_project 和 @lightbend 的 @akkateam,都致力于提供支持这种新异步现实的编程模型。它们之间存在共同点,而响应式流规范(这些项目都支持)就是在这个共同点上诞生的。

响应式流规范支持 Publisher 类型,它向订阅者发布项。当订阅者的 onNextIT) 方法被调用时,Subscribers 就会消耗这些项。当订阅者订阅时,它会获得一个 Subscription,它可以使用该 Subscription 来指示它一次可以处理多少条记录。最后这一点——指定订阅者一次准备处理多少条记录的能力——就是流控制Publisher 不能压倒 Subscriber。这提高了稳定性。在响应式编程的上下文中,流控制被称为背压

还有一个最终的接口,Processor,它只是一个桥梁;它实现了 PublisherSubscriber。Project Reactor 支持两种 Publisher 特殊化:Flux,它发出 0-N 个项,以及 Mono,它发出一个项,或不发出任何项。

这是对 IO 工作方式的根本性重新思考,因此它需要在上层的所有层进行集成;在数据访问层、安全层、Boot 中以及微服务层。

Spring Framework 5 还包含了一个全新的响应式 Web 运行时(甚至支持 Netty 项目),称为 Spring WebFlux。它甚至包含了新的、响应式的函数式端点。我在 2016 年的时候做过一个关于这个的 Spring Tips 视频!

Spring WebFlux 构建在响应式流规范之上,因此可以与任何其他支持库进行互操作。这是一个 Spring Tips 视频,我在其中演示了如何将响应式 Spring Webflux 与 Lightbend 的 Akka Streams(和 Scala)一起使用。

新的 Spring WebFlux 组件模型首先是响应式和异步的。它以传统处理同步场景的方式来支持异步场景,例如 WebSocket 和服务器发送事件。你最终会得到一种处理方式。想在几纳秒内发送一个包含 10 条记录的短 JSON 消息?使用 Publisher!想生成服务器发送事件?这里有一个关于服务器发送事件的 Spring Tips 视频。

这里有一个关于响应式 WebSocket 的 Spring Tips 视频。

新版本还包括一个新的响应式 HTTP 客户端,称为 WebClient。我也做了一个关于这个的 Spring Tips 视频!

Spring Data Kay 通过模板和存储库支持响应式数据访问,适用于具有异步 IO 支持的数据访问技术。这是一个使用响应式 Spring Data MongoDB 的例子。


interface ReservationRepository extends ReactiveMongoRepository<Reservation, String> {

		Flux<Reservation> findByEmail(String email);
}

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
class Reservation {
		@Id
		private String id;
		private String email;
}

Spring Security 5 支持针对传统用例(如下面所示)和 OAuth 的响应式身份验证和授权。


  @Bean
  MapReactiveUserDetailsService authentication() {
    // don't do this! this is a hardcoded username and password and it
    // would literally pain Spring Security lead @rob_winch to see this!
    //
    return new MapReactiveUserDetailsService(
      User.withDefaultPasswordEncoder().username("user").password("pw").roles("USER").build());
  }

  @Bean
  SecurityWebFilterChain authorization(ServerHttpSecurity security) {
  //@formatter:off
  return security
  .csrf().disable()
  .httpBasic()
  .and()
  .authorizeExchange()
    .pathMatchers("/proxy").authenticated()
    .anyExchange().permitAll()
  .and()
  .build();
  //@formatter:on
  }

Spring Boot 2 将所有这些整合在一起,这样构建 REST 端点、使用 Actuator、管理安全以及其他所有事情,无论你选择使用 Spring WebFlux 还是 Spring MVC,都能“正常工作”。

从代码库变更的角度来看,这也意味着 Spring Cloud 团队需要应对许多不确定的地方,这使得这次发布 SO 具有里程碑意义。

新版本将响应式编程无缝地整合到现有问题中:服务注册、发现、安全、CDC(T) 和测试、消息传递、微代理支持、断路器等等。让我们看一些例子。

你可以使用新的响应式 WebClient,让它使用 Spring Cloud DiscoveryClient 抽象支持的任何服务注册表(Netflix Eureka、Hashicorp Consul、Apache Zookeeper、Cloud Foundry 等)来解析主机。


@Bean
WebClient client(LoadBalancerExchangeFilterFunction eff) {
  return WebClient.builder().filter(eff).build();
}

然后,你可以使用这个响应式的、了解服务注册表的 WebClient。在下面的示例中,reservation-service 是一个在服务注册表中注册的服务,而不是实际的主机名。


Publisher<String> emails = client
	.get()
	.uri("http://reservation-service/reservations")
	.retrieve()
	.bodyToFlux(Reservation.class)
	.map(Reservation::getEmail);

你也可以使用 Spring Cloud Stream 中的响应式支持,从 Kafka 或 RabbitMQ 的主题或队列中消费消息。


@Configuration  
@EnableBinding(Sink.class)
public class MyStreamListener {

  @StreamListener
  public void incoming (@Input(Sink.INPUT) Flux<String> names ) {
    names
     .map ( x-> new Reservation( null, x))
     .flatMap ( this.reservationRepository::save )
     .subscribe( x -> log.info( "saved " + x.toString()));
   }
 }

你可以使用 Hystrix 断路器和响应式 Publishers 来保护和隔离潜在的错误服务调用。在下面的示例中,我使用响应式 WebClient 进行了一个可能失败的 HTTP 调用。如果失败,我希望能够提供一个备用的 Publisher 来返回。这就是将要发生的。这与发生的事情几乎同样重要。我的代码不会抛出异常。它会优雅地降级。该断路器有智能。它有状态。如果对该调用进行足够多的连续尝试失败,断路器最终将直接切换到备用 Publisher。如果下游服务重新上线(如果你使用 Cloud Foundry,它会重新上线),它最终会将其自身重新注册到注册表中,注册表会发送一个心跳事件,该心跳事件将用于使注册表中服务的本地视图无效。客户端将看到注册表中存在新实例,它将重置断路器,关闭它,并允许下一次调用通过,希望这次能成功。



Publisher<String> emails = client
  .get()
  .uri("http://reservation-service/reservations")
  .retrieve()
  .bodyToFlux(Reservation.class)
  .map(Reservation::getEmail);

Publisher<String> fallback = HystrixCommands
  .from( emails )
  .eager()
  .commandName("emails")
  .fallback ( Flux.just ("EEK!") )
  .build();

虽然能够响应式地使用这些现有技术很不错,但最令人兴奋的是响应式编程所带来的新可能性!两个新项目,Spring Cloud Gateway 和 Spring Cloud Function,都从中受益匪浅。

即使只是简要地看一下。

Spring Cloud Gateway 是我们全新的响应式 API 网关。它建立在 Spring 的响应式支持之上。毕竟,它的工作是将客户端的请求路由到下游服务。这是一个响应式编程的完美用例(和需求)。我也做了一个关于它的 Spring Tips 视频。

这是一个使用 Spring Cloud Gateway 将请求从 :9999/proxy 代理到一个服务(通过服务注册表解析和负载均衡)并进行速率限制的示例。(注意:此配置可以存在于(可刷新的)Spring Cloud Config Server 的配置中,或者实际上是任何你可以创建 Flux<Route> 的源。)

此示例将每个已认证用户限制为每秒 100 个请求。你不需要 Spring Security 即可使用此网关,但根据配置,它被暗示为已包含。

@Bean
RouteLocator gateway (RouteLocatorBuilder rlb, RedisRateLimiter rrl) {
  return rlb
    .routes()
    .route( spec ->
      spec
       .path("/rl")
       .flters( fs -> fs
         .requestRateLimiter( c -> c.setRateLimiter( this.redisRateLimiter() ))
         .setPath("/reservations")
       )
       .uri("lb://reservation-service/")
    )
    .build();
}


@Bean // 100 reqs per second, burstable to 150
RedisRateLimiter redisRateLimiter (){
  return new RedisRateLimiter(100, 150);
}

Spring Cloud Function 是我们新的函数即服务抽象。它将普通的函数适配为不同函数即服务运行时所需的类型。它可以用于 AWS Lambda、Microsoft Azure,当然还有我们自己的 Project Riff。Project Riff 是一个 Apache 2 许可的、基于 Kubernetes 的多语言函数即服务运行时。我也做了一个关于 Spring Cloud Function 和 Project Riff 的 Spring Tips 视频。

使用它非常简单!你需要创建 java.util.function.Function<I,O> 实例。在这种情况下,IO 都可以是 Publisher<X>

package com.example.uppercase;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;

@SpringBootApplication
public class UppercaseApplication {

		@Bean
		Function<Flux<String>, Flux<String>> uppercase() {
				return incoming -> incoming.map(String::toUpperCase);
		}

		public static void main(String[] args) {
				SpringApplication.run(UppercaseApplication.class, args);
		}
}

正如你希望已经了解到的,响应式编程已经真正地融入了 Spring!Spring Cloud 是最后一个需要支持它的主要项目,以便对响应式编程进行整体讨论。但这绝不是故事的结局。事实上,我们才刚刚开始!敬请期待。:-)

在即将到来的 SpringOne Platform 活动中,我们将讨论响应式编程和基于响应式 Spring Cloud 的微服务等主题。 加入我们

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有