Spring Cloud Stream - 事件路由

工程 | Oleg Zhurakousky | 2019 年 10 月 31 日 | ...

欢迎阅读关于 Spring Cloud Stream (SCSt) 新特性系列的另一篇文章。在之前的文章中(可在此处获取:此处此处此处),我们试图解释我们在 Spring Cloud Stream (SCSt) 中转向函数式编程模型的原因。它代码量更少、配置更少,而且您的代码完全独立于 SCSt 的内部实现。

今天,我们将讨论使用函数进行路由。在 SCSt 的上下文中,路由是指能够 a) 将事件路由到特定的事件订阅者b) 将事件订阅者产生的事件路由到特定的目标。为了更好地理解上下文,让我们快速回顾一下基于注解的编程模型的工作方式。在本文中,我们将这种方式称为路由 'TO' 和路由 'FROM'。

对于路由 TO 事件订阅者,我们使用了 StreamListener 注解的 condition 属性,如下所示

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

此处提供了有关此方法的更多详细信息。

而对于路由 FROM 事件订阅者,我们使用了动态绑定目标——这种方法允许框架根据每个事件中提供的某些指令绑定到目标。

使用函数进行事件路由

通过函数式方法,我们可以以更简洁的方式实现上述所有功能,并附带一些额外特性。

路由 TO

路由 'TO' 函数可以通过依赖 Spring Cloud Function (SCF) 中可用的路由函数特性来实现。您可以通过设置 spring.cloud.stream.function.routing.enabled 属性来显式启用路由,或者通过设置 spring.cloud.function.routing-expression 属性并使用 Spring Expression Language (SpEL) 提供路由指令来隐式启用。路由指令应生成要路由 'TO' 的函数的定义。出于绑定的目的,路由目标的名称是 functionRouter-in-0(请参阅 RoutingFunction.FUNCTION_NAME此处描述的绑定命名约定)。

当消息被发送到此目标时,路由函数会尝试确定哪个实际函数需要处理此事件。它首先尝试访问 spring.cloud.function.routing-expression 消息头,如果提供,则确定要调用的实际函数的名称。这是最具动态性的方法。次动态的方法是提供一个 spring.cloud.function.definition 头,该头应包含要路由 'TO' 的函数的定义。这两种方法都需要通过设置 spring.cloud.stream.function.routing.enabled 属性来显式启用路由函数。

尽管微不足道,但 spring.cloud.function.routing-expression 也可以用作应用程序属性,这是一种额外的特性,在以前的版本中不可用。例如,考虑表达式与传入事件无关的情况,就像本文前面显示的基于注解的示例中一样(例如,spring.cloud.function.routing-expression=headers['type']=='order')。对于这种方法,您无需显式启用路由函数,因为将 spring.cloud.function.routing-expression 作为应用程序属性具有相同的效果。

虽然微不足道,但以下是上述方法之一的完整示例

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

通过向由绑定器(即 rabbit 或 kafka)公开的 functionRouter-in-0 目标发送消息,该消息将根据消息处理时 nanoTime() 的值被路由到适当的(“偶数”或“奇数”)Consumer Bean。

路由 FROM

和之前一样,路由 'FROM' 依赖于 SCSt 的动态绑定目标特性。然而,与路由 'TO' 一样,还有许多额外的特性。

以下示例展示了基础知识

@Autowired
private BinderAwareChannelResolver resolver;

public Consumer<String> send(Message message) {   
     MessageChannel destination = resolver
        .resolveDestination(message.getHeaders().get("type"))
     Message outgoingMessage = . . . // your code
     destination.send(outgoingMessage);
}

您只需要一个对 BinderAwareChannelResolver 的引用(在下面的示例中自动装配)。然后,您可以使用一些逻辑来确定目标名称(在我们的示例中,我们使用 'type' 头部的值)。一旦确定了目标名称,您就可以使用 BinderAwareChannelResolver.resolveDestination(..) 操作获取对其的引用并向其发送消息。这就是全部所需。

上述方法的缺点是一些特定于框架的抽象会渗透到您的代码中。例如,您需要了解 BinderAwareChannelResolverMessageChannel 等等。事实上,上面示例中的大部分代码都是样板代码。

一种更动态且更少渗透的方法是依赖 spring.cloud.stream.sendto.destination 属性,它有效地在幕后完成了上述所有操作。以下示例展示了如何使用这种方法

@SpringBootApplication
public class RoutingStreamApplication {

  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
		.withPayload("Hello")
		.setHeader("spring.cloud.stream.sendto.destination", "even")
		.build();
       return outgoingMessage;
     };
  }
}

我们不再需要注入 BinderAwareChannelResolver、执行 MessageChannel 的解析等等。我们只需创建一个新的 Message,其中指定了框架用于动态解析目标的头部。

路由源

最后但同样重要的一点,让我们看看另一个路由 'FROM' 的常见用例,即数据源来自 SCSt 上下文之外,但需要路由到适当的目标

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;

    private final EmitterProcessor<?> processor = EmitterProcessor.create();

    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get("id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader("spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

然后我们可以通过运行以下 curl 命令看到结果

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080

在这里,我们使用了函数式方法和响应式范例的一点点润色,这得益于 Supplier<Flux<?>> bean。我们有一个简单的 MVC 控制器,我们想根据内容中 'id' 属性的值将请求向下游路由。虽然 EmitterProcessor 的详细信息及其在此处的用法是另一篇文章的主题,但重要的是它展示了一个完整的函数式应用程序,其中 HTTP 请求动态路由到由目标绑定器管理的目标。

注意:截至本文撰写时,参考文档正在积极更新以支持即将发布的 SCSt 3.0.0.RELEASE 版本,但您始终可以使用参考文档的源代码来获取最新信息。

请访问 GitHub 上的 Spring Cloud Stream

此外,本系列先前的博客文章

- Spring Cloud Stream - 函数式和响应式

获取 Spring 新闻通讯

订阅 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 通过简单的订阅,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区所有即将到来的活动。

查看全部