领先一步
VMware 提供培训和认证,助您快速提升技能。
了解更多这篇文章是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用。本篇分为两部分,探讨了用于测试用于实现流应用的函数的策略。我们将特别关注与外部资源集成的函数,这带来了额外的测试挑战。大多数预打包的源和接收器应用程序都是这种情况。为了说明这一点,我们将逐步介绍一个示例couchbase-sink应用程序。在第 1 部分中,我们将重点关注接收器所基于的核心函数。在第 2 部分中,我们将研究如何为应用程序编写测试。
以下是本博客系列中所有之前的文章。
对于基于函数的流应用程序,核心功能作为函数公开。预构建的Spring Cloud Stream应用程序的核心函数被打包为单独的组件,以便任何应用程序都可以使用它们,而独立于 Spring Cloud Stream。Spring Cloud Stream 原生支持 Java 函数,并将绑定到实现核心java.util.function
类型之一的任何 bean:Consumer
、Supplier
或Function
。如果将其视为单独的组件,则该函数无需依赖于 Spring 或任何其他内容。如果将任何函数作为 bean 注册到包含 Spring Cloud Stream 绑定器作为依赖项的任何应用程序中,Spring Cloud Stream 将将其绑定到配置的消息目标。
在数据管道中,数据流从源头发出,流入接收器,中间有零个或多个处理步骤。实际上,源充当来自某些外部资源(例如数据存储、任何支持标准协议的服务或消息代理)的数据提供者。接收器充当某些其他外部资源的数据使用者。由于 Spring 为大多数常用的外部资源提供了一流的支持,因此大多数预打包的源和接收器依赖于 Spring Integration、Spring Data 和 Spring Boot 的某种组合也就不足为奇了。此外,它们旨在通过@ConfigurationProperties
为许多环境、域和用例进行配置。尽管这些函数本身不是 Spring Boot 应用程序,但必须将它们导入 Spring Boot 应用程序才能运行。
由于所有核心功能都是由函数实现的,因此我们希望将大部分测试工作集中在这个级别。为了确保我们的函数在所有预期的成功和错误条件下都能正确运行,我们需要编写测试来涵盖这些场景。这些测试需要创建一个自动配置的应用程序上下文并配置或模拟所需的外部资源。
如果函数可以通过@ConfigurationProperties
进行配置,那么我们可以将每个属性组合视为不同的测试用例。有些属性是必需的,有些是可选的。由于使用该函数需要最终用户提供这些属性,因此预期的场景包括有效和无效的配置,例如缺少必需的属性、无效的值或无效的组合(相互排斥的属性)。
没有任何公认的定义可以帮助我们。特别是对于源和接收器,其中核心功能 *是* 集成,很难知道单元测试和集成测试之间的界限在哪里。一方面,Java 函数是一个 *单元*,因为它是一个单一接口。但是,如果其唯一目的是与远程系统集成,则很难(如果不是不可能)隔离测试。但是,我认为我们可以就一些一般特征达成一致。
单元测试
在任何开发人员或 CI 环境中作为构建的一部分自动运行,无需任何外部配置
速度相当快
由开发人员编写并频繁运行
集成测试
在集成环境中自动运行
需要被测组件以及外部依赖项进行部署
可能很慢
运行频率较低
鉴于此单元测试定义,第 1 部分是关于单元测试函数。
Testcontainers 是一个最近流行的 Java 库,它允许您以编程方式启动和丢弃任何可以在 Docker 容器中运行的外部资源。它包含数十个针对常用资源的开箱即用模块。您还可以使用该库以编程方式从 Dockerfiles 或 docker-compose yaml 创建自定义容器。虽然主要用于集成测试,但在模拟需要更多工作时,它对于编写单元测试也极其有用。当然,我们必须牺牲一些速度并放宽“没有外部依赖项”规则,以允许在主机上安装并运行 Docker 守护程序。由于当今许多开发和 CI 环境都需要使用和构建镜像,因此这是一个合理的假设。
为了说明,我们将编写一个 Couchbase 消费者函数,使用 *upsert* 操作将一些数据添加到 Couchbase 键值存储中。
为了提高效率,我们将使用 Couchbase Java 客户端的响应式 API 实现该函数。此 API 返回MutationResult 的发布者,因此我们的核心接口是Function<Flux<Message<?>>, Flux<MutationResult>>
。此函数将使用 Spring 进行配置,并可以嵌入到任何 Spring Boot 应用程序中。为了支持couchbase-sink
,我们将该函数包装在Consumer<Flux<Message<?>>>
中。
upsert
操作在Bucket(这是 Couchbase 的主要数据存储抽象)中插入或更新数据。在我们的例子中,是ReactiveBucket
。桶由名称指定,必须事先存在于 Couchbase 集群中。从 v6.5 开始,Couchbase 支持集合。因此,桶可以被划分为多个集合,但这是一项必须在集群中启用的可选功能。upsert
方法针对命名集合或 *defaultCollection*。
我们将键和值通过Spring消息传递给我们的函数,该消息包含有效负载和标头。有效负载可以是任何对象,标头本质上是一个Map。为了使此函数泛型,我们可以使用SpEL表达式来指定键。键表达式针对消息进行评估,并可能引用有效负载中的字段或方法,或标头。值是有效负载。该函数还要求用户指定桶和集合名称。为了最大限度地提高灵活性,让我们加倍使用SpEL,并将所有内容都设置为表达式。现在,如果我们想,函数可以在运行时从消息中提取所有输入值,以更新任何桶中任何集合的任何数据。在最简单的情况下,桶和集合可以静态定义。
因此,该函数需要一些配置属性。
@ConfigurationProperties("couchbase.consumer")
@Validated
public class CouchbaseConsumerProperties {
private static final String DEFAULT_VALUE_EXPRESSION = "payload";
private final SpelExpressionParser parser = new SpelExpressionParser();
/**
* A SpEL expression to specify the bucket.
*/
private Expression bucketExpression;
/**
* A SpEL expression to specify the key.
*/
private Expression keyExpression;
/**
* A SpEL expression to specify the collection.
*/
private Expression collectionExpression;
/**
* A SpEL expression to specify the value (default is payload).
*/
private Expression valueExpression =
parser.parseExpression(DEFAULT_VALUE_EXPRESSION);
...
提示
要静态配置其中一些值,请使用文字表达式,将值括在单引号中,例如couchbase.consumer.bucketExpression='mybucket'
。通常,您会从消息内容中提取键和值。
我们使用Spring配置响应式函数和相应的消费者。
@Configuration
@EnableConfigurationProperties(CouchbaseConsumerProperties.class)
public class CouchbaseConsumerConfiguration {
private static Logger logger =
LoggerFactory.getLogger(CouchbaseConsumerConfiguration.class);
@Bean
public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
Flux<MutationResult>> couchbaseConsumerFunction) {
return message -> couchbaseConsumerFunction.apply(message)
.subscribe(mutationResult -> logger.debug("Processed " + message));
}
@Bean
public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
return flux -> flux.flatMap(message -> {
logger.debug("Processing message " + message);
String bucketName = bucket(message,
consumerProperties.getBucketExpression());
String key = key(message, consumerProperties.getKeyExpression());
ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
ReactiveCollection collection = collection(message,
consumerProperties.getCollectionExpression())
.map(name -> bucket.collection(name))
.orElse(bucket.defaultCollection());
return collection.upsert(key,
value(message, consumerProperties.getValueExpression()));
});
}
private String bucket(Message<?> message, Expression expression) {
return expression.getValue(message, String.class);
}
private String key(Message<?> message, Expression expression) {
return expression.getValue(message, String.class);
}
private Object value(Message<?> message, Expression expression) {
return expression.getValue(message);
}
private Optional<String> collection(Message<?> message,
@Nullable Expression expression) {
return expression == null ? Optional.empty() :
Optional.of(expression.getValue(message, String.class));
}
}
这两个类是我们实现该函数所需的一切。所需的依赖项是:
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
</dependency>
<!-- Enable configuration properties metadata to be added to the jar -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>config-common</artifactId>
</dependency>
如前所述,这不是一个Spring Boot应用程序,而是一个必须嵌入Spring Boot应用程序才能运行的组件。Spring Boot绑定@ConfigurationPropeties
,并提供CouchbaseAutoConfiguration。
注意
此示例不使用spring-data-couchbase,因为它旨在用于Spring Data存储库并自动映射特定的域对象。由于我们的函数旨在处理任何有效负载类型,因此我们使用boot自动配置集群以及Couchbase Java SDK。
那么我们是如何最终得到一个真正有效的函数的呢?上面的示例代码是测试驱动开发的结果,经过多次迭代的改进。由于该函数依赖于Couchbase SDK Cluster
对象(它完成所有工作),因此我们需要在执行任何操作之前创建一个Cluster实例。集群需要连接到Couchbase服务器。如果我们的网络上碰巧已经运行了一个Couchbase集群,并且我们有一个可用于测试的桶,那么我们最初可能会使用它。但是,即使我们假设可以从我们的开发和CI环境访问Couchbase,如果由于某种原因无法连接到Couchbase会发生什么情况——集群宕机、凭据过期、权限更改或其他原因?我们是否希望让这破坏我们的CI/CD管道或阻止我们的进度?
幸运的是,我们可以使用Testcontainers的couchbase模块来启动我们自己的Couchbase环境。
注意
完全披露:我也尝试过CouchbaseMock,但它似乎与当前的couchbase Java客户端不兼容。
Junit 5所需的测试库是:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>couchbase</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
要在我们的Junit 5测试类中使用Testcontainers,我们首先启动一个配置了名为test
的桶的Couchbase容器。
@Testcontainers
public class CouchbaseConsumerTests {
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
@Testcontainers
注解启用使用@Container
注解的字段的生命周期管理。在这里,我们将CouchbaseContainer
声明为static
,因此TestContainers将在测试运行之前启动容器,并在之后将其移除。这是一件好事,因为启动容器需要几秒钟。
注意
还可以查看Playtika Testcontainers for Spring Boot。这是一个有趣的项目,它使用Spring Boot抽象“嵌入式”服务来自动配置Testcontainer。这需要您首选版本的org.springframework.cloud:spring-cloud-starter
。如果您使用的是与Spring Boot 2.4+兼容的Spring Cloud版本,则需要设置"spring.cloud.bootstrap.enabled=true"
。示例未使用此库,因为Spring bean不能声明为static
,因此我们必须为每个测试启动一个新的容器实例。无论如何,Testcontainers非常易于使用。
如上所述,不同的属性配置代表不同的测试用例。Spring Boot在应用程序启动时从其属性源绑定属性。因此,我们需要为我们要测试的每个属性组合创建一个新的应用程序上下文。我们在stream-applications存储库中看到了一些不同的策略。
创建一个抽象的@SpringBootTest
来配置@SpringBootApplication
测试上下文和共享配置属性。为每个测试用例创建一个使用@TestPropertySource
注解的子类,如此处所示。
使用ApplicationContextRunner为每个测试用例创建一个新的ApplicationContext
,如此处所示。
使用SpringApplicationBuilder为每个测试用例创建一个新的ApplicationContext
,如此处所示。
使用哪一个很大程度上取决于个人选择。示例函数的测试使用ApplicationContextRunner
,预先配置了测试容器提供的必需的boot Couchbase连接属性。Testcontainers的一个不错的功能是它按预期公开标准端口,将每个公开的端口映射到一个随机的可用端口。Couchbase testContainer包含getConnectionString()
,它是Couchbase特有的。通常,您可以根据需要使用container.getMappedPort(int originalPort)
。
提示
使用随机TCP端口对于自动化测试至关重要,因为1) 您不知道给定环境中可能使用了哪些端口;2) 构建工具通常并行运行测试。当静态定义时,这经常会导致端口不可用而导致错误。
@Testcontainers
public class CouchbaseConsumerTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
private ApplicationContextRunner applicationContextRunner;
@BeforeEach
void setup() {
applicationContextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestConfig.class)
.withPropertyValues(
"spring.couchbase.connection-string=" +
container.getConnectionString(),
"spring.couchbase.username=" + container.getUsername(),
"spring.couchbase.password=" + container.getPassword());
}
我们使用TestConfig.class
启动一个应用程序上下文,我们将其作为一个内部类提供。
@SpringBootApplication
static class TestConfig {
@Autowired
Cluster cluster;
@PreDestroy
public void destroy() {
cluster.disconnect();
}
}
在许多情况下,这可以是一个使用@SpringBootApplication
注解的空类,以触发属性绑定和任何所需的自动配置——在本例中为CouchbaseAutoConfiguration
。在这里,我们断开与集群的连接,以防止上下文关闭时出现多余的堆栈跟踪。
对于这些测试,我们将创建一个简单的User
类型,其中包含名称和电子邮件地址,我们可以将其用作键。
@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
private String name;
private String email;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public User() {
}
public User(String name, String email) {
this.name = name;
this.email = email;
}
...
现在我们可以测试我们的函数了。由于该函数是响应式的,我们将使用reactor-test
库中的StepVerifier
来验证返回的Flux的内容。我们从最简单的成功路径场景开始:提供最小所需配置(桶名称和键表达式)来更新单个用户。我们将构造一个包含User
有效负载的Message
。要使用用户的电子邮件作为键将用户存储到test
桶的默认集合中,我们只需要提供桶名称作为文字,并将键表达式设置为payload.email
。这些属性需要使用在CouchbaseConsumerProperties
中配置的couchbase.consumer
前缀。至少,这是预期的行为。在我们能够验证调用该函数后数据是否存在于数据存储中之前,我们无法确定所有这些是否有效。我们直接使用Couchbase API检索数据并断言内容符合我们的预期。
@Test
void singleUpsert() {
applicationContextRunner.withPropertyValues(
"couchbase.consumer.bucketExpression='test'",
"couchbase.consumer.keyExpression=payload.email")
.run(context -> {
CouchbaseConsumerProperties properties =
context.getBean(CouchbaseConsumerProperties.class);
String bucketName = properties.getBucketExpression().getValue(String.class);
Cluster cluster = context.getBean(Cluster.class);
Function<Flux<Message<?>>, Flux<MutationResult>>
couchbaseConsumerFunction =
context.getBean("couchbaseConsumerFunction", Function.class);
StepVerifier.create(couchbaseConsumerFunction.apply(
Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
.expectNextMatches(mutationResult ->
mutationResult.mutationToken().get().bucketName().equals(bucketName))
.verifyComplete();
User saved = cluster.bucket(bucketName).defaultCollection()
.get("[email protected]").contentAs(User.class);
assertThat(saved.getName()).isEqualTo("David");
});
}
在按照前面显示的方式实现该函数后,当我们在IDE中运行测试时,我们欣喜地看到测试通过了。实际上,我们需要这样的测试来编写该函数。这就是为什么我们将大量的时间和精力投入到这个简单的测试中的原因。我们还希望测试应用多个对象,以及设置值的自定义表达式和桶。我们可能还想检查属性类中的Java验证注解。
@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
return keyExpression;
}
我忘了,注解是放在getter上还是setter上?我们真的需要@Validated
类注解吗?让我们来看看。如果我们忘记设置couchbase.consumer.keyExpression
,我们应该在堆栈跟踪中的某个位置得到一个异常消息'keyExpression is required'
。如果没有,那么我们做错了什么。幸运的是,spring-boot-starter-test
为我们提供了测试所需的一切,包括Assertj(一种流畅的DSL断言)、Mockito和Junit 5。
@Test
void keyExpressionRequired() {
assertThatExceptionOfType(RuntimeException.class).isThrownBy(
() -> applicationContextRunner.withPropertyValues(
"couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
.havingRootCause()
.withMessageContaining("'keyExpression' is required");
}
到我们完成时,我们将编写实现该函数所需的两倍以上的代码行数,并且可能花费两倍以上的时间。但是,付出这些努力是值得的,因为它使我们能够证明该函数在常见场景中的行为符合预期,并能够防止在重构或添加新功能时引入回归。完整的测试在此处。我很高兴地说,我的IDE报告了超过90%的覆盖率。
测试主题的第一部分到此结束。在这篇文章中,我们探讨了测试与外部资源(例如Couchbase)集成的函数的策略。我们还展示了TestContainers库对于测试分布式系统的组件是多么有用,尤其是在使用模拟、存根或嵌入式服务器不切实际的情况下。第二部分将介绍基于函数的流应用程序的单元测试和集成测试。
感谢您的参与!希望您觉得这些内容有所帮助。本系列还剩几篇文章。