领先一步
VMware 提供培训和认证,助您加速进步。
了解更多这篇博文是一个系列的一部分,该系列探讨了基于 Java 函数的全新重新设计的 Spring Cloud Stream 应用程序。本期分为两部分,探讨了用于实现流应用程序的函数测试策略。我们将特别关注与外部资源集成的函数,这带来了额外的测试挑战。大多数预打包的源和接收器应用程序都属于这种情况。为了说明这一点,我们将通过一个示例 couchbase-sink 应用程序进行演示。在第 1 部分中,我们将重点介绍接收器所基于的核心函数。在第 2 部分中,我们将讨论如何为应用程序编写测试。
以下是此博客系列的所有先前条目。
对于基于函数的流应用程序,核心功能以函数的形式暴露。预构建的 Spring Cloud Stream 应用程序的核心功能被打包为单独的组件,以便它们可以被任何应用程序使用,而与 Spring Cloud Stream 无关。Spring Cloud Stream 本地支持 Java 函数,并将绑定到实现核心 java.util.function 类型之一的任何 bean:Consumer、Supplier 或 Function。作为一个单独的组件,函数不需要依赖 Spring 或任何其他东西。如果您在任何包含 Spring Cloud Stream 绑定器作为依赖项的应用程序中将任何函数注册为 bean,Spring Cloud Stream 将其绑定到配置的消息目的地。
在数据管道中,数据流源自 Source 并流入 Sink,其间有零个或多个处理步骤。实际上,Source 充当来自某些外部资源(例如数据存储、支持标准协议的任何服务或消息代理)的数据 Supplier。Sink 充当用于其他外部数据的 Consumer。由于 Spring 为大多数常用外部资源提供一流支持,因此大多数预打包的 Source 和 Sink 都依赖于 Spring Integration、Spring Data 和 Spring Boot 的某种组合也就不足为奇了。此外,它们旨在通过 @ConfigurationProperties 配置用于许多环境、领域和用例。尽管这些函数本身不是 Spring Boot 应用程序,但它们必须导入到 Spring Boot 应用程序中才能运行。
由于所有核心功能都由函数实现,我们希望将大部分测试工作集中在这个层面。为了确保我们的函数在所有预期成功和错误条件下都能正常运行,我们需要编写测试来涵盖这些场景。这些测试需要创建自动配置的应用程序上下文并提供或模拟所需的外部资源。
如果该函数可以通过 @ConfigurationProperties 配置,那么我们可以将每个属性组合视为一个不同的测试用例。有些属性是必需的,有些是可选的。由于使用该函数需要最终用户提供这些属性,因此预期场景包括有效和无效配置,例如缺少必需属性、无效值或无效组合(互斥属性)。
这里没有广泛接受的定义可以帮助我们。尤其是在源和接收器中,其核心功能*就是*集成,很难知道在单元测试和集成测试之间划清界限。一方面,Java 函数是一个*单元*,因为它是一个单一的接口。但是,如果其唯一目的是与远程系统集成,那么独立测试它就很难,甚至不可能。但是,我认为我们可以在一些一般特征上达成一致
单元测试
作为任何开发人员或 CI 环境中构建的一部分自动运行,无需任何外部配置
速度合理
由开发人员编写并频繁运行
集成测试
在集成环境中自动运行
需要部署被测试的组件以及外部依赖项
可能很慢
运行频率较低
根据单元测试的这个定义,第 1 部分是关于单元测试函数的。
Testcontainers 是一个近期流行且流行的 Java 库,它允许您以编程方式启动和销毁任何可以在 Docker 容器中运行的外部资源。它包含数十个用于常用资源的开箱即用模块。您还可以使用该库通过 Dockerfiles 或 docker-compose yaml 以编程方式创建自定义容器。虽然主要用于集成测试,但在模拟需要大量精力时,它对于编写单元测试非常有用。当然,我们必须牺牲一些速度并放宽“无外部依赖”规则,以允许在主机上安装和运行 Docker 守护程序。由于当今许多开发和 CI 环境已经需要使用和构建镜像,这是一个合理的假设。
为了说明,我们将编写一个 Couchbase 消费者函数,使用 *upsert* 操作向 Couchbase 键值存储添加一些数据。
为了提高效率,我们将使用 Couchbase Java 客户端的响应式 API 实现该函数。此 API 返回 MutationResult 的发布者,因此我们的核心接口是 Function<Flux<Message<?>>, Flux<MutationResult>>。此函数将使用 Spring 进行配置,并且可以嵌入到任何 Spring Boot 应用程序中。为了支持 couchbase-sink,我们将函数包装在 Consumer<Flux<Message<?>>> 中。
upsert 操作在 Bucket 中插入或更新数据,Bucket 是 Couchbase 的主要数据存储抽象。在我们的案例中,是一个 ReactiveBucket。Bucket 通过名称指定,并且必须事先存在于 Couchbase 集群中。从 v6.5 开始,Couchbase 支持 Collections。因此,Bucket 可以分区为许多集合,但这是一个可选功能,必须在集群中启用。upsert 方法针对命名集合或 defaultCollection。
我们将键和值通过 Spring Message 传递给我们的函数,它由一个有效载荷和头部组成。有效载荷可以是任何对象,头部本质上是一个 Map。为了使此函数通用,我们可以使用 SpEL 表达式来指定键。键表达式针对 Message 进行评估,可以引用有效载荷中的字段或方法,或者引用一个头部。值是有效载荷。该函数还要求用户指定一个桶和集合名称。为了最大限度地提高灵活性,让我们在 SpEL 上加倍努力,将所有内容都设为表达式。现在,如果需要,该函数可以在运行时从消息中提取所有输入值,以在任何桶中的任何集合中 upsert 任何数据。在最简单的情况下,桶和集合可以静态定义。
所以函数需要一些配置属性
@ConfigurationProperties("couchbase.consumer")
@Validated
public class CouchbaseConsumerProperties {
private static final String DEFAULT_VALUE_EXPRESSION = "payload";
private final SpelExpressionParser parser = new SpelExpressionParser();
/**
* A SpEL expression to specify the bucket.
*/
private Expression bucketExpression;
/**
* A SpEL expression to specify the key.
*/
private Expression keyExpression;
/**
* A SpEL expression to specify the collection.
*/
private Expression collectionExpression;
/**
* A SpEL expression to specify the value (default is payload).
*/
private Expression valueExpression =
parser.parseExpression(DEFAULT_VALUE_EXPRESSION);
...
提示
要静态配置这些值中的一些,请使用文字表达式,将值用单引号括起来,例如 couchbase.consumer.bucketExpression='mybucket'。通常,您会从消息内容中提取键和值。
我们用 Spring 配置响应式函数和相应的消费者
@Configuration
@EnableConfigurationProperties(CouchbaseConsumerProperties.class)
public class CouchbaseConsumerConfiguration {
private static Logger logger =
LoggerFactory.getLogger(CouchbaseConsumerConfiguration.class);
@Bean
public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
Flux<MutationResult>> couchbaseConsumerFunction) {
return message -> couchbaseConsumerFunction.apply(message)
.subscribe(mutationResult -> logger.debug("Processed " + message));
}
@Bean
public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
return flux -> flux.flatMap(message -> {
logger.debug("Processing message " + message);
String bucketName = bucket(message,
consumerProperties.getBucketExpression());
String key = key(message, consumerProperties.getKeyExpression());
ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
ReactiveCollection collection = collection(message,
consumerProperties.getCollectionExpression())
.map(name -> bucket.collection(name))
.orElse(bucket.defaultCollection());
return collection.upsert(key,
value(message, consumerProperties.getValueExpression()));
});
}
private String bucket(Message<?> message, Expression expression) {
return expression.getValue(message, String.class);
}
private String key(Message<?> message, Expression expression) {
return expression.getValue(message, String.class);
}
private Object value(Message<?> message, Expression expression) {
return expression.getValue(message);
}
private Optional<String> collection(Message<?> message,
@Nullable Expression expression) {
return expression == null ? Optional.empty() :
Optional.of(expression.getValue(message, String.class));
}
}
这两个类就是我们实现该函数所需的全部。所需的依赖项是
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
</dependency>
<!-- Enable configuration properties metadata to be added to the jar -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>config-common</artifactId>
</dependency>
如前所述,这不是一个 Spring Boot 应用程序,而是一个必须嵌入到 Spring Boot 应用程序中才能运行的组件。Spring Boot 绑定了 @ConfigurationPropeties,还提供了 CouchbaseAutoConfiguration。
注意
本例不使用 spring-data-couchbase,因为它旨在用于使用 Spring Data 存储库和自动映射特定域对象。由于我们的函数旨在处理任何有效载荷类型,我们使用 boot 来自动配置 Cluster 以及 Couchbase Java SDK。
那么我们是如何得到一个真正有效的函数的呢?上面给出的示例代码是测试驱动开发的结果,经过多次迭代改进。由于该函数依赖于执行所有工作的 Couchbase SDK Cluster 对象,因此我们需要在执行任何操作之前创建一个 Cluster 实例。Cluster 需要连接到 Couchbase 服务器。如果我们的网络上已经有一个 Couchbase 集群正在运行,并且有一个我们可以用于测试的 Bucket,那么我们最初可能会使用它。但是,即使我们假设 Couchbase 可以从我们的开发和 CI 环境中访问,如果由于某种原因我们无法连接到 Couchbase(集群已关闭、凭据过期、权限更改或其他原因)会发生什么?我们是否希望这会破坏我们的 CI/CD 管道或阻止我们的进展?
幸运的是,我们可以使用 Testcontainers couchbase 模块来启动我们自己的 Couchbase 环境。
注意
完全披露:我也尝试了CouchbaseMock,但它似乎与当前的couchbase Java 客户端不兼容。
Junit 5 所需的测试库是
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>couchbase</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
要在我们的 Junit 5 测试类中使用 Testcontainers,我们首先使用配置了名为 test 的桶的 Couchbase 容器。
@Testcontainers
public class CouchbaseConsumerTests {
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
@Testcontainers 注解为带有 @Container 注解的字段启用生命周期管理。在这里,我们将 CouchbaseContainer 声明为 static,因此 TestContainers 将在测试运行前启动一次容器,并在测试后将其移除。这是一件好事,因为启动容器需要几秒钟。
注意
此外,请查看 Playtika Testcontainers for Spring Boot。这是一个有趣的项目,它使用 Spring Boot 抽象“嵌入式”服务以自动配置 Testcontainer。这需要您首选的 org.springframework.cloud:spring-cloud-starter 版本。如果您使用的 Spring Cloud 版本与 Spring Boot 2.4+ 兼容,则需要设置 "spring.cloud.bootstrap.enabled=true"。示例未使用此库,因为 Spring bean 无法声明为 static,因此我们必须为每个测试启动一个新的容器实例。无论如何,Testcontainers 非常易于使用。
如上所述,不同的属性配置代表不同的测试用例。Spring Boot 在应用程序启动时从其属性源绑定属性。因此,我们需要为要测试的每种属性组合创建一个新的应用程序上下文。我们在 stream-applications 仓库中看到了几种不同的策略
创建一个抽象的 @SpringBootTest 以配置 @SpringBootApplication 测试上下文和共享配置属性。为每个测试用例创建一个用 @TestPropertySource 注解的子类,如此处所示。
使用 ApplicationContextRunner 为每个测试用例创建一个新的 ApplicationContext,如此处所示。
使用 SpringApplicationBuilder 为每个测试用例创建一个新的 ApplicationContext,如此处所示。
您使用哪一个主要取决于个人选择。示例函数的测试使用 ApplicationContextRunner,它预先配置了测试容器提供的所需引导 Couchbase 连接属性。Testcontainers 的一个优点是它按预期暴露标准端口,将每个暴露的端口映射到随机可用端口。Couchbase testContainer 包含 getConnectionString(),这是 Couchbase 特有的。通常,您可以根据需要使用 container.getMappedPort(int originalPort)。
提示
使用随机 TCP 端口对于自动化测试至关重要,因为 1) 您不知道给定环境中可能正在使用哪些端口 2) 构建工具通常并行运行测试。这通常会导致由于静态定义而导致的端口不可用错误。
@Testcontainers
public class CouchbaseConsumerTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
private ApplicationContextRunner applicationContextRunner;
@BeforeEach
void setup() {
applicationContextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestConfig.class)
.withPropertyValues(
"spring.couchbase.connection-string=" +
container.getConnectionString(),
"spring.couchbase.username=" + container.getUsername(),
"spring.couchbase.password=" + container.getPassword());
}
我们使用 TestConfig.class 启动应用程序上下文,我们将其作为内部类提供
@SpringBootApplication
static class TestConfig {
@Autowired
Cluster cluster;
@PreDestroy
public void destroy() {
cluster.disconnect();
}
}
在许多情况下,这可以是一个带有 @SpringBootApplication 注解的空类,以触发属性绑定和任何所需的自动配置——本例中是 CouchbaseAutoConfiguration。在这里,我们断开与集群的连接,以防止在上下文关闭时出现多余的堆栈跟踪。
对于这些测试,我们将创建一个简单的 User 类型,包含姓名和电子邮件地址,我们可以将其用作键
@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
private String name;
private String email;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public User() {
}
public User(String name, String email) {
this.name = name;
this.email = email;
}
...
现在我们准备测试我们的函数了。由于该函数是响应式的,我们将使用 reactor-test 库中的 StepVerifier 来验证返回的 Flux 的内容。我们从最简单的成功路径场景开始:upsert 一个单独的用户,提供最少所需的配置:桶名称和键表达式。我们将构建一个带有 User 有效载荷的 Message。要将用户存储到 test 桶的默认集合中,并使用用户的电子邮件作为键,我们只需提供桶名称作为文字并将键表达式设置为 payload.email。这些属性需要使用 CouchbaseConsumerProperties 中配置的 couchbase.consumer 前缀。至少,这是预期的行为。我们无法确定所有这些是否有效,直到我们能够验证在调用函数后,数据存在于数据存储中。我们直接使用 Couchbase API 检索数据并断言内容是我们预期的。
@Test
void singleUpsert() {
applicationContextRunner.withPropertyValues(
"couchbase.consumer.bucketExpression='test'",
"couchbase.consumer.keyExpression=payload.email")
.run(context -> {
CouchbaseConsumerProperties properties =
context.getBean(CouchbaseConsumerProperties.class);
String bucketName = properties.getBucketExpression().getValue(String.class);
Cluster cluster = context.getBean(Cluster.class);
Function<Flux<Message<?>>, Flux<MutationResult>>
couchbaseConsumerFunction =
context.getBean("couchbaseConsumerFunction", Function.class);
StepVerifier.create(couchbaseConsumerFunction.apply(
Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
.expectNextMatches(mutationResult ->
mutationResult.mutationToken().get().bucketName().equals(bucketName))
.verifyComplete();
User saved = cluster.bucket(bucketName).defaultCollection()
.get("[email protected]").contentAs(User.class);
assertThat(saved.getName()).isEqualTo("David");
});
}
有了之前展示的函数实现,当我们在 IDE 中运行测试时,我们高兴地看到绿色。实际上,我们首先需要一个这样的测试来编写函数。这就是为什么我们对这个简单的测试投入了大量思考和精力。我们还想测试应用多个对象,并为值和桶设置自定义表达式。我们可能还想检查属性类中的 Java 验证注解。
@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
return keyExpression;
}
我忘了,注解是放在 getter 还是 setter 上?我们真的需要 @Validated 类注解吗?让我们找出答案。如果忘记设置 couchbase.consumer.keyExpression,我们应该在堆栈跟踪的某个地方得到一个异常消息 'keyExpression is required'。如果没有,那么我们做错了。幸运的是,spring-boot-starter-test 为我们提供了测试所需的一切,包括 Assertj(一个用于断言的流式 DSL)、Mockito 和 Junit 5。
@Test
void keyExpressionRequired() {
assertThatExceptionOfType(RuntimeException.class).isThrownBy(
() -> applicationContextRunner.withPropertyValues(
"couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
.havingRootCause()
.withMessageContaining("'keyExpression' is required");
}
到我们完成时,我们将编写的行数是实现函数所需行数的两倍以上,并且可能花费的时间也超过两倍。但是,这项工作非常值得,因为它为我们提供了函数在常见场景下按预期运行的证据,并在重构或添加新功能时提供了防止引入回归的保护。完整的测试请参阅此处。我很高兴地说,我的 IDE 报告的覆盖率超过 90%。
测试主题的第 1 部分到此结束。在这篇文章中,我们探讨了测试与外部资源(例如 Couchbase)集成的函数的策略。我们还展示了 TestContainers 库对于测试分布式系统组件的实用性,尤其是在模拟、存根或嵌入式服务器不切实际时。第 2 部分将涵盖基于函数的流应用程序的单元测试和集成测试。
感谢您的光临!我们希望您觉得此内容有所帮助。在本系列结束之前,我们还有几篇文章。