领先一步
VMware 提供培训和认证,助您快速提升技能。
了解更多这是测试 Stream 应用的第二部分。在 第一部分 中,我们实现并测试了我们的示例 couchbase-sink 应用所需的核心功能。函数级别的测试涵盖了预期的成功和错误场景,并依赖于 Testcontainers 来配置 Couchbase 集群。本文假设您已阅读第一部分,并在此基础上继续。
在第一部分中,我们验证了我们为将数据 upsert 到 Couchbase 而编写的函数按预期工作。现在,我们可以使用该函数(作为 java.util.Consumer
公开)来实现一个 sink,用于在使用 Spring Cloud Stream 构建的数据管道中使用。与大多数预打包的流应用程序一样,我们只需将函数配置嵌入到 Spring Boot 应用程序中。与为 Kafka 和 Rabbit 配置相同的预打包应用程序不同,我们将使用 Kafka 绑定器自己构建。
这是主要的应用程序类
@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {
public static void main(String... args) {
new SpringApplication(CouchbaseSinkApplication.class).run(args);
}
}
我们还需要添加一些依赖项:函数、Spring Cloud Stream 和 Kafka 绑定器。
<dependency>
<groupId>io.spring.example</groupId>
<artifactId>couchbase-consumer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
而且,由于我们自己构建,我们可以在 application.properties
中设置一些必需的属性。由于 couchbase-consumer
包含 2 个候选函数,我们需要告诉 Spring Cloud Stream 使用 Consumer
包装器。此外,我们将默认的消费者输入绑定名称 couchbaseConsumer-in-0
另名为 input
,以便 sink 可以与 Spring Cloud Data Flow 一起工作。
spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input
仅此而已!至少我们这么认为。我们如何确定呢?不出所料,我们需要进行的测试与函数级别测试类似。但是我们实际上不需要运行每个测试用例,因为我们已经知道函数如何在具有各种属性设置的 boot 应用程序中运行。但是我们还没有通过 Spring Cloud Stream 调用该函数。此外,这并不太费事,因为我们可以重用为函数编写的许多测试代码。因此,我们只需要一个“冒烟测试”来运行成功路径,以确保我们没有遗漏一些必需的依赖项,或者我们的配置属性中没有错别字,或者现在没有问题,或者在将来升级某些依赖项时不会出现问题。在这里,我们配置一个 Couchbase TestContainer,就像我们测试函数一样。但是,我们不会直接调用该函数,而是在将消息发送到为 sink 配置的输入目标时,让 Spring Cloud Stream 来调用它。对于此测试,我们使用 TestChannelBinder
,这是一个由以下依赖项提供的内存绑定器
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
我们使用 TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class)
将 TestChannelBinder 添加到我们的应用程序上下文中以进行测试。这给了我们一个 InputDestination
bean 来向 sink 发送消息。与函数测试一样,我们使用 Cluster
对象来验证数据是否存在于 Couchbase 中。由于 *upsert* 操作是异步的,我们需要轮询数据存储一段时间,直到数据出现。 awaitility 库非常适合测试异步系统。在这种情况下,我们将给它 10 秒钟的时间,然后再假设操作失败。
@Testcontainers
public class CouchbaseSinkApplicationTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
static Map<String, Object> connectProperties = new HashMap<>();
@BeforeAll
static void initialize() {
connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
connectProperties.put("spring.couchbase.username", container.getUsername());
connectProperties.put("spring.couchbase.password", container.getPassword());
}
@Test
void test() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(CouchbaseSinkApplication.class))
.web(WebApplicationType.NONE)
.properties(connectProperties)
.run("--couchbase.consumer.bucketExpression='test'",
"--couchbase.consumer.keyExpression=payload.email")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
Cluster cluster = context.getBean(Cluster.class);
inputDestination.send(new GenericMessage<>(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
User user = cluster.bucket("test")
.defaultCollection().get("[email protected]")
.contentAs(User.class);
assertThat(user).isNotNull();
assertThat(user.getName()).isEqualTo("Bart Simpson");
});
}
}
}
至此,我们在应用程序和函数测试之间有了良好的测试覆盖率。但是我们还没有验证我们想要构建和部署的应用程序二进制文件是否在真实的集成环境中工作。由于 sink 应用程序使用 Kafka 绑定器,因此集成测试环境需要一个 Kafka 代理、一个 Couchbase 集群和我们部署的应用程序。我们可以直接部署和运行 Spring Boot 可执行 jar。如今,它更常是一个容器镜像。
通常,假设作为容器构建的 sink 可以工作并不是太冒险,但我们至少希望确保我们知道如何配置应用程序以使用外部 Kafka 代理和 Couchbase 集群,以及我们是否正确构建了我们的镜像。
对于预构建的 Spring Cloud Stream 应用程序,我们更有理由测试构建的工件。核心应用程序不提供任何额外的代码。相反,我们使用 spring-cloud-dataflow-apps-generator-plugin 自动生成可以在 Kafka 或 RabbitMQ 上运行的相同应用程序。该插件需要 Maven 配置,我们为每个应用程序手动添加。仅仅因为我们的函数与 TestChannelBinder 一起工作,我们就不能确定构建的工件是否有效,直到我们运行它为止。错误配置应用程序生成器插件、插件本身的更改、基础镜像或任何依赖项都可能破坏某些东西。Testcontainers 和 Junit 5 为我们提供了一种相对直接的方法来使用 Kafka 和 RabbitMQ 集成测试预构建的应用程序。为了帮助我们编写集成测试,我们在 stream-applications-test-suport 中提供了额外的支持。可以通过添加依赖项来使社区可以使用此库
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-test-support</artifactId>
<scope>test</scope>
</dependency>
此示例包括一个集成测试来测试构建的镜像,在本例中使用 Spring Boot Maven 插件 构建。与应用程序测试一样,我们将只插入 Kafka、Couchbase 和我们的镜像,打开电源,并确保我们没有看到或闻到任何烟雾。
完整的集成测试是
@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {
static StreamAppContainer sink =
new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
static Cluster cluster;
@Autowired
TestTopicSender testTopicSender;
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
cluster = Cluster.connect(container.getConnectionString(),
ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
}
@AfterAll
static void stop() {
sink.stop();
}
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
}
为了解释这一点,让我们从 @KafkaStreamAppTest
类注释开始。这将启动一个 Kafka 测试容器,并使用 Spring for Apache Kafka 配置 Kafka 组件,我们可以使用这些组件来使用 Kafka 生成和使用消息。Kafka 容器在静态初始化器中启动,这使其成为真正的单例,允许在 JVM 中运行的每个测试都使用它。除了 Spring 配置之外,该注释还包括 @TestContainers
作为元注释。对于此测试,我们不允许 Testcontainers 管理 StreamAppContainer
的生命周期,因为我们希望在知道 Couchbase 集群正在运行后启动它。Couchbase 容器有一些额外的配置。为方便起见,它与 StreamAppContainer
共享一个虚拟网络(自动配置为使用与 Kafka 容器相同的网络)。这允许 Stream App Container 使用我们选择的别名 couchbase-server
连接到 Couchbase 服务器(记住,容器内的 localhost
指的是它自己的 IP 地址)。
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
StreamAppContainer 是一个具有所需配置的 GenericContainer,用于连接到 Kafka 并使用 Kafka 绑定器。Spring 配置还设置了一个已知主题上的侦听器,以使用 Kafka 接收来自容器的任何输出。在本例中未使用此功能,因为我们只有一个 sink 的输入。输入目标是随机生成的,并通过 getInputDestination()
访问。
static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
一旦 Couchbase 容器运行,我们将启动 sink。我们等待标准的 Spring Boot 启动消息以确认 sink 已启动。我们还添加了一个 LogConsumer 来输出所有日志消息,以防出现错误。请注意,连接字符串只是使用 Couchbase 容器的网络别名。这是可能的,因为 sink 和 Couchbase 使用相同的虚拟网络。在这里,我们在命令行上传递所有属性,但我们也可以将它们设置为环境变量,通过 withEnvironment()
。由于我们控制 sink 的生命周期,因此我们需要在所有测试完成后停止它。
该测试使用自动注入的 TestTopicSender
。这是一个与中间件无关的接口,在本例中由 KafkaTemplate 支持。此接口对于运行 Kafka 和 Rabbit 的相同测试用例很有用。在这里,我们也可以自动注入 KafkaTemplate
。在撰写本文时,只有 String serdes 为 Kafka 模板配置,因此我们使用 ObjectMapper
来处理字符串。
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
由于此测试需要 sink 镜像,因此我们使用 Junit 5 的 @Tag
注释将其标记为集成测试。我们还配置了 Maven 将其从正常构建中排除,并且仅在设置了 *integration* 配置文件时才构建镜像并运行它。完整的源代码 在此,需要 Java 8+ 和 Docker。
在这篇文章中,我们探讨了测试与外部服务(如 Couchbase)集成的 Spring Cloud Stream 应用程序的策略。第一部分中描述的大部分测试是在函数级别完成的。应用程序和集成测试实际上是冒烟测试,用于验证我们是否正确构建、配置和集成了所有内容。我们还展示了如何使用 TestContainers 测试 Stream 应用程序。
感谢您的参与!我们希望您觉得这些内容有所帮助。在本系列结束之前,我们还有几篇文章。