抢占先机
VMware 提供培训和认证,助力您的进步。
了解更多这是测试 Stream 应用的第 2 部分。在第 1 部分中,我们为示例 couchbase-sink 应用实现了核心函数并进行了测试。函数级别的测试涵盖了预期的成功和错误场景,并依赖于 Testcontainers 来提供 Couchbase 集群。本文假设您已阅读第 1 部分,并将从上次中断的地方继续。
在第 1 部分中,我们验证了为将数据插入(upsert)到 Couchbase 而编写的函数按预期工作。现在我们可以使用该函数(公开为 java.util.Consumer
)来实现一个 sink,用于构建 Spring Cloud Stream 数据管道。与大多数预打包的 Stream 应用一样,我们只需将函数配置嵌入到 Spring Boot 应用中。与预打包的应用不同的是,预打包应用会生成针对 Kafka 和 RabbitMQ 配置的相同应用,而我们将自行创建使用 Kafka binder 的应用。
这是主要应用类
@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {
public static void main(String... args) {
new SpringApplication(CouchbaseSinkApplication.class).run(args);
}
}
我们还需要添加一些依赖项:函数、Spring Cloud Stream 和 Kafka binder。
<dependency>
<groupId>io.spring.example</groupId>
<artifactId>couchbase-consumer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
而且,既然我们是自行创建的,我们可以在 application.properties
中设置一些必需的属性。由于 couchbase-consumer
包含 2 个候选函数,我们需要告诉 Spring Cloud Stream 使用 Consumer
包装器。此外,我们将默认 consumer input binding 名称 couchbaseConsumer-in-0
别名为 input
,以便该 sink 可以与 Spring Cloud Data Flow 一起使用。
spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input
就这样!至少我们是这么认为的。我们怎么能确定呢?毫不奇怪,我们需要的测试类型类似于函数级别的测试。但我们并非真的需要运行每个测试用例,因为我们已经知道函数在具有各种属性设置的 Boot 应用中将如何表现。但是,我们尚未实际通过 Spring Cloud Stream 调用该函数。而且,成本也不是很高,因为我们可以重用为函数编写的大部分测试代码。所以我们只需要一个“冒烟测试”来运行正常路径,以确保我们没有遗漏某些必需的依赖项,或者我们的配置属性没有拼写错误,或者现在或将来升级某些依赖项时没有意外。这里我们像测试函数时一样,配置了一个 Couchbase TestContainer。但我们不是直接调用函数,而是通过将消息发送到为 sink 配置的输入目标,让 Spring Cloud Stream 来完成。对于此测试,我们使用 TestChannelBinder
,这是由以下依赖项提供的内存中 binder。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
我们使用 TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class)
将 TestChannelBinder 添加到测试的应用上下文中。这为我们提供了一个 InputDestination
bean,用于向 sink 发送消息。与函数测试一样,我们使用 Cluster
对象来验证 Couchbase 中是否存在数据。由于 upsert 操作是异步的,我们需要轮询数据存储一段时间,直到数据存在。 awaitility 库非常适合测试异步系统。在这种情况下,我们将等待 10 秒,然后假设操作失败。
@Testcontainers
public class CouchbaseSinkApplicationTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
static Map<String, Object> connectProperties = new HashMap<>();
@BeforeAll
static void initialize() {
connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
connectProperties.put("spring.couchbase.username", container.getUsername());
connectProperties.put("spring.couchbase.password", container.getPassword());
}
@Test
void test() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(CouchbaseSinkApplication.class))
.web(WebApplicationType.NONE)
.properties(connectProperties)
.run("--couchbase.consumer.bucketExpression='test'",
"--couchbase.consumer.keyExpression=payload.email")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
Cluster cluster = context.getBean(Cluster.class);
inputDestination.send(new GenericMessage<>(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
User user = cluster.bucket("test")
.defaultCollection().get("[email protected]")
.contentAs(User.class);
assertThat(user).isNotNull();
assertThat(user.getName()).isEqualTo("Bart Simpson");
});
}
}
}
至此,我们在应用和函数测试之间有了良好的测试覆盖率。但我们尚未验证我们想要构建和部署的应用二进制文件在真正的集成环境中是否工作。由于 sink 应用使用 Kafka binder,集成测试环境需要一个 Kafka broker、一个 Couchbase 集群和我们部署的应用。我们可以直接部署和运行 Spring Boot 可执行 jar。现在,它更常见的是容器镜像。
一般来说,假设构建为容器的 sink 会工作风险不大,但我们至少要确保我们知道如何配置应用以使用外部 Kafka broker 和 Couchbase 集群,以及我们正确地构建了镜像。
对于预构建的 Spring Cloud Stream 应用,我们有进一步的理由测试构建的 artifact。核心应用不提供任何附加代码。相反,我们使用 spring-cloud-dataflow-apps-generator-plugin 自动生成可以与 Kafka 或 RabbitMQ 一起运行的相同应用。该插件需要 Maven 配置,我们手动为每个应用添加。仅仅因为我们的函数与 TestChannelBinder 一起工作,我们不能确定构建的 artifact 工作正常,直到我们运行它。 apps generator 插件配置错误、插件本身的更改、基础镜像或任何依赖项的更改都可能导致问题。Testcontainers 和 Junit 5 为我们提供了一种相对直接的方式来集成测试预构建的应用与 Kafka 和 RabbitMQ。为了帮助我们编写集成测试,我们在 stream-applications-test-suport 中提供了额外的支持。通过添加以下依赖项,社区可以使用此库。
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-test-support</artifactId>
<scope>test</scope>
</dependency>
该示例包含一个集成测试,用于测试构建的镜像,在本例中是使用 Spring Boot Maven plugin 构建的。就像应用测试一样,我们只需插入 Kafka、Couchbase 和我们的镜像,通电,并确保我们没有看到或闻到任何烟味。
完整的集成测试是
@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {
static StreamAppContainer sink =
new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
static Cluster cluster;
@Autowired
TestTopicSender testTopicSender;
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
cluster = Cluster.connect(container.getConnectionString(),
ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
}
@AfterAll
static void stop() {
sink.stop();
}
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
}
为了解释这一点,让我们从 @KafkaStreamAppTest
类注解开始。这会启动一个 Kafka test container,并使用 Spring for Apache Kafka 配置 Kafka 组件,我们可以使用它们与 Kafka 生产和消费消息。Kafka 容器在静态初始化器中启动,这使得它成为真正的单例,允许在 JVM 中运行的每个测试都使用它。除了 Spring 配置之外,该注解还包含 @TestContainers
作为元注解。对于此测试,我们不让 Testcontainers 管理 StreamAppContainer
的生命周期,因为我们希望在知道 Couchbase 集群正在运行后才启动它。Couchbase 容器有一些附加配置。为了方便起见,它与 StreamAppContainer
共享一个虚拟网络(自动配置为使用与 Kafka 容器相同的网络)。这允许 Stream App Container 使用我们选择的别名 couchbase-server
连接到 Couchbase 服务器(记住,容器内的 localhost
指的是它自己的 IP 地址)。
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
StreamAppContainer 是一个 GenericContainer,包含连接到 Kafka 和使用 Kafka binder 的所需配置。Spring 配置还设置了一个监听器,用于在已知 topic 上消费来自容器的任何输出。在此情况下未使用,因为我们只有一个 sink 的输入。输入目标是随机生成的,并通过 getInputDestination()
访问。
static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
Couchbase 容器运行后,我们将启动 sink。我们等待标准的 Spring Boot 启动消息以确认 sink 已启动。我们还添加了一个 LogConsumer 来输出所有日志消息,以防出现错误。请注意,连接字符串仅使用 Couchbase 容器的网络别名。这是可能的,因为 sink 和 Couchbase 使用相同的虚拟网络。这里,我们在命令行上传递所有属性,但我们也可以通过 withEnvironment()
将它们设置为环境变量。由于我们控制 sink 的生命周期,我们需要在所有测试完成后停止它。
该测试使用自动装配的 TestTopicSender
。这是一个与中间件无关的接口,在本例中由 KafkaTemplate 支持。该接口对于运行 Kafka 和 Rabbit 的相同测试用例非常有用。这里,我们也可以自动装配 KafkaTemplate
。在撰写本文时,Kafka template 仅配置了 String serdes,因此我们使用 ObjectMapper
来处理 String。
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
由于此测试需要 sink 镜像,因此我们使用 Junit 5 的 @Tag
注解将其标记为集成测试。我们还配置了 Maven,将其从正常构建中排除,并且仅在设置了 integration profile 时才构建镜像并运行。完整的源代码位于此处,需要 Java 8+ 和 Docker。
在本文中,我们探讨了测试与外部服务(如 Couchbase)集成的 Spring Cloud Stream 应用的策略。大部分测试(在第 1 部分中描述)是在函数级别完成的。应用和集成测试实际上是冒烟测试,用于验证我们正确构建、配置和集成了所有内容。我们还展示了如何使用 TestContainers 测试 Stream 应用。
感谢阅读!希望您觉得这些内容有帮助。本系列还有几篇文章。