迈向响应式 Spring Cloud

工程 | Josh Long | 2018年6月20日 | ...

新的Spring Cloud Finchley GA 版本 充满了优秀特性,代表着迈向响应式微服务的重大里程碑。我无法一一列举所有内容,因此建议您阅读 Spencer Gibb 撰写的最新发布公告。相反,在这篇文章中,我想重点关注我们走向响应式 Spring Cloud 的历程。

我们在2017年9月发布了Spring Framework 5。这是第一个引入新的响应式编程支持的版本,有助于构建更健壮、更可扩展的服务。它基于 Pivotal Reactor 项目,这是我们兼容 Reactive Streams 的响应式运行时。Spring Framework 5 还包含大量新特性,我也无法一一列举,这里只关注响应式支持。什么是响应式编程?它为什么重要?当您构建网络服务时,它就非常重要。

简而言之,Spring 中服务集成的基础已更新为完全拥抱响应式编程。那么,“响应式编程”是什么呢?响应式编程认识到,一旦您开始通过网络传输更多数据,用 API 调用饱和您的 IO 缓冲区,您就会在任何给定请求中花费更多时间进行 IO。

IO 本身并非问题。传统上,IO 会阻塞——线程必须等待InputStream产生新的byte。(通常在读取byte缓冲区的 while 循环read()中)。当线程等待时,它无法用于其他任何用途。线程很昂贵!

考虑一下用 Java 或任何其他采用相同线程方法的平台实现的传统服务器是如何工作的。如果您的 Web 服务器线程池中有 100 个线程,并且到达了 101 个请求,那么最后一个额外请求将不会被服务,直到其他请求完成处理。如果其他请求能够完成(从而释放它们独占的线程),然后第 101 个请求到达,那就太好了!可能不需要响应式编程。如果您能够比新请求到达的速度更快地释放线程,并且在这些线程中花费的时间主要用于输入/输出,那么就不需要响应式编程。

当您转向微服务、大数据和长期会话(例如在 WebSockets、服务器发送事件和任何其他长期存在的服务器端状态中)的世界时,您将开始通过网络传输更多数据。

线程与 IO 的这种耦合是不必要的。您的操作系统几十年来一直支持“后台”IO,并在您应该参与时通知您。事实上,Java 1.4(来自 2000 年代初)支持 NIO(通道),这为我们提供了这种异步 IO 机制。

在这个世界中,某些东西管理 IO,并在应该参与时回调您的代码。如果有任何延迟,该线程可以继续处理其他请求。它不会阻塞。您的代码不是从InputStream拉取字节,而是异步地将字节推送到它。您有效地反转了与数据源的交互。

许多项目,例如来自@NetflixOSS 的 RxJava、来自@Pivotal 的@ProjectReactor、来自 Eclipse 的@vertx_project 以及来自@lightbend 的@akkateam,都试图提供一个支持这种新的异步现实的编程模型。它们有共同点,并且从这个共同点诞生了 Reactive Streams 规范,这些项目都支持该规范。

Reactive Streams 规范支持Publisher类型,该类型将项目发布给订阅者。Subscribers在调用其onNextIT)方法时使用项目。当订阅者订阅时,它会得到一个Subscription,它可以使用该Subscription来指示它可以处理多少记录。最后一点——能够精确指定订阅者一次准备处理多少记录——是流控制Publisher不会压垮Subscriber。这提高了稳定性。在响应式编程的上下文中,流控制称为背压

还有一个最终接口Processor,它只是一个桥梁;它同时实现了PublisherSubscriber。Project Reactor 支持两种Publisher特化:Flux,它发出 0-N 个项目,以及Mono,它发出单个项目或无项目。

这是对 IO 发生方式的根本性重新思考,因此它需要在上面的每一层进行集成;在数据访问层、安全层、Boot 和微服务层。

Spring Framework 5 还包含一个全新的响应式 Web 运行时(甚至支持 Netty 项目),称为 Spring WebFlux。它甚至包括新的、功能性的响应式端点。早在 2016 年,我就为此制作了一个 Spring Tips 视频!

Spring WebFlux 基于 Reactive Streams 规范,因此可以与任何其他支持库互操作。这是一个 Spring Tips 视频,我在其中演示了将响应式 Spring Webflux 与 Lightbend 的 Akka Streams(和 Scala)一起使用。

新的 Spring WebFlux 组件模型首先是响应式和异步的。它以与传统处理同步情况相同的方式支持异步情况,例如 WebSockets 和服务器发送事件。最终您只有一种东西。想要在几纳秒内发送包含 10 条记录的简短 JSON 片段?使用Publisher!想要生成服务器发送事件?这是一个关于服务器发送事件的 Spring Tips 视频。

这是一个关于响应式 WebSockets 的 Spring Tips 视频。

新版本还包括一个新的响应式 HTTP 客户端,称为WebClient。我也为此制作了一个 Spring Tips 视频!

Spring Data Kay 通过模板和存储库支持响应式数据访问,用于具有异步 IO 支持的数据访问技术。这是一个使用响应式 Spring Data MongoDB 的示例。


interface ReservationRepository extends ReactiveMongoRepository<Reservation, String> {

		Flux<Reservation> findByEmail(String email);
}

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
class Reservation {
		@Id
		private String id;
		private String email;
}

Spring Security 5 支持传统用例(如下所示)和 OAuth 的响应式身份验证和授权。


  @Bean
  MapReactiveUserDetailsService authentication() {
    // don't do this! this is a hardcoded username and password and it
    // would literally pain Spring Security lead @rob_winch to see this!
    //
    return new MapReactiveUserDetailsService(
      User.withDefaultPasswordEncoder().username("user").password("pw").roles("USER").build());
  }

  @Bean
  SecurityWebFilterChain authorization(ServerHttpSecurity security) {
  //@formatter:off
  return security
  .csrf().disable()
  .httpBasic()
  .and()
  .authorizeExchange()
    .pathMatchers("/proxy").authenticated()
    .anyExchange().permitAll()
  .and()
  .build();
  //@formatter:on
  }

Spring Boot 2 将所有这些整合在一起,以便无论您选择使用 Spring WebFlux 还是 Spring MVC,“一切正常”,例如构建 REST 端点、使用 Actuator、管理安全以及其他所有内容。

从代码库更改的角度来看,这也意味着 Spring Cloud 团队需要应对许多不稳定因素,这使得这个版本如此具有里程碑意义。

新版本将响应式编程无缝地集成到现有问题中:服务注册、发现、安全、CDC(T) 和测试、消息传递、微代理支持、断路器等等。让我们来看一些例子。

您可以使用新的响应式WebClient,并使用 Spring Cloud 的DiscoveryClient抽象(Netflix Eureka、HashiCorp Consul、Apache Zookeeper、Cloud Foundry 等)支持的任何服务注册表来解析主机。


@Bean
WebClient client(LoadBalancerExchangeFilterFunction eff) {
  return WebClient.builder().filter(eff).build();
}

然后您可以使用该响应式、服务注册表感知的WebClient。在下面的示例中,reservation-service是在服务注册表中注册的服务,而不是实际的主机名。


Publisher<String> emails = client
	.get()
	.uri("http://reservation-service/reservations")
	.retrieve()
	.bodyToFlux(Reservation.class)
	.map(Reservation::getEmail);

您还可以分别使用 Spring Cloud Stream 中的响应式支持来使用 Kafka 或 RabbitMQ 中的主题或队列中的消息。


@Configuration  
@EnableBinding(Sink.class)
public class MyStreamListener {

  @StreamListener
  public void incoming (@Input(Sink.INPUT) Flux<String> names ) {
    names
     .map ( x-> new Reservation( null, x))
     .flatMap ( this.reservationRepository::save )
     .subscribe( x -> log.info( "saved " + x.toString()));
   }
 }

您可以使用带有响应式Publisher的 Hystrix 断路器来保护和隔离可能出错的服务调用。在下面的示例中,我使用响应式WebClient进行 HTTP 调用,该调用可能失败。如果失败,我想能够提供一个备用Publisher来返回。这就是将会发生的事情。它几乎与不会发生的事情一样重要。我的代码不会抛出异常。它会优雅地降级。该断路器具有智能。它具有状态。如果足够多的连续尝试进行该调用失败,断路器最终将直接切换到备用 Publisher。如果下游服务应该重新上线(如果您使用 Cloud Foundry,则会),那么它最终会重新向注册表注册自身,注册表将发送心跳事件,并且心跳事件将用于使注册表中服务的本地视图失效。客户端将看到注册表中有新的实例,它将断路器重置为关闭状态,并允许下一个调用通过,希望该调用会成功。



Publisher<String> emails = client
  .get()
  .uri("http://reservation-service/reservations")
  .retrieve()
  .bodyToFlux(Reservation.class)
  .map(Reservation::getEmail);

Publisher<String> fallback = HystrixCommands
  .from( emails )
  .eager()
  .commandName("emails")
  .fallback ( Flux.just ("EEK!") )
  .build();

能够在响应式上下文中使用这些现有技术固然很好,但最令人兴奋的是响应式编程打开了哪些新的可能性!两个新项目,Spring Cloud Gateway 和 Spring Cloud Function,都从中受益匪浅。

让我们简要地看一下它们。

Spring Cloud Gateway 是我们全新的响应式 API 网关。它建立在 Spring 的响应式支持之上。毕竟,它的工作是从客户端向下游服务发出请求。这是一个完美的用例(和需求)来使用响应式编程。我也为此制作了一个 Spring Tips 视频。

这是一个使用 Spring Cloud Gateway 将来自:9999/proxy的请求代理到服务(通过服务注册表解析和负载均衡)并进行速率限制的示例。(注意:此配置可以存在于(可刷新)Spring Cloud Config Server 中的配置中,或者您可以为其创建Flux<Route>的任何来源。)

此示例将每个已认证用户的每秒请求限制为 100 个。您不需要 Spring Security 即可使用网关,但在配置中它是隐含的。

@Bean
RouteLocator gateway (RouteLocatorBuilder rlb, RedisRateLimiter rrl) {
  return rlb
    .routes()
    .route( spec ->
      spec
       .path("/rl")
       .flters( fs -> fs
         .requestRateLimiter( c -> c.setRateLimiter( this.redisRateLimiter() ))
         .setPath("/reservations")
       )
       .uri("lb://reservation-service/")
    )
    .build();
}


@Bean // 100 reqs per second, burstable to 150
RedisRateLimiter redisRateLimiter (){
  return new RedisRateLimiter(100, 150);
}

Spring Cloud Function 是我们新的函数即服务抽象。它将普通的函数适配为不同函数即服务运行时所需的类型。它可以在许多其他平台上使用,包括AWS Lambda、Microsoft Azure,当然还有我们自己的Project Riff。Project Riff 是一个基于 Apache 2 许可证、基于 Kubernetes 的多语言函数即服务运行时。我还制作了一个 Spring Tips 视频,介绍了 Spring Cloud Function 和 Project Riff。

使用它非常简单!您需要创建java.util.function.Function<I,O>实例。在这种情况下,IO都可以是Publisher<X>!

package com.example.uppercase;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;

@SpringBootApplication
public class UppercaseApplication {

		@Bean
		Function<Flux<String>, Flux<String>> uppercase() {
				return incoming -> incoming.map(String::toUpperCase);
		}

		public static void main(String[] args) {
				SpringApplication.run(UppercaseApplication.class, args);
		}
}

正如您现在可能已经了解到的那样,响应式编程已经完全融入 Spring!Spring Cloud 是最后一个需要支持它才能对响应式编程进行全面讨论的主要项目。但这并非故事的结尾,我们才刚刚开始!敬请期待 :-)

在即将到来的 SpringOne Platform 活动中,我们将讨论响应式编程和基于 Spring Cloud 的响应式微服务等许多其他内容。加入我们

获取 Spring 新闻通讯

关注 Spring 新闻通讯

订阅

领先一步

VMware 提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部