测试 Spring Cloud Stream 应用 - 第 1 部分

工程 | David Turanski | 2020 年 12 月 15 日 | ...

本文是探讨基于 Java Function 的全新设计的 Spring Cloud Stream 应用的系列博客的一部分。本期文章分为两部分,探讨用于实现流应用的 Function 的测试策略。我们将特别关注与外部资源集成的 Function,这会带来额外的测试挑战。大多数预打包的 Source 和 Sink 应用都属于这种情况。为了说明这一点,我们将讲解一个示例 couchbase-sink 应用。在第 1 部分中,我们将重点介绍 Sink 所基于的核心 Function。在第 2 部分中,我们将探讨如何为该应用编写测试。

以下是本博客系列中的所有先前文章。

测试考量

Function 和应用

对于基于 Function 的流应用,核心功能会暴露为一个 Function。预构建的 Spring Cloud Stream 应用的核心 Function 被打包为独立的组件,以便它们可以被任何应用使用,而无需依赖 Spring Cloud Stream。Spring Cloud Stream 原生支持 Java Function,并将绑定到任何实现了核心 java.util.function 类型之一的 Bean:ConsumerSupplierFunction。作为一个独立组件,该 Function 不需要依赖 Spring 或任何其他东西。如果你在任何包含 Spring Cloud Stream 绑定器(binder)依赖的应用中将任何 Function 注册为 Bean,Spring Cloud Stream 都会将其绑定到一个配置的消息目的地(message destination)。

在数据管道中,数据流起源于 Source 并流入 Sink,中间可能包含零个或多个处理步骤。实际上,Source 充当从某些外部资源(如数据存储、支持标准协议的任何服务或消息代理)提供数据的 Supplier。Sink 充当消费数据到其他外部资源的 Consumer。由于 Spring 为大多数常用外部资源提供了首屈一指的支持,因此预打包的 Source 和 Sink 大多数依赖于 Spring Integration、Spring Data 和 Spring Boot 的某种组合也就不足为奇了。此外,它们被设计为通过 @ConfigurationProperties 配置,以适应许多环境、领域和用例。虽然这些 Function 本身不是 Spring Boot 应用,但它们必须被导入到 Spring Boot 应用中才能运行。

由于所有核心功能都由 Function 实现,我们希望将大部分测试工作集中在此层面。为了确保我们的 Function 在所有预期的成功和错误条件下都能正常工作,我们需要编写测试来覆盖这些场景。这些测试需要创建一个自动配置的应用上下文,并准备或模拟所需的外部资源。

如果 Function 可以通过 @ConfigurationProperties 进行配置,那么我们可以将每种属性组合视为一个不同的测试用例。有些属性是必需的,有些是可选的。由于使用该 Function 需要最终用户提供这些属性,因此预期的场景包括有效和无效配置,例如缺少必需的属性、无效值或无效组合(互斥属性)。

单元测试 vs 集成测试

这里没有广泛接受的定义来帮助我们。特别是对于 Source 和 Sink 来说,其核心功能就是集成,因此很难区分单元测试和集成测试的界限。一方面,Java Function 是一个单元,因为它是一个单一的接口。然而,如果它的唯一目的是与远程系统集成,那么就很难(如果不是不可能)进行独立测试。不过,我认为我们可以就一些通用特征达成一致。

单元测试

  • 作为构建的一部分,在任何开发者或 CI 环境中自动运行,无需任何外部配置

  • 速度合理

  • 由开发者编写并频繁运行

集成测试

  • 在集成环境中自动运行

  • 需要部署被测组件以及外部依赖

  • 可能速度较慢

  • 运行频率较低

根据这个单元测试的定义,第 1 部分是关于 Function 的单元测试。

Testcontainers

Testcontainers 是一个近期流行起来的 Java 库,它允许你以编程方式启动和销毁任何可在 Docker 容器中运行的外部资源。它包含了数十个针对常用资源的开箱即用模块。你还可以使用该库通过编程、Dockerfile 或 docker-compose yaml 创建自定义容器。虽然它主要用于集成测试,但在模拟需要花费大量精力时,它对于编写单元测试也非常有用。当然,我们不得不牺牲一些速度,并放宽“无外部依赖”的规则,以允许主机上安装并运行 Docker 守护进程。考虑到许多开发和 CI 环境现在已经要求使用和构建镜像,这是一个合理的假设。

示例

Couchbase Consumer Function

为了说明问题,我们将编写一个 Couchbase consumer Function,使用 upsert 操作向 Couchbase 键值存储添加一些数据。

为了提高效率,我们将使用 Couchbase Java 客户端的响应式 API 来实现该 Function。这个 API 返回一个 MutationResult 的 Publisher,因此我们的核心接口是 Function<Flux<Message<?>>, Flux<MutationResult>>。这个 Function 将使用 Spring 进行配置,并且可以嵌入到任何 Spring Boot 应用中。为了支持 couchbase-sink,我们将把这个 Function 包装到一个 Consumer<Flux<Message<?>>> 中。

upsert 操作在 Bucket 中插入或更新数据,Bucket 是 Couchbase 主要的数据存储抽象。在我们的例子中,是一个 ReactiveBucket。Bucket 是按名称指定的,并且必须事先存在于 Couchbase 集群中。从 v6.5 开始,Couchbase 支持 Collections。因此 Bucket 可以被划分为多个 Collection,但这一个可选功能,必须在集群中启用。upsert 方法针对的是一个命名的 Collection 或 defaultCollection

我们将 key 和 value 通过 Spring Message 传递给 Function,Message 由 payload 和 headers 组成。payload 可以是任何对象,而 headers 本质上是一个 Map。为了使此 Function 具有通用性,我们可以使用 SpEL 表达式来指定 key。key 表达式会针对 Message 进行求值,可以引用 payload 中的字段或方法,或者 header 中的值。value 就是 payload。Function 还要求用户指定 bucket 和 collection 的名称。为了最大限度地提高灵活性,我们进一步使用 SpEL,将所有内容都变为表达式。现在,如果需要,该 Function 可以在运行时从 message 中提取所有输入值,以便在任何 bucket 的任何 collection 中 upsert 任何数据。在最简单的情况下,bucket 和 collection 可以静态定义。

因此 Function 需要一些配置属性

@ConfigurationProperties("couchbase.consumer")
@Validated
public class CouchbaseConsumerProperties {
    private static final String DEFAULT_VALUE_EXPRESSION = "payload";
    private final SpelExpressionParser parser = new SpelExpressionParser();

   /**
    * A SpEL expression to specify the bucket.
    */
    private Expression bucketExpression;

   /**
      * A SpEL expression to specify the key.
     */
    private Expression keyExpression;

  /**
    * A SpEL expression to specify the collection.
    */
    private Expression collectionExpression;

  /**
    * A SpEL expression to specify the value (default is payload).
    */
    private Expression valueExpression =
                parser.parseExpression(DEFAULT_VALUE_EXPRESSION);
    ...

提示

要静态配置其中一些值,请使用字面量表达式,将值用单引号括起来,例如 couchbase.consumer.bucketExpression='mybucket'。通常,你会从 message 内容中提取 key 和 value。

我们使用 Spring 配置响应式 Function 和相应的 Consumer

@Configuration
@EnableConfigurationProperties(CouchbaseConsumerProperties.class)
public class CouchbaseConsumerConfiguration {

    private static Logger logger =
            LoggerFactory.getLogger(CouchbaseConsumerConfiguration.class);

    @Bean
    public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
                Flux<MutationResult>> couchbaseConsumerFunction) {
        return message -> couchbaseConsumerFunction.apply(message)
               .subscribe(mutationResult -> logger.debug("Processed " + message));
    }

    @Bean
    public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
          Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
        return flux -> flux.flatMap(message -> {
            logger.debug("Processing message " + message);
             String bucketName = bucket(message,
                          consumerProperties.getBucketExpression());
            String key = key(message, consumerProperties.getKeyExpression());
            ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
             ReactiveCollection collection = collection(message,
                            consumerProperties.getCollectionExpression())
				  .map(name -> bucket.collection(name))
                                  .orElse(bucket.defaultCollection());
            return collection.upsert(key,
                              value(message, consumerProperties.getValueExpression()));
        });
    }

    private String bucket(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);
    }

    private String key(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);
    }

    private Object value(Message<?> message, Expression expression) {
        return expression.getValue(message);
    }

    private Optional<String> collection(Message<?> message,
                                             @Nullable Expression expression) {
        return expression == null ? Optional.empty() :
                Optional.of(expression.getValue(message, String.class));
    }
}

这两个类是实现 Function 所需的全部内容。所需的依赖项是

<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>java-client</artifactId>
</dependency>
<!-- Enable configuration properties metadata to be added to the jar -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->
<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>config-common</artifactId>
</dependency>

如前所述,这不是一个 Spring Boot 应用,而是一个必须嵌入到 Spring Boot 应用中才能运行的组件。Spring Boot 负责绑定 @ConfigurationPropeties,并提供 CouchbaseAutoConfiguration

注意

本例未使用 spring-data-couchbase,因为它旨在用于 Spring Data Repository 和自动映射特定领域对象。由于我们的 Function 旨在处理任何 payload 类型,我们使用 Boot 来自动配置 Cluster 以及 Couchbase Java SDK。

那么我们是如何得到一个真正可以工作的 Function 的呢?上面的示例代码是测试驱动开发的成果,经过多次迭代 refinement。由于 Function 依赖于执行所有工作的 Couchbase SDK Cluster 对象,因此在做任何事情之前,我们需要创建一个 Cluster 实例。Cluster 需要连接到 Couchbase 服务器。如果碰巧我们的网络上已经有一个正在运行的 Couchbase 集群,并且有一个可用于测试的 bucket,那么我们最初可能会使用它。但是,即使我们假设 Couchbase 可以从我们的开发和 CI 环境访问,如果由于某种原因无法连接到 Couchbase 会发生什么情况——集群宕机、凭证过期、权限更改或其他原因?我们想让这些问题破坏我们的 CI/CD 流水线或阻止我们的进展吗?

幸运的是,我们可以使用 Testcontainers 的 couchbase 模块来启动我们自己的 Couchbase 环境。

注意

完全披露:我也尝试过 CouchbaseMock,但它似乎与当前的 couchbase Java 客户端不兼容。

Junit 5 所需的测试库是

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>couchbase</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

为了在我们的 Junit 5 测试类中使用 Testcontainers,我们首先启动一个配置了名为 test 的 bucket 的 Couchbase 容器。

@Testcontainers
public class CouchbaseConsumerTests {

	@Container
	static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
			.withBucket(new BucketDefinition("test"));

@Testcontainers 注解为使用 @Container 注解的字段启用生命周期管理。这里,我们将 CouchbaseContainer 声明为 static,这样 TestContainers 将在测试运行前启动容器一次,并在测试后移除它。这是一件好事,因为启动容器需要几秒钟。

注意

还可以看看 Playtika Testcontainers for Spring Boot。这是一个有趣的项目,它使用 Spring Boot 抽象化“嵌入式”服务以自动配置 Testcontainer。这需要你选择的 org.springframework.cloud:spring-cloud-starter 版本。如果你使用的 Spring Cloud 版本与 Spring Boot 2.4+ 兼容,你需要设置 "spring.cloud.bootstrap.enabled=true"。示例未使用此库,因为 Spring bean 不能声明为 static,所以我们必须为每个测试启动一个新的容器实例。无论如何,Testcontainers 非常易于使用。

如上所述,不同的属性配置代表不同的测试用例。Spring Boot 在应用启动时会绑定属性源中的属性。因此,我们需要为我们想要测试的每种属性组合创建一个新的应用上下文。我们在 stream-applications 仓库中看到几种不同的策略:

  • 创建一个抽象的 @SpringBootTest 来配置 @SpringBootApplication 测试上下文和共享配置属性。为每个测试用例创建一个子类,用 @TestPropertySource 进行注解,如此处所示。

  • 使用 ApplicationContextRunner 为每个测试用例创建一个新的 ApplicationContext,如此处所示。

  • 使用 SpringApplicationBuilder 为每个测试用例创建一个新的 ApplicationContext,如此处所示。

你使用哪种方法很大程度上取决于个人选择。示例 Function 的测试使用了 ApplicationContextRunner,预先配置了测试容器提供的必需的 Boot Couchbase 连接属性。Testcontainers 的一个不错的功能是它按预期暴露标准端口,并将每个暴露的端口映射到随机的可用端口。Couchbase testContainer 包含 getConnectionString() 方法,它是 Couchbase 特有的。通常,你可以根据需要使用 container.getMappedPort(int originalPort)

提示

使用随机 TCP 端口对于自动化测试至关重要,因为 1) 你不知道给定环境中可能使用了哪些端口 2) 构建工具通常并行运行测试。当静态定义端口时,这经常会导致端口不可用错误。

@Testcontainers
public class CouchbaseConsumerTests {

    @Container
    static CouchbaseContainer container =
            new CouchbaseContainer("couchbase/server:6.6.0")
                   .withBucket(new BucketDefinition("test"));

	private ApplicationContextRunner applicationContextRunner;

    @BeforeEach
    void setup() {
        applicationContextRunner = new ApplicationContextRunner()
            .withUserConfiguration(TestConfig.class)
            .withPropertyValues(
                 "spring.couchbase.connection-string=" +
                                                container.getConnectionString(),
                 "spring.couchbase.username=" + container.getUsername(),
                 "spring.couchbase.password=" + container.getPassword());
    }

我们使用 TestConfig.class 启动应用上下文,我们将其作为内部类提供

@SpringBootApplication
static class TestConfig {
    @Autowired
    Cluster cluster;

   @PreDestroy
    public void destroy() {
        cluster.disconnect();
    }
}

在许多情况下,这可以是一个空类,只需使用 @SpringBootApplication 进行注解,以触发属性绑定和任何必需的自动配置——在本例中是 CouchbaseAutoConfiguration。在这里,我们断开与集群的连接,以防止在上下文关闭时出现多余的堆栈跟踪。

对于这些测试,我们将创建一个简单的 User 类型,包含 name 和 email 地址,我们可以使用 email 地址作为 key。

@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
	private String name;

	private String email;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getEmail() {
		return email;
	}

	public void setEmail(String email) {
		this.email = email;
	}

	public User() {
	}

	public User(String name, String email) {
		this.name = name;
		this.email = email;
	}
    ...

现在我们准备测试我们的 Function。由于 Function 是响应式的,我们将使用 reactor-test 库中的 StepVerifier 来验证返回的 Flux 的内容。我们从最简单的 happy path 场景开始:upsert 一个 User,提供最少必需的配置:Bucket 名称和 key 表达式。我们将构建一个包含 User payload 的 Message。要将 User 存储到 test Bucket 的默认 Collection 中,使用 User 的 email 作为 key,我们只需要将 Bucket 名称作为字面量提供,并将 key 表达式设置为 payload.email。这些属性需要使用在 CouchbaseConsumerProperties 中配置的 couchbase.consumer 前缀。至少,这是预期的行为。在我们能够验证调用 Function 后数据是否存在于数据存储中之前,我们无法确定所有这些是否都能正常工作。我们直接使用 Couchbase API 来检索数据,并断言其内容符合我们的预期。

@Test
void singleUpsert() {
   applicationContextRunner.withPropertyValues(
           "couchbase.consumer.bucketExpression='test'",
            "couchbase.consumer.keyExpression=payload.email")
      .run(context -> {
           CouchbaseConsumerProperties properties =
                    context.getBean(CouchbaseConsumerProperties.class);
           String bucketName = properties.getBucketExpression().getValue(String.class);
           Cluster cluster = context.getBean(Cluster.class);
           Function<Flux<Message<?>>, Flux<MutationResult>>
                 couchbaseConsumerFunction =
                       context.getBean("couchbaseConsumerFunction", Function.class);
           StepVerifier.create(couchbaseConsumerFunction.apply(
               Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
            .expectNextMatches(mutationResult ->
                   mutationResult.mutationToken().get().bucketName().equals(bucketName))
            .verifyComplete();

        User saved = cluster.bucket(bucketName).defaultCollection()
                                   .get("[email protected]").contentAs(User.class);
       assertThat(saved.getName()).isEqualTo("David");
  });
}

当我们在 IDE 中运行测试时,看到绿色的结果,我们欣喜若狂,Function 也按照之前所示的方式实现了。实际上,我们首先就需要这样的测试来编写 Function。这就是为什么我们在这个简单的测试中投入了大量的思考和努力。我们还想测试应用多个对象,并为 value 和 bucket 设置自定义表达式。我们可能还想检查属性类中的 Java 验证注解。

@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
    return keyExpression;
}

我忘了,注解是放在 getter 上还是 setter 上?我们真的需要 @Validated 类注解吗?让我们来查一下。如果我们忘记设置 couchbase.consumer.keyExpression,应该会在堆栈跟踪中的某个地方看到异常消息 'keyExpression is required'。如果没有,那我们就做错了。幸运的是,spring-boot-starter-test 提供了我们测试所需的一切,包括用于断言的流式 DSL Assertj、Mockito 和 Junit 5。

@Test
void keyExpressionRequired() {
  assertThatExceptionOfType(RuntimeException.class).isThrownBy(
   () -> applicationContextRunner.withPropertyValues(
      "couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
    .havingRootCause()
    .withMessageContaining("'keyExpression' is required");
}

等我们完成时,编写的代码行数将是实现 Function 所需的两倍多,耗时也可能超过两倍。但这些努力非常值得,因为它们证明了 Function 在常见场景下的行为符合预期,并且可以在重构或添加新功能时防止引入回归问题。完整的测试代码在此处。我很高兴地告诉大家,我的 IDE 报告测试覆盖率超过 90%。

结论

本文结束了测试主题的第 1 部分。在本文中,我们探讨了测试与外部资源(如 Couchbase)集成的 Function 的策略。我们还展示了 TestContainers 库在测试分布式系统组件方面的巨大作用,尤其是在使用 mock、stub 或嵌入式服务器不切实际的情况下。第 2 部分将涵盖基于 Function 的流应用的单元测试和集成测试。

敬请期待……​

感谢您的阅读!希望您觉得这篇文章对您有所帮助。在本系列结束前,我们还有几篇文章。

订阅 Spring 新闻邮件

订阅 Spring 新闻邮件,保持联系

订阅

取得进步

VMware 提供培训和认证,助您突飞猛进。

了解更多

获得支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部