如何测试 Spring Cloud Stream 应用程序(第一部分)

工程 | Artem Bilan | 2017年10月24日 | ...

亲爱的Spring社区!

作为一款事件驱动的微服务框架,Spring Cloud Stream 极大地简化了开发事件驱动应用程序的复杂性。本文的重点并非其功能能力及其优点(想了解更多,请查阅 参考指南),而是从测试的角度,展示该框架所提供的工具和技术。本文旨在鼓励社区分享对现有测试基础设施的反馈,因此,任何想法、评论或功能请求都欢迎!我们努力根据输入进行调整,以提供更好的开发和测试体验。

那么,让我从这段引言开始

"令人惊讶的是,许多集成解决方案在部署时测试很少,甚至没有测试。如果有测试,通常也是手动且零星的。集成解决方案未经过充分测试的一个原因是,测试异步、基于消息的中间件解决方案极具挑战性。"

- Gregor Hohpe,
企业集成项目中的测试驱动开发

没错,当我们致力于将遗留的单体工作负载现代化改造为云原生风格的微服务架构时,任何形式的测试都起着至关重要的作用。更具体地说,拥有自动化的 CI 流水线,能够在每次提交时迭代地运行测试套件,是高度推荐的,并且事实上已被广泛接受为一种实践。

在使用 Spring Cloud Stream 时,测试简单的(“微”)业务逻辑可能并不明显,我们可能会忽略微服务之间的集成测试,因为除了中间件绑定之外,用户不需要其他任何东西——框架自动化了所有其他样板化语义!或者即使我们想这样做,也没有有效的工具可以快速启动和销毁。这就是我们在此的原因——打破微服务简单性与测试复杂性之间神话!

什么是 Spring Cloud Stream 应用程序?

Spring Cloud Stream 应用程序是基于众所周知的 Spring Integration 实现的企业集成模式的事件驱动的 Spring Boot 微服务。它们的测试工具和实用程序(来自 Spring 测试框架SpringRunner,Spring Boot 的测试环境 自动配置,Spring Integration 的模拟等)为我们提供了一个有趣的组合,使得单元测试和集成测试不再那么具有挑战性。我们只需要知道如何以及何时使用它们!

例如,下面这个简单的 Spring Cloud Stream 应用程序,一个源,会根据信号灯状态周期性地生成“foo”或“bar”字符串

@SpringBootApplication
@EnableBinding(Source.class)
public class FooBarSource {

  private AtomicBoolean semaphore = new AtomicBoolean(true);

  @Bean
  @InboundChannelAdapter(channel = Source.OUTPUT,
                      poller = @Poller(fixedDelay = "100"))
  public MessageSource<String> fooBarStrings() {
     return () ->
          new GenericMessage<>(
                this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar");
  }
}

或者这个处理器,将传入的字符串转换为大写

@SpringBootApplication
@EnableBinding(Processor.class)
public class ToUpperCaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String transform(String payload) {
     return payload.toUpperCase();
  }
}

最后这个宿,将传入的有效负载保存到数据库

@SpringBootApplication
@EnableBinding(Sink.class)
public class JdbcSink {

  @Bean
  @ServiceActivator(inputChannel = Sink.INPUT)
  public MessageHandler logHandler(DataSource dataSource) {
     return new JdbcMessageHandler(dataSource,
                                 "INSERT INTO foobar (value) VALUES (:payload)");
  }
}

当然,最明显的单元测试,例如,对于处理器代码会是

@Test
public void testUpperCase() {
  assertEquals("FOO", new ToUpperCaseProcessor().transform("foo"));
}

但我们的目标确实是测试所有这些 Spring Boot、Cloud 和 Integration 的优点。让我们看看如何做到这一点。

单元测试

由于 Spring Cloud Stream 完全基于 Spring Boot,因此我们的测试自然会使用 @SpringBootTest 标记,并且所有其功能和实用程序(包括模拟和间谍)都可供我们使用。有关更多信息,请参阅 测试 Spring Boot 应用程序。另一方面,Spring Cloud Stream 也是一个 Spring Integration 应用程序。因此,来自 Spring Integration Test 模块的所有 Spring Integration 测试工具和实用程序也可以在 Spring Cloud Stream 应用程序的测试中使用。

此外,Spring Cloud Stream 支持在不连接消息传递系统的情况下测试微服务应用程序。您可以通过使用 spring-cloud-stream-test-support 库提供的 TestSupportBinder 来实现此目的,该库可以作为测试依赖添加到应用程序中。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

TestSupportBinder 允许用户与绑定的通道进行交互,并检查应用程序发送和/或接收的消息。对于出站消息通道,TestSupportBinder 会注册一个订阅者,并将应用程序发出的消息保留在 MessageCollector 中。在测试期间可以检索这些消息,并对其进行断言。用户还可以向入站消息通道发送消息,以便消费者应用程序可以消耗这些消息。

@Autowired
private Source channels;

@Autowired
private MessageCollector collector;

@Test
public void testMessages() {
  BlockingQueue<Message<?>> messages = collector.forChannel(channels.output());

  assertThat(messages, receivesPayloadThat(is("foo")));
  assertThat(messages, receivesPayloadThat(is("bar")));
  assertThat(messages, receivesPayloadThat(is("foo")));
  assertThat(messages, receivesPayloadThat(is("bar")));
}

请注意 receivesPayloadThat(),这是一个静态实用程序。它来自 MessageQueueMatcher,与 receivesMessageThat() 一起,我们可以使用它来断言来自源应用程序或处理器应用程序输出通道的传入消息是否符合任何适当的 Matcher 实现。

来自 Spring Boot 的 @MockBean@SpyBean 可用于验证与我们的流监听器的交互。

@SpyBean
private ToUpperCaseProcessor toUpperCaseProcessor;

@Test
public void testMessages() {
  this.channels.input().send(new GenericMessage<>("foo"));

  BlockingQueue<Message<?>> messages = collector.forChannel(channels.output());

  assertThat(messages, receivesPayloadThat(is("FOO")));

  verify(this.toUpperCaseProcessor, times(1)).transform(anyString());
}

是的,要发送测试数据,只需获取处理器的 input MessageChannel 并构建 Message 对象即可。要与 payload 一起发送一些用于测试的头部,您可以使用 org.springframework.integration.support.MessageBuilder

Message<String> testMessage =
     MessageBuilder.withPayload("headers")
           .setHeader("foo", "bar")
           .build();

input.send(testMessage);

Message<String> expected =
     MessageBuilder.withPayload("HEADERS")
           .copyHeaders(testMessage.getHeaders())
           .build();

Matcher<Message<Object>> sameExceptIgnorableHeaders =
     (Matcher<Message<Object>>) (Matcher<?>) sameExceptIgnorableHeaders(expected);

assertThat(messages, receivesMessageThat(sameExceptIgnorableHeaders));

测试宿应用程序有点棘手,因为这通常是数据管道解决方案的最后一步。在这里,我们只需将数据发送到目标系统,依赖于特定协议的通道适配器。例如,我们的 JdbcSink 应用程序使用 Spring Integration 的 JdbcMessageHandler 将传入消息的有效负载插入数据库。因此,为了测试 JdbcSink 是否工作正常,我们需要查询数据库。幸运的是,Spring Boot 提供了 DataSource 的自动配置——只需确保将相应数据库供应商的驱动程序依赖项添加到类路径中即可,例如:

<dependency>
  <groupId>org.hsqldb</groupId>
  <artifactId>hsqldb</artifactId>
  <scope>test</scope>
</dependency>

在 Spring Boot 处理了这些之后,与数据库交互的集成测试就变得很简单了。我们只需注入 Spring Boot 自动配置的 JdbcTemplate

@Autowired
private Sink channels;

@Autowired
private JdbcTemplate jdbcTemplate;

@Test
public void testMessages() {
  this.channels.input().send(new GenericMessage<>("foo"));
  this.channels.input().send(new GenericMessage<>("bar"));

  List<Map<String, Object>> data =
                    this.jdbcTemplate.queryForList("SELECT * FROM foobar");

  assertThat(data.size()).isEqualTo(2);
  assertThat(data.get(0).get("value")).isEqualTo("foo");
  assertThat(data.get(1).get("value")).isEqualTo("bar");
}

这种基于 Spring Boot 测试框架的方法适用于 JDBC 和 JPA 配置。我们可以将类似的方法应用于许多其他协议,包括 ActiveMQ、MongoDB、Cassandra、Gemfire、Hazelcast 等等。如果我们无法为我们的目的拥有一个嵌入式的、测试作用域的服务,我们可以选择 **模拟** 目标协议的 MessageHandler 并断言与它的交互。

@MockBean(name = "jdbcHandler")
private MessageHandler jdbcMessageHandler;
…
ArgumentCaptor<Message<?>> messageArgumentCaptor =
     (ArgumentCaptor<Message<?>>) (ArgumentCaptor<?>)
                                          ArgumentCaptor.forClass(Message.class);

verify(this.jdbcMessageHandler, times(2))
                          .handleMessage(messageArgumentCaptor.capture());

Message<?> message = messageArgumentCaptor.getValue();
assertThat(message).hasFieldOrPropertyWithValue("payload", "bar");

另一种验证发送到宿(或从处理器发出)的消息的方法可以作为 ChannelInterceptor 注入到消息通道中。

AbstractMessageChannel input = (AbstractMessageChannel) this.channels.input();

final AtomicReference<Message<?>> messageAtomicReference =
                                   new AtomicReference<>();

ChannelInterceptor assertionInterceptor = new ChannelInterceptorAdapter() {

  @Override
  public void afterSendCompletion(Message<?> message, MessageChannel channel,
                                          boolean sent, Exception ex) {
     messageAtomicReference.set(message);
     super.afterSendCompletion(message, channel, sent, ex);
  }

};

input.addInterceptor(assertionInterceptor);
input.send(new GenericMessage<>("foo"));

…
Message<?> message1 = messageAtomicReference.get();
assertThat(message1).isNotNull();
assertThat(message1).hasFieldOrPropertyWithValue("payload", "foo");

总结

上述应用程序的全部代码,当然还有它们的测试,都可以在 Spring Cloud Stream 示例项目的 testing 模块中找到。尝试使用 Spring Boot、Spring Integration 和 Spring Cloud Stream 提供的各种测试工具和技术。不要犹豫组合这些工具以获得更全面的测试覆盖。

本文的第二部分将重点介绍 Spring Cloud Stream 应用程序的集成测试以及事件驱动的交互。

最后,请参加我将于今年十二月在 Spring One Platform 会议上进行的演示,届时我将在演讲中介绍 Spring Integration 5.0 中新的测试功能。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有