Spring Cloud Stream 2.0 - 内容类型协商与转换

工程 | Oleg Zhurakousky | 2018 年 2 月 26 日 | ...

这是为 Spring Cloud Stream 2.0.0.RELEASE 做准备的一系列预发布博客中的第一篇。

前言

Spring Cloud Stream 2.0 对基于通道的绑定器的内容类型协商进行了彻底改造,以解决性能、灵活性以及最重要的一致性问题。以下博客将重点介绍已完成的工作、预期结果以及它如何帮助您。

引言

数据转换是任何消息驱动微服务架构的核心功能之一。在 Spring Cloud Stream 中,此类数据表示为 Spring Message

在消息流(即流)的各个点,消息可能需要转换为所需的形状/大小才能到达其目的地。这有两个原因

1. 将传入消息的内容线路格式转换为匹配应用程序提供的处理器的签名。 2. 将传出消息的内容转换为下一个处理器(如果存在内部流程)的签名,或转换回线路格式。

线路格式通常是 byte[],并由绑定器实现控制。

在 Spring Cloud Stream Message 中,转换通过 org.springframework.messaging.converter.MessageConverter 抽象完成。

以下步骤序列展示了典型的消息流以及 Message 所经历的转换,使用 Spring Cloud Stream 的 Processor 契约进行描述,本质上涵盖了入站出站内容转换背后的要求。

1. 从绑定器接收线路格式的 Spring Message 2. 确保在 Spring Message 中设置了入站 contentType 头部 3. 将 Spring Message 线路格式转换为应用程序提供的 MessageHandler 的签名 4. 调用应用程序提供的 MessageHandler 5. 将 MessageHandler 的返回值转换回 Spring Message 6. 确保在 Spring Message 中设置了出站 contentType 头部 7. 将 Spring Message 转换回线路格式 8. 将线路格式的 Spring Message 发送回绑定器

虽然上述内容完整总结了典型消息流中的主要状态变化,但魔鬼总是藏在细节中,所以让我们更仔细地查看每个步骤。

细节

  1. 传入消息由绑定器接收,并以线路格式发送到绑定器的输入通道(例如,Processor.INPUT')。
  2. 内部输入通道预配置了通道拦截器,仅当传入消息尚未设置 contentType 头部时,才为传入消息注入 contentType 头部。这是必需的,以确保在需要时,下游消息转换可以考虑 contentType(稍后详细介绍)。注入的 contentType 来自每个单独目的地绑定的内容类型设置,其中 application/json 是默认内容类型。

例如,'spring.cloud.stream.bindings.myInput.content-type=text/plain' 为 'myInput'(传入)目的地绑定设置内容类型为 'text/plain'。这意味着除非消息已包含 'contentType' 头部,否则每个传入消息都将注入 'contentType=text/plain' 头部。 换句话说,头部提供的 contentType 优先于每个绑定设置的 contentType。 3. 现在,借助 HandlerMethodArgumentResolvers 和预配置或用户提供的 MessageConverters,传入消息将转换为应用程序提供的 MessageHandler 的签名(例如,public Text process(Foo foo){..})。此类处理方法通常使用 @StreamListener@ServiceActivator@Transformer 等注解之一进行标注。在某些转换器可能需要 contentType 的地方,步骤 2 中的操作保证了此类消息始终可以通过其 contentType 头部获取它。当然,如果此类方法将其输入参数设置为 Message,则不执行转换。 4. 调用处理方法,并在成功后,从处理方法的返回值创建传出消息的过程开始(假设处理方法非 void)。 5. 仅当返回的值不是 Message 时,才将处理方法的返回值转换回 Spring Message。这意味着会创建一个新的 Spring Message,其有效载荷是处理方法的返回值。传入消息的头部会复制到新的传出消息中,同时剥离由 'SpringIntegrationProperties.messageHandlerNotPropagatedHeaders' 标识的任何头部。默认情况下,这里只设置了一个头部 - contentType。这意味着创建的新传出消息未设置 contentType 头部。这是为了确保 contentType 可以随着实际数据的应用程序级转换而演变。 注意:仅当处理方法返回非 Message 时,才剥离 contentType 消息将发送到绑定器的输出通道。 6. 与绑定器的输入通道类似,绑定器的输出通道(例如,Processor.OUTPUT)也预配置了通道拦截器。在这里,我们选择性地将 contentType 头部注入到传出消息中,为将传出消息的内容转换回线路格式做准备。我们来看仅有的两种可能情况: a. 传出消息已设置 contentType 头部。由于头部设置的 contentType 优先于任何其他 contentType,因此不会执行 contentType 注入,并且在转换回线路格式时将使用头部设置的 contentType 的值。 b. 传出消息未设置 contentType 头部。绑定 contentType(默认或提供)将作为头部注入到传出消息中,并在转换回线路格式时使用。 7. 使用可用的 MessageConverters 之一将消息转换为线路格式。 8. 转换后的消息被发送回绑定器,同时保留注入的或已有的 contentType 头部。换句话说,传出消息将始终存在 contentType 头部。

定制

上述内容涵盖了默认的开箱即用行为。但这可能还不够,所以我们能否以及如何进行定制?。2.0 中内容类型协商改进的目标不仅是回答这些类型的问题,还要确保答案是一致的——入站出站通道拦截器用于转换为/从线路格式转换的 'MessageConverters' 与 'HandlerMethodArgumentResolvers' 用于转换为/从强类型转换的 'MessageConverters' 是相同的。

要添加自定义的 MessageConverter,只需创建一个 org.springframework.messaging.converter.MessageConverter 的实现,并将其配置为 @Bean,同时将该 bean 注解为 @StreamMessageConverter,它将作为现有 MessageConverters 堆栈中的第一个转换器添加,本质上优先于现有的 MessageConverters

总结

希望现在已经相当清楚,任何和所有内容类型转换都是由 MessageConverters 完成的。虽然 MessageConverters 的实现方式不同,但大多数都同时利用 contentType 头部和目标类型(targetClass),这使得它们可以执行类型内部转换以及转换为/从线路格式转换。目前,有一套预配置的 MessageConverters 来支持大多数用例,因此对于大多数典型数据类型(即 json、文本等),最终用户实际上无需做任何事情。然而,了解当前的工作方式以及如何定制是值得的——定制现有实现和/或引入新的 MessageConverter 实现

结论

我们目前正在更新文档,其中将包含有关此主题以及与 2.0 相关工作的更多详细信息和示例,而这些预发布博客的主要目标是提高认识、促进“试一试”并征集反馈。话虽如此,Spring Cloud Stream 2.0.0.RC1 可在此处获取。

我们鼓励您使用以下任一方式提供反馈

尽情享受!

订阅 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速发展。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单订阅。

了解更多

即将举行的活动

查看 Spring 社区的所有即将举行的活动。

查看全部