如何测试 Spring Cloud Stream 应用 (第一部分)

工程 | Artem Bilan | 2017 年 10 月 24 日 | ...

亲爱的 Spring 社区!

作为一个事件驱动的微服务框架,Spring Cloud Stream 极大地简化了事件驱动应用开发中的复杂性。本文的重点不是其功能特性和优势(要了解更多信息,请查阅参考指南),而是展示框架从测试角度提供的功能、工具和技术。本文旨在鼓励社区分享对现有测试基础设施的反馈,欢迎任何想法、评论或功能请求!我们努力根据输入进行调整,以便未来提供更好的开发和测试体验。

好的,让我先引用一段话

"令人惊讶的是,如此多的集成解决方案在部署时几乎没有或根本没有经过测试。即使有测试,通常也是手动和零散地进行的。集成解决方案没有得到彻底测试的原因之一是测试异步的、基于消息的中间件解决方案具有挑战性。"

- Gregor Hohpe,
企业集成项目中的测试驱动开发

没错,在我们迈向将传统单体工作负载现代化为云原生风格微服务架构的过程中,任何形式的测试都扮演着至关重要的角色。更具体地说,强烈建议并且实际上已被广泛接受的做法是,在每次提交时通过自动化 CI 流水线迭代地运行测试套件。

使用 Spring Cloud Stream 时,测试简单的(“微”)业务逻辑可能并不明显,我们可能会忽略微服务之间的集成测试,因为最终用户期望的不过是中间件绑定——框架自动化了所有其他样板语义!即使我们想这样做,也没有好用的工具可以快速启动和关闭它们。这就是我们在这里的原因——打破关于微服务的简单性和测试复杂性的迷思!

什么是 Spring Cloud Stream 应用?

Spring Cloud Stream 应用是基于 Spring Integration 实现的、符合知名企业集成模式的事件驱动型 Spring Boot 微服务。它们的测试工具和实用程序(来自 Spring 测试框架SpringRunner,Spring Boot 用于测试环境的自动配置,Spring Integration 的 mock 等)为我们带来了一个有趣的组合,使我们的单元测试和集成测试不再那么具有挑战性。我们只需要知道何时、何地以及如何使用!

例如,下面的简单 Spring Cloud Stream 应用,一个源,会根据信号量状态周期性地生成“foo”或“bar”字符串

@SpringBootApplication
@EnableBinding(Source.class)
public class FooBarSource {

  private AtomicBoolean semaphore = new AtomicBoolean(true);

  @Bean
  @InboundChannelAdapter(channel = Source.OUTPUT,
                      poller = @Poller(fixedDelay = "100"))
  public MessageSource<String> fooBarStrings() {
     return () ->
          new GenericMessage<>(
                this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar");
  }
}

或者这个处理器将输入的字符串转换为大写

@SpringBootApplication
@EnableBinding(Processor.class)
public class ToUpperCaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String transform(String payload) {
     return payload.toUpperCase();
  }
}

或者最后这个 sink 将输入的有效载荷保存到数据库

@SpringBootApplication
@EnableBinding(Sink.class)
public class JdbcSink {

  @Bean
  @ServiceActivator(inputChannel = Sink.INPUT)
  public MessageHandler logHandler(DataSource dataSource) {
     return new JdbcMessageHandler(dataSource,
                                 "INSERT INTO foobar (value) VALUES (:payload)");
  }
}

当然,对于处理器代码,最明显的单元测试例如可以是

@Test
public void testUpperCase() {
  assertEquals("FOO", new ToUpperCaseProcessor().transform("foo"));
}

但我们的目标是真正测试所有这些 Spring Boot、Cloud 和 Integration 的优势。让我们看看如何实现这一点。

单元测试

由于 Spring Cloud Stream 完全基于 Spring Boot,因此我们的测试显然应该标记为 @SpringBootTest,并且 Spring Boot 的所有功能和实用程序(包括 mocking 和 spying)都可以使用。更多信息请参阅测试 Spring Boot 应用。另一方面,Spring Cloud Stream 也是 Spring Integration 应用。因此,Spring Integration Test 模块中的所有 Spring Integration 测试工具和实用程序也可以用于 Spring Cloud Stream 应用测试。

此外,Spring Cloud Stream 提供了一种测试微服务应用的方法,无需连接到消息系统。您可以使用 spring-cloud-stream-test-support 库提供的 TestSupportBinder 来实现,该库可以作为测试依赖项添加到应用中

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

TestSupportBinder 允许用户与绑定通道交互,并检查应用发送和/或接收的消息。对于出站消息通道,TestSupportBinder 会注册一个订阅者,并将应用发出的消息保存在一个 MessageCollector 中。这些消息可以在测试期间检索,并对其进行断言。用户还可以向入站消息通道发送消息,以便消费者应用可以消费这些消息

@Autowired
private Source channels;

@Autowired
private MessageCollector collector;

@Test
public void testMessages() {
  BlockingQueue<Message<?>> messages = collector.forChannel(channels.output());

  assertThat(messages, receivesPayloadThat(is("foo")));
  assertThat(messages, receivesPayloadThat(is("bar")));
  assertThat(messages, receivesPayloadThat(is("foo")));
  assertThat(messages, receivesPayloadThat(is("bar")));
}

请注意 receivesPayloadThat(),它是一个静态工具。它来自 MessageQueueMatcher,与 receivesMessageThat() 一起,我们可以用它来对源或处理器应用的输出通道中的传入消息使用任何适当的 Matcher 实现进行断言。

Spring Boot 的 @MockBean@SpyBean 可用于验证与我们的流监听器的交互

@SpyBean
private ToUpperCaseProcessor toUpperCaseProcessor;

@Test
public void testMessages() {
  this.channels.input().send(new GenericMessage<>("foo"));

  BlockingQueue<Message<?>> messages = collector.forChannel(channels.output());

  assertThat(messages, receivesPayloadThat(is("FOO")));

  verify(this.toUpperCaseProcessor, times(1)).transform(anyString());
}

没错,要发送测试数据,只需获取处理器的 input MessageChannel 并构建 Message 对象即可。要将一些用于测试的头部与 payload 一起发送,可以使用 org.springframework.integration.support.MessageBuilder

Message<String> testMessage =
     MessageBuilder.withPayload("headers")
           .setHeader("foo", "bar")
           .build();

input.send(testMessage);

Message<String> expected =
     MessageBuilder.withPayload("HEADERS")
           .copyHeaders(testMessage.getHeaders())
           .build();

Matcher<Message<Object>> sameExceptIgnorableHeaders =
     (Matcher<Message<Object>>) (Matcher<?>) sameExceptIgnorableHeaders(expected);

assertThat(messages, receivesMessageThat(sameExceptIgnorableHeaders));

测试 sink 应用有点棘手,因为它通常是数据管道解决方案中的最后一步。在这里,我们只是将数据发送到目标系统,依赖于特定协议的通道适配器。例如,我们的 JdbcSink 应用使用 Spring Integration 的 JdbcMessageHandler 将传入消息的有效载荷插入到数据库中。因此,要测试 JdbcSink 是否工作正常,我们需要查询数据库。幸运的是,Spring Boot 提供了 DataSource 的自动配置——只需确保目标数据库供应商的驱动依赖项在类路径上即可,例如

<dependency>
  <groupId>org.hsqldb</groupId>
  <artifactId>hsqldb</artifactId>
  <scope>test</scope>
</dependency>

有了 Spring Boot 的自动配置,与数据库交互的集成测试变得直观简单。我们只需注入由 Spring Boot 自动配置的 JdbcTemplate

@Autowired
private Sink channels;

@Autowired
private JdbcTemplate jdbcTemplate;

@Test
public void testMessages() {
  this.channels.input().send(new GenericMessage<>("foo"));
  this.channels.input().send(new GenericMessage<>("bar"));

  List<Map<String, Object>> data =
                    this.jdbcTemplate.queryForList("SELECT * FROM foobar");

  assertThat(data.size()).isEqualTo(2);
  assertThat(data.get(0).get("value")).isEqualTo("foo");
  assertThat(data.get(1).get("value")).isEqualTo("bar");
}

这种基于 Spring Boot 测试框架的方法适用于 JDBC 和 JPA 配置。类似的方法也可以应用于许多其他协议,包括 ActiveMQ、MongoDB、Cassandra、Gemfire、Hazelcast 等等。如果我们的目的无法使用嵌入式、测试范围的服务,我们可以选择 mock 目标协议的 MessageHandler 并断言与其的交互

@MockBean(name = "jdbcHandler")
private MessageHandler jdbcMessageHandler;
…
ArgumentCaptor<Message<?>> messageArgumentCaptor =
     (ArgumentCaptor<Message<?>>) (ArgumentCaptor<?>)
                                          ArgumentCaptor.forClass(Message.class);

verify(this.jdbcMessageHandler, times(2))
                          .handleMessage(messageArgumentCaptor.capture());

Message<?> message = messageArgumentCaptor.getValue();
assertThat(message).hasFieldOrPropertyWithValue("payload", "bar");

另一种验证发送到 sink(或处理器输出)的消息的方法是,将 ChannelInterceptor 注入到消息通道中

AbstractMessageChannel input = (AbstractMessageChannel) this.channels.input();

final AtomicReference<Message<?>> messageAtomicReference =
                                   new AtomicReference<>();

ChannelInterceptor assertionInterceptor = new ChannelInterceptorAdapter() {

  @Override
  public void afterSendCompletion(Message<?> message, MessageChannel channel,
                                          boolean sent, Exception ex) {
     messageAtomicReference.set(message);
     super.afterSendCompletion(message, channel, sent, ex);
  }

};

input.addInterceptor(assertionInterceptor);
input.send(new GenericMessage<>("foo"));

…
Message<?> message1 = messageAtomicReference.get();
assertThat(message1).isNotNull();
assertThat(message1).hasFieldOrPropertyWithValue("payload", "foo");

总结

上述应用的全部代码以及它们的测试当然可以在 Spring Cloud Stream Samples 项目的 testing 模块中找到。请尝试 Spring Boot、Spring Integration 和 Spring Cloud Stream 提供的各种测试工具和技术。请不要犹豫,将这些工具结合起来以获得更全面的测试覆盖。

本文的第二部分将重点介绍 Spring Cloud Stream 应用的集成测试以及事件驱动交互。

最后,请参加我今年 12 月在 Spring One Platform 大会上的演讲,我将在其中介绍 Spring Integration 5.0 中新的测试特性。

订阅 Spring 新闻简报

通过 Spring 新闻简报保持联系

订阅

抢先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一个简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部