如何测试 Spring Cloud Stream 应用 (第一部分)

工程 | Artem Bilan | 2017年10月24日 | ...

亲爱的 Spring 社区!

作为事件驱动的微服务框架,Spring Cloud Stream 在开发事件驱动应用程序时显著简化了复杂性。本文并非重点关注其功能特性和优势(如需了解更多信息,请参阅参考指南),我的目标是向您展示该框架在测试方面提供的功能、工具和技术。本文旨在鼓励社区就现有的测试基础设施分享反馈,因此欢迎提出任何想法、评论或功能请求!我们努力适应大家的输入,以在未来提供更好的开发和测试体验。

那么,让我从引言开始

“令人惊讶的是,如此多的集成解决方案的部署几乎没有进行任何测试。如果进行测试,通常也是手动且零星的。集成解决方案未经过彻底测试的原因之一是,测试异步的消息中间件解决方案具有挑战性。”

- Gregor Hohpe,
企业集成项目中的测试驱动开发

没错,当我们朝着将遗留单体工作负载现代化为云原生微服务架构迈进时,任何形式的测试都扮演着非常关键的角色。更具体地说,强烈建议在每次提交时迭代地运行测试套件的自动化 CI 管道,事实上,这已被广泛接受为采用的实践。

使用 Spring Cloud Stream 时,测试简单的(“微型”)业务逻辑可能并不明显,我们可能会忽略微服务之间的集成测试,因为最终用户只需要中间件绑定——框架会自动处理所有其他样板语义!或者,即使我们想这样做,也没有有用的工具可以快速启动和关闭它。这就是我们在这里的原因——打破关于微服务简单性和测试其复杂性的神话!

什么是 Spring Cloud Stream 应用?

Spring Cloud Stream 应用是一个事件驱动的、基于 Spring Boot 的微服务,它基于 Spring Integration 实现的众所周知的企业集成模式。它们的测试工具和实用程序(来自Spring 测试框架SpringRunner,Spring Boot 的自动配置用于测试环境,来自 Spring Integration 的模拟等等)为我们带来了一个有趣的组合,使我们的单元测试和集成测试不再那么具有挑战性。我们只需要知道什么、何时以及如何使用!

例如,以下简单的 Spring Cloud Stream 应用(一个源)根据信号量状态定期生成“foo”或“bar”字符串

@SpringBootApplication
@EnableBinding(Source.class)
public class FooBarSource {

  private AtomicBoolean semaphore = new AtomicBoolean(true);

  @Bean
  @InboundChannelAdapter(channel = Source.OUTPUT,
                      poller = @Poller(fixedDelay = "100"))
  public MessageSource<String> fooBarStrings() {
     return () ->
          new GenericMessage<>(
                this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar");
  }
}

或者这个处理器将传入的字符串转换为大写

@SpringBootApplication
@EnableBinding(Processor.class)
public class ToUpperCaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String transform(String payload) {
     return payload.toUpperCase();
  }
}

或者最后这个接收器将传入的有效负载保存到数据库

@SpringBootApplication
@EnableBinding(Sink.class)
public class JdbcSink {

  @Bean
  @ServiceActivator(inputChannel = Sink.INPUT)
  public MessageHandler logHandler(DataSource dataSource) {
     return new JdbcMessageHandler(dataSource,
                                 "INSERT INTO foobar (value) VALUES (:payload)");
  }
}

当然,最明显的单元测试(例如,对于处理器代码)将是

@Test
public void testUpperCase() {
  assertEquals("FOO", new ToUpperCaseProcessor().transform("foo"));
}

但我们的目标实际上是测试所有这些 Spring Boot、Cloud 和 Integration 的好东西。让我们看看如何做到这一点。

单元测试

由于 Spring Cloud Stream 完全基于 Spring Boot,因此很明显我们的测试应该用@SpringBootTest标记,并且所有其功能和实用程序(包括模拟和间谍)都可供我们使用。请参阅测试 Spring Boot 应用程序以了解更多信息。另一方面,Spring Cloud Stream 也是 Spring Integration 应用程序。因此,来自Spring Integration Test模块的所有 Spring Integration 测试工具和实用程序也可以在 Spring Cloud Stream 应用程序测试中使用。

此外,Spring Cloud Stream 提供了在不连接到消息系统的情况下测试微服务应用程序的支持。您可以使用spring-cloud-stream-test-support库提供的TestSupportBinder来实现这一点,该库可以作为测试依赖项添加到应用程序中。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

TestSupportBinder允许用户与绑定的通道交互并检查应用程序发送和/或接收的消息。对于出站消息通道,TestSupportBinder注册单个订阅者并将应用程序发出的消息保留在MessageCollector中。它们可以在测试期间检索,并且可以对它们进行断言。用户还可以将消息发送到入站消息通道,以便使用者应用程序可以消费这些消息。

@Autowired
private Source channels;

@Autowired
private MessageCollector collector;

@Test
public void testMessages() {
  BlockingQueue<Message<?>> messages = collector.forChannel(channels.output());

  assertThat(messages, receivesPayloadThat(is("foo")));
  assertThat(messages, receivesPayloadThat(is("bar")));
  assertThat(messages, receivesPayloadThat(is("foo")));
  assertThat(messages, receivesPayloadThat(is("bar")));
}

请注意receivesPayloadThat(),它是一个静态实用程序。这来自MessageQueueMatcher,与receivesMessageThat()一起,我们可以使用它来使用来自源或处理器应用程序输出通道中传入消息的任何适当的Matcher实现进行断言。

来自 Spring Boot 的@MockBean@SpyBean可用于验证与我们的流监听器的交互。

@SpyBean
private ToUpperCaseProcessor toUpperCaseProcessor;

@Test
public void testMessages() {
  this.channels.input().send(new GenericMessage<>("foo"));

  BlockingQueue<Message<?>> messages = collector.forChannel(channels.output());

  assertThat(messages, receivesPayloadThat(is("FOO")));

  verify(this.toUpperCaseProcessor, times(1)).transform(anyString());
}

正确地,要发送测试数据,只需获取处理器的input MessageChannel并构建Message对象即可。要与payload一起发送一些用于测试的标头,您可以使用org.springframework.integration.support.MessageBuilder

Message<String> testMessage =
     MessageBuilder.withPayload("headers")
           .setHeader("foo", "bar")
           .build();

input.send(testMessage);

Message<String> expected =
     MessageBuilder.withPayload("HEADERS")
           .copyHeaders(testMessage.getHeaders())
           .build();

Matcher<Message<Object>> sameExceptIgnorableHeaders =
     (Matcher<Message<Object>>) (Matcher<?>) sameExceptIgnorableHeaders(expected);

assertThat(messages, receivesMessageThat(sameExceptIgnorableHeaders));

测试接收器应用程序有点棘手,因为这通常是数据管道解决方案的最后一步。在这里,我们只是将数据发送到目标系统,依靠特定协议的通道适配器。例如,我们的JdbcSink应用程序使用 Spring Integration 中的JdbcMessageHandler将传入消息的有效负载插入数据库。因此,要测试JdbcSink是否正常工作,我们需要查询数据库。幸运的是,Spring Boot 提供了DataSource的自动配置——这与确保预期数据库供应商的驱动程序依赖项位于类路径上一样简单,例如:

<dependency>
  <groupId>org.hsqldb</groupId>
  <artifactId>hsqldb</artifactId>
  <scope>test</scope>
</dependency>

通过 Spring Boot 处理这一点后,与数据库交互的集成测试变得很简单。我们只需注入由 Spring Boot 自动配置的JdbcTemplate即可。

@Autowired
private Sink channels;

@Autowired
private JdbcTemplate jdbcTemplate;

@Test
public void testMessages() {
  this.channels.input().send(new GenericMessage<>("foo"));
  this.channels.input().send(new GenericMessage<>("bar"));

  List<Map<String, Object>> data =
                    this.jdbcTemplate.queryForList("SELECT * FROM foobar");

  assertThat(data.size()).isEqualTo(2);
  assertThat(data.get(0).get("value")).isEqualTo("foo");
  assertThat(data.get(1).get("value")).isEqualTo("bar");
}

这种基于 Spring Boot 测试框架的方法适用于 JDBC 和 JPA 配置。我们可以将类似的方法应用于许多其他协议,包括 ActiveMQ、MongoDB、Cassandra、Gemfire、Hazelcast 和许多其他协议。如果我们无法为我们的目的获得嵌入式测试范围的服务,我们可以选择模拟目标协议的MessageHandler并断言与它的交互。

@MockBean(name = "jdbcHandler")
private MessageHandler jdbcMessageHandler;
…
ArgumentCaptor<Message<?>> messageArgumentCaptor =
     (ArgumentCaptor<Message<?>>) (ArgumentCaptor<?>)
                                          ArgumentCaptor.forClass(Message.class);

verify(this.jdbcMessageHandler, times(2))
                          .handleMessage(messageArgumentCaptor.capture());

Message<?> message = messageArgumentCaptor.getValue();
assertThat(message).hasFieldOrPropertyWithValue("payload", "bar");

另一种验证发送到接收器(或处理器输出)的消息的方法可以作为ChannelInterceptor注入到消息通道中。

AbstractMessageChannel input = (AbstractMessageChannel) this.channels.input();

final AtomicReference<Message<?>> messageAtomicReference =
                                   new AtomicReference<>();

ChannelInterceptor assertionInterceptor = new ChannelInterceptorAdapter() {

  @Override
  public void afterSendCompletion(Message<?> message, MessageChannel channel,
                                          boolean sent, Exception ex) {
     messageAtomicReference.set(message);
     super.afterSendCompletion(message, channel, sent, ex);
  }

};

input.addInterceptor(assertionInterceptor);
input.send(new GenericMessage<>("foo"));

…
Message<?> message1 = messageAtomicReference.get();
assertThat(message1).isNotNull();
assertThat(message1).hasFieldOrPropertyWithValue("payload", "foo");

总结

上述应用程序的完整代码,当然还有它们的测试,都可以在 Spring Cloud Stream 示例项目中的testing模块中找到。尝试使用 Spring Boot、Spring Integration 和 Spring Cloud Stream 提供的各种测试工具和技术。不要犹豫,将这些工具结合起来以获得更全面的测试覆盖率。

本文的第二部分将重点介绍 Spring Cloud Stream 应用程序的集成测试和事件驱动的交互。

最后,请参加我的会议,在Spring One Platform十二月份的会议上,在我的演讲中,我将深入了解 Spring Integration 5.0 中新的测试功能。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部