响应式 Spring Cloud 之路

工程 | Josh Long | 2018 年 6 月 20 日 | ...

新发布的 Spring Cloud Finchley GA 版本 干货满满,代表着通往响应式微服务之旅的一个重要里程碑。我不可能一一列举所有内容,因此请您参考 Spencer Gibb 发布的这份新鲜出炉的版本公告。相反,在这篇文章中,我想重点探讨我们通往响应式 Spring Cloud 所走过的道路。

我们于 2017 年 9 月发布了 Spring Framework 5。这是首个引入新的响应式编程支持的版本,旨在帮助构建更健壮、更可扩展的服务。它基于 Pivotal Reactor 项目构建,该项目是我们的兼容 Reactive Streams 的响应式运行时。Spring Framework 5 还包含大量新功能,我也不会尝试一一列出,而是选择在此重点关注响应式支持。什么是响应式编程?为什么它很重要?嗯,当您构建网络服务时,它就很重要了。

简而言之,Spring 中服务集成的基础已焕然一新,全面拥抱响应式编程。那么,“响应式编程”是什么?响应式编程认识到,一旦您开始通过网络传输更多数据,用 API 调用填满您的 IO 缓冲区,您在任何给定请求中进行 IO 的时间就会更多。

IO 本身不是问题。IO 传统上是阻塞的——一个线程必须等待一个 InputStream 生成新的 byte。(通常在 while 循环中 read() 读取 byte 缓冲区)。当一个线程等待时,它不能被用于其他任何事情。线程是昂贵的!

想想看,一个用 Java 或任何采用相同线程处理方法的平台实现的传统服务器是如何工作的。如果您的 Web 服务器线程池中有 100 个线程,并且有 101 个请求到达,那么最后一个额外的请求将不会得到服务,直到其他请求中的一个完成处理其请求为止。如果其他请求能够在第 101 个请求到达之前完成(从而释放它们独占的线程),那就太好了!可能就不需要响应式编程了。如果您释放线程的速度快于新请求到达的速度,并且这些线程中花费的时间主要是在输入/输出上,那么就不需要响应式编程了。

当您迈向微服务、大数据和长连接会话(例如 WebSocket、服务器发送事件以及任何其他长时运行的服务器端状态)的世界时,您将开始通过网络传输更多数据。

这种线程与 IO 的耦合是不必要的。几十年来,您的操作系统一直支持“后台”IO,并在您需要参与时通知您。事实上,Java 1.4(从 21 世纪初开始)就支持 NIO (Channels),它为我们提供了这种异步 IO 机制。

在这个世界里,有东西管理着 IO,并在需要时回调您的代码。如果存在任何延迟,该线程可以自由地继续处理其他请求。它不会阻塞。您的代码不是从 InputStream拉取字节,而是字节会被异步地推送给它。您实际上颠倒了与数据源的交互方式。

许多项目,例如来自 @NetflixOSS 的 RxJava、来自 @Pivotal 的 @ProjectReactor、来自 Eclipse 的 @vertx_project 和来自 @lightbend 的 @akkateam,都致力于提供一种支持这种新的异步现实的编程模型。它们之间存在共同点,而 Reactive Streams 规范就诞生于这些共同点之上,所有这些项目都支持该规范。

Reactive Streams 规范支持 Publisher 类型,它向订阅者发布条目。Subscribers 在其 onNext() 方法被调用时消费条目。当订阅者订阅时,会获得一个 Subscription,它可以使用该对象来表示可以处理多少条记录。最后一点——指定订阅者一次准备处理多少条记录的能力——就是流量控制Publisher 不能压垮 Subscriber。这增强了稳定性。在响应式编程的语境下,流量控制被称为背压

还有一个最终的接口,Processor,它只是一个桥梁;它同时实现了 PublisherSubscriber。Project Reactor 支持两种 Publisher 的特化类型:Flux,它发出 0 到 N 个条目,以及 Mono,它发出一个条目或不发出条目。

这是对 IO 发生方式的根本性重新思考,因此需要上层每一层进行集成;在数据访问层、安全层、Boot 中以及微服务层。

Spring Framework 5 还包含一个全新的响应式 Web 运行时(甚至支持 Netty 项目),称为 Spring WebFlux。它甚至包含了新的函数式响应式端点。我早在 2016 年就做过一个关于这个的 Spring Tips 视频!

Spring WebFlux 基于 Reactive Streams 规范构建,因此可以与任何其他支持库互操作。这里有一个 Spring Tips 视频,我在其中演示了如何将响应式 Spring Webflux 与 Lightbend 的 Akka Streams(以及 Scala)一起使用。

新的 Spring WebFlux 组件模型首先是响应式的和异步的。它以与您传统处理同步情况相同的方式支持异步情况,例如 WebSocket 和服务器发送事件。最终您得到的是一种类型的东西。想要在几纳秒内发送包含 10 条记录的短 JSON 节?使用 Publisher!想要生成服务器发送事件?这里有一个关于服务器发送事件的 Spring Tips 视频。

这里有一个关于响应式 WebSocket 的 Spring Tips 视频。

新版本还包含一个新的响应式 HTTP 客户端,称为 WebClient。我也做了一个关于它的 Spring Tips 视频!

Spring Data Kay 通过模板和 Repository 为支持异步 IO 的数据访问技术提供响应式数据访问支持。这里有一个使用响应式 Spring Data MongoDB 的示例。


interface ReservationRepository extends ReactiveMongoRepository<Reservation, String> {

		Flux<Reservation> findByEmail(String email);
}

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
class Reservation {
		@Id
		private String id;
		private String email;
}

Spring Security 5 为传统用例(如下所示)和 OAuth 提供响应式身份验证和授权支持。


  @Bean
  MapReactiveUserDetailsService authentication() {
    // don't do this! this is a hardcoded username and password and it
    // would literally pain Spring Security lead @rob_winch to see this!
    //
    return new MapReactiveUserDetailsService(
      User.withDefaultPasswordEncoder().username("user").password("pw").roles("USER").build());
  }

  @Bean
  SecurityWebFilterChain authorization(ServerHttpSecurity security) {
  //@formatter:off
  return security
  .csrf().disable()
  .httpBasic()
  .and()
  .authorizeExchange()
    .pathMatchers("/proxy").authenticated()
    .anyExchange().permitAll()
  .and()
  .build();
  //@formatter:on
  }

Spring Boot 2 将所有这些整合在一起,无论您选择使用 Spring WebFlux 还是 Spring MVC,构建 REST 端点、使用 Actuator、管理安全性以及所有其他功能都能“正常工作”。

从代码库变更的角度来看,这也意味着 Spring Cloud 团队需要应对许多不确定性,这正是这个版本如此重要的原因。

新版本将响应式编程无缝地融入到现有关注点中:服务注册、发现、安全性、CDC(T) 和测试、消息传递、微代理支持、断路器等等。让我们看一些示例。

您可以使用新的响应式 WebClient,并通过 Spring Cloud 的 DiscoveryClient 抽象支持的任何服务注册中心(Netflix Eureka、Hashicorp Consul、Apache Zookeeper、Cloud Foundry 等)来解析主机。


@Bean
WebClient client(LoadBalancerExchangeFilterFunction eff) {
  return WebClient.builder().filter(eff).build();
}

然后您可以使用那个响应式、感知服务注册中心的 WebClient。在下面的示例中,reservation-service 是在服务注册中心注册的服务,而不是实际的主机名。


Publisher<String> emails = client
	.get()
	.uri("http://reservation-service/reservations")
	.retrieve()
	.bodyToFlux(Reservation.class)
	.map(Reservation::getEmail);

您还可以使用 Spring Cloud Stream 中的响应式支持,分别消费 Kafka 或 RabbitMQ 中主题或队列的消息。


@Configuration  
@EnableBinding(Sink.class)
public class MyStreamListener {

  @StreamListener
  public void incoming (@Input(Sink.INPUT) Flux<String> names ) {
    names
     .map ( x-> new Reservation( null, x))
     .flatMap ( this.reservationRepository::save )
     .subscribe( x -> log.info( "saved " + x.toString()));
   }
 }

您可以使用 Hystrix 断路器和响应式 Publisher 来保护和隔离潜在的错误服务调用。在下面的示例中,我使用响应式 WebClient 进行了一个可能失败的 HTTP 调用。如果它失败了,我希望能够提供一个备用的 Publisher 返回。这就是会发生的事情。这几乎和不会发生的事情一样重要。我的代码不会抛出异常。它会优雅地降级。这个断路器很智能。它有状态。如果足够多的连续调用尝试失败,断路器最终会直接切换到备用的 Publisher。如果下游服务重新上线(如果您使用 Cloud Foundry 就会发生),那么它最终会重新向注册中心注册自己,注册中心会发送一个心跳事件,心跳事件将用于使本地对注册中心中服务的视图失效。客户端会看到注册中心有新的实例,并会将断路器重置为关闭状态,允许下一个调用通过,希望这次调用能成功。



Publisher<String> emails = client
  .get()
  .uri("http://reservation-service/reservations")
  .retrieve()
  .bodyToFlux(Reservation.class)
  .map(Reservation::getEmail);

Publisher<String> fallback = HystrixCommands
  .from( emails )
  .eager()
  .commandName("emails")
  .fallback ( Flux.just ("EEK!") )
  .build();

能够在响应式环境中利用这些现有技术固然很好,但最令人兴奋的是响应式编程开启了哪些新的可能性!Spring Cloud Gateway 和 Spring Cloud Function 这两个新项目都从中受益匪浅。

让我们稍微看看它们。

Spring Cloud Gateway 是我们全新的响应式 API 网关。它构建在 Spring 的响应式支持之上。毕竟,它的工作是将客户端请求导向到下游服务。这正是响应式编程的完美用例(和需求)。我也做了一个关于它的 Spring Tips 视频。

这里有一个使用 Spring Cloud Gateway 的示例,将来自 :9999/proxy 的请求代理到一个服务(通过服务注册中心解析和负载均衡)并进行速率限制。(注意:此配置可以存储在 Spring Cloud Config Server 中的(可刷新)配置中,或者任何可以创建 Flux<Route> 的来源。)

这个示例限制每个已认证用户每秒最多发起 100 个请求。使用网关不需要 Spring Security,但按照此配置则隐含了这一点。

@Bean
RouteLocator gateway (RouteLocatorBuilder rlb, RedisRateLimiter rrl) {
  return rlb
    .routes()
    .route( spec ->
      spec
       .path("/rl")
       .flters( fs -> fs
         .requestRateLimiter( c -> c.setRateLimiter( this.redisRateLimiter() ))
         .setPath("/reservations")
       )
       .uri("lb://reservation-service/")
    )
    .build();
}


@Bean // 100 reqs per second, burstable to 150
RedisRateLimiter redisRateLimiter (){
  return new RedisRateLimiter(100, 150);
}

Spring Cloud Function 是我们新的函数即服务抽象。它将普通的函数适配成不同函数即服务运行时所需的类型。它可以用在 AWS Lambda、Microsoft Azure 以及我们自己的 Project Riff 等许多平台上。Project Riff 是一个基于 Kubernetes 的 Apache 2 许可的多语言函数即服务运行时。我也做了一个关于 Spring Cloud Function 和 Project Riff 的 Spring Tips 视频。

使用它再简单不过了!您需要创建 java.util.function.Function<I,O> 实例。在这种情况下,IO 都可以是 Publisher<X>

package com.example.uppercase;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;

@SpringBootApplication
public class UppercaseApplication {

		@Bean
		Function<Flux<String>, Flux<String>> uppercase() {
				return incoming -> incoming.map(String::toUpperCase);
		}

		public static void main(String[] args) {
				SpringApplication.run(UppercaseApplication.class, args);
		}
}

正如您现在可能已经了解到的,响应式编程已经在 Spring 中扎根!Spring Cloud 是最后一个需要支持响应式编程才能进行全面讨论的主要项目。但这绝不是故事的结尾。事实上,我们才刚刚开始!敬请期待。 :-)

在即将举行的 SpringOne Platform 活动中,我们将讨论响应式编程和基于响应式 Spring Cloud 的微服务,以及许多其他话题。加入我们

订阅 Spring 时事通讯

通过 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助力您快速提升。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅即可提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区所有即将举行的活动。

查看全部