Cloud Events 和 Spring - 第 2 部分

工程 | Oleg Zhurakousky | 2020年12月23日 | ...

引言

我们首先快速回顾一下上一篇文章

  • 在 Spring 的上下文中,Message 是一个合适的结构和抽象,用于消费代表 Cloud Event 的数据。我们希望这一点很清楚。
  • 在 Spring 中,我们致力于隔离功能性关注点与非功能性关注点,这使我们能够在框架层面处理非功能性方面(例如发送、接收、重试、连接、转换等),让您(大部分时间)专注于实际的业务逻辑,并使您的代码保持简单且可插入到各种*执行上下文*(稍后会详细介绍)。

业务问题

正如承诺的,这篇文章将更具技术性,因为它涵盖了可供您尝试的具体示例。因此,事不宜迟,我们首先描述将要介绍的三个用例。实际上用例是相同的,但执行上下文不同。

“接收代表待招聘人员的数据,生成员工记录。”

三种不同的变体在于执行上下文(典型非功能性关注点的一个例子)

  • HTTP 请求/响应
  • 从 AMQP 到 Apache Kafka
  • 从 RSocket 到 Apache Kafka。

无论是用例还是执行上下文,它们都不是真正新的或独一无二的。在 Spring 中,我们处理它们已经有几十年了,有成千上万的应用在生产环境中运行。那么,添加 Cloud Event 上下文会改变什么吗?换句话说,如果传入和传出的数据代表一个 Cloud Event,会有什么变化吗?这些正是我们试图在本文中回答的问题。

这些示例的用户代码是

@SpringBootApplication
public static class SampleApplication
  public static void main(String[] args) throws Exception {
    SpringApplication.run(SampleApplication.class, args);
  }

  @Bean
  public Function<Person, Employee> hire() {
    return person -> {
	Employee employee = new Employee(person);
	return employee;
    };
  }
}

是的,这有点无聊,因为它没有展示任何非功能性方面,这些方面由特定于执行上下文的框架处理。我们还将函数的实现细节保持得相当简单,因为它们与主题无关。框架并不关心您做什么。它只关心您期望什么——*输入*——以及您产生什么——*输出*——这些信息可以从函数签名中获取。

用例 1(基于 HTTP)

此示例的完整源代码可在Spring Cloud Function 示例中找到。在此示例中,我们将 Cloud Event 作为 HTTP 请求发送,并期望接收一个 Cloud Event 作为 HTTP 响应。这意味着,我们的 hire() 函数需要以某种方式成为 HTTP 端点。我们可以通过使用Spring Cloud Function框架来实现这一点。通过添加其 spring-cloud-function-web 依赖,我们可以添加将函数转换为 HTTP 端点所需的 Spring Boot 自动配置和组件。配置选项和默认值超出了本文的范围,但您可以从Spring Cloud Function 文档的相关部分获取。重要的是,基于这些默认值,函数名称成为运行在 localhost 端口 8080 上的 URL 路径的一部分,形成 http://localhost:8080/hire 端点。

现在您可以启动应用程序并向其发送请求。应用程序运行后,您可以使用以下命令 curl

curl -w'\n' localhost:8080/hire \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

您应该收到以下响应

. . .
{"person":{"firstName":"John","lastName":"Doe"},"id":172,"message":"Employee 172 was hired on 17-12-2020"}

嗯... 这跟 Cloud Events 一点关系也没有!对吧...?

正确,但框架将函数暴露为 REST 端点、处理类型转换、调用以及其他非功能性方面的能力是显而易见的,并且与 Cloud Events 直接相关。请继续阅读...

这种能力的实现核心是Message——一种结构和类型,它使传入的 HTTP(或任何其他)请求能够采用规范形式,以便其他框架能够以统一的方式处理其内容,而不管其来源或目的地。

但是等等,Cloud Events 呢?

让我们通过添加表示所需 Cloud Event 属性的 HTTP 头,将此 HTTP 请求转换为 Cloud Event。请注意,这些头带有 Cloud Event 规范的HTTP 协议绑定部分要求的 ce- 前缀。

curl -w'\n' localhost:8080/hire \
 -H "ce-id: 0001" \
 -H "ce-specversion: 1.0" \
 -H "ce-type: hire" \
 -H "ce-source: spring.io/spring-event" \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

执行后,您不会看到任何区别。您的函数行为相同,您收到相同的响应。

当然,除非您查看并分析响应头,它们现在包含所需的 Cloud Event 属性(尽管与请求中的不同)

ce-source: https://springframework.org.cn/cloudevent
ce-specversion: 1.0
ce-type: sample
ce-id: 76208faf-f8e5-4267-9028-bb4392d66765
message-type: cloudevent
timestamp: 1608211771624
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 17 Dec 2020 13:29:31 GMT
{"person":{"firstName":"John","lastName":"Doe"},"id":171,"message":"Employee 171 was hired on 17-12-2020"}

但是怎么做到的呢?

这部分我们再次提醒您,我们致力于将非功能性方面外包给框架,因为这就是其中之一。因此,默认情况下(由框架建立),我们假设如果请求是一个 Cloud Event,响应也应该是一个 Cloud Event。您还可以看到,四个必需的 Cloud Event 属性的值也是根据框架建立的某些默认规则生成的。specversion 默认为 1.0type 默认为返回对象的类型名称,id 默认为生成的 UUID(以提供合理安全的唯一性预期),而 source 默认为 https://springframework.org.cn/

但我 不喜欢默认值。我想要自己的值,并且想添加额外的属性?

正如我们在上一篇文章中提到的:*“我们还提供了实用工具、库和配置选项,让您可以影响某些非功能性关注点,因为出于各种原因,这可能仍然是必需的。”* 在这里,您有两种选择。***第一种选择:*** 您可以更改函数签名并返回一个 Message<Employee>,您可以在其中添加额外的元数据(即 Cloud Event 属性)。一旦框架看到您返回了 Message,它就不会尝试对用户添加的元数据进行任何额外处理。这条规则实际上适用于大多数(如果不是全部)依赖 Spring Messaging 的框架。虽然这个选项很简单,但它确实会将非功能性方面泄露到您的业务逻辑中。毕竟,您需要创建一个 Message 实例,您需要添加表示 Cloud Event 属性的头(最好带有正确——规范要求的——属性前缀),等等。但是这个选项最大的缺点是它要求您更改函数签名并将功能性和非功能性方面混在一起,这明显违反了*关注点分离*原则。然而,为了论证起见,这里展示了如何做到这一点

@Bean
public Function<Message<Person>, Message<Employee>> hire() {
  return message -> {
    Person person = message.getPayload();
    Employee employee = new Employee(person);
      return CloudEventMessageBuilder.withData(employee).setId("123456")
	.setSource(URI.create("https://spring.cloudevenets.sample")).build();
  };
}

示例源代码中包含其注释版本。

***第二种选择:*** 您可以提供一个名为 CloudEventHeaderEnricher 的策略的实现,它提供了一个单独的地方,您可以在其中实现为输出生成适当属性和头的逻辑。此策略在框架生成输出 Message 时被调用。以下示例展示了此策略的一种可能实现(在示例中也被注释掉了,所以请取消注释,重启应用程序,然后查看区别)。

@Bean
public CloudEventHeaderEnricher cloudEventEnricher() {
  return messageBuilder -> messageBuilder.setSource("https://springframework.org.cn/cloudevent")
	.setType("sample").setId("987654");
}

在这里,您还可以看到一个可以帮助您构建 Cloud Event 消息的实用类:CloudEventMessageBuilder。它是模仿标准的 Spring MessageBuilder 设计的,但具有 Cloud Event 特定的 setter。然而,这种方法的主要优点是关注点分离。您的业务逻辑(您的功能代码)保持清晰。此外,您仍然需要编写的非功能代码被写在一个单独的地方。

还有一件事... 示例代码假设您只关心 Cloud Event 的 data 部分,并且希望它是 POJO 的形式。但如果不是这样呢?如果您想要 Cloud Event 的完整视图怎么办?或者如果您也想要 Cloud Event 数据的原始形式(即 byte[])怎么办?如前所述,框架从函数的签名中获取指令。因此,通过将输入和输出类型声明为 Message,您实际上是指示框架为您提供整个 Cloud Event(不仅仅是其 data)。此外,通过指定 Message 的泛型类型,您指示框架将 Cloud Event 的 data 部分作为该 Java 类型提供,这实际上是要求它在必要时执行类型转换。所以请尝试以下签名:public Function<Message<byte[]>, Message<Employee>> hire() {...}public Function<byte[], Employee> hire() {...} 或其他。

目前就这些了。README 文件和源代码中的注释也在需要的地方提供了额外的说明。

用例 2(从 AMQP 到 Kafka)

此示例的完整源代码可在Spring Cloud Function 示例中找到。它假设您对 AMQP 和 Apache Kafka 有一定程度的了解。在此示例中,我们使用 RabbitMQ(作为 AMQP 消息代理)和 Apache Kafka。

虽然这个用例可能看起来比上一个更复杂,但本节和下一节(第三个用例)却出奇地短。这是因为上一节解释的所有内容也适用于这里。实际上,我们在这里改变的唯一一件事就是执行上下文。我们通过同样的机制实现这一点:添加相关的基于 Spring Boot 的自动配置。因此,在这种情况下,我们添加了两个自动配置:一个用于 RabbitMQ(AMQP 消息代理)绑定器,另一个用于Spring Cloud Stream框架中提供的 Apache Kafka 绑定器。还有一些额外的应用程序配置(您可以在 application.properties 文件中看到),用于指示框架如何将 hire 函数的输入端绑定到 RabbitMQ(通过 RabbitMQ 绑定器),以及将输出端绑定到 Apache Kafka(通过 Apache Kafka 绑定器)。

假设您已经运行了 RabbitMQ 和 Kafka,请启动应用程序并向 RabbitMQ 发送一条 Message。您可以使用RabbitMQ 控制台(如果已安装)向 hire-in-0 交换机发送消息。
为了符合 Cloud Event 规范,您应该提供带有 AMQP 适当前缀的属性(即 cloudEvents:)。请考虑以下示例

cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001

然后考虑以下数据:{"firstName":"John", "lastName":"Doe"}

为了简化本演示部分,我们包含了一个测试用例,通过将 Cloud Event 发送到 RabbitMQ 并从 Apache Kafka 接收来有效地自动化此演示。

Message<byte[]> messageToAMQP = CloudEventMessageBuilder
	.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
	.setSource("https://cloudevent.demo")
	.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
	.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);

rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .

请注意我们在此如何使用 CloudEventMessageBuilder 仅将 source 设置为 Cloud Event 属性,而其余必需的 Cloud Event 属性则依赖默认值。我们还使用 build(CloudEventMessageUtils.AMQP_ATTR_PREFIX) 来确保属性带有 cloudEvents: 前缀(参见Cloud Events AMQP 协议绑定)。另请注意,在接收端,Cloud Events 属性现在带有 ce_ 前缀(参见Cloud Events Kafka 协议绑定),因为框架确定目标目的地是 Apache Kafka。最后一点值得稍微详细说明。我们已经确定设置 Cloud Event 属性是非功能性方面,因此我们暴露了一种机制,让您可以在业务逻辑之外处理它。但是属性前缀呢?请注意,我们在不同的执行上下文中运行相同的代码。这意味着属性前缀实际上取决于执行上下文。因此,框架通过了解执行上下文来确保 Cloud Event 属性前缀的正确性。

在这里,我们依赖于Spring Cloud Stream框架及其默认设置,例如目标自动配置(Kafka 和 Rabbit)、绑定名称、连接性等。这些默认设置和配置选项的详细信息超出了本文的范围,因为它们与 Cloud Events 无关。有关框架本身及其配置选项的更多详细信息,请参阅Spring Cloud Stream 文档

此外,与上一个示例一样,此示例也包含注释掉的变体,欢迎您进行实验。

用例 3(从 RSocket 到 Kafka)

此示例的完整源代码可在Spring Cloud Function 示例中找到。它假设您对 RSocket 和 Apache Kafka 有一定程度的了解。本节应该比上一节更短,因为它非常相似。但是,这里有一些有趣的变体值得讨论。嗯,最明显的是RSocket。我们引入了一种不同的传递机制。但真正让它更有趣的是,RSocket 没有定义协议绑定。我们可以选择遵循 Kafka、HTTP 或 AMQP 规范之一,或者我们可以以结构化模式传递 Cloud Event,其中整个事件被编码为某种结构(例如 JSON)。

此示例中的一些实现细节也与其他用例不同。然而,这些细节与 Cloud Event 无关。相反,它们是您可以使用的其他机制的演示。例如,我们使用 Consumer 代替 Function,并通过使用 Spring Cloud Stream 框架提供的 StreamBridge 组件手动发送输出消息。

所以,事不宜迟,这是我们的应用程序代码

@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
  return person -> {
    Employee employee = new Employee(person);
    streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
	.setSource("https://springframework.org.cn/rsocket")
	.setId("1234567890")
	.build());
  };
}

请注意我们如何使用 CloudEventMessageBuilder 将输出 Message 生成为 Cloud Event。

我们通过 RSocket 向 hire() 函数发送一个结构化的 Cloud Event 表示形式,编码为 JSON。

String payload = "{\n" +
	"    \"specversion\" : \"1.0\",\n" +
	"    \"type\" : \"org.springframework\",\n" +
	"    \"source\" : \"https://springframework.org.cn/\",\n" +
	"    \"id\" : \"A234-1234-1234\",\n" +
	"    \"datacontenttype\" : \"application/json\",\n" +
	"    \"data\" : {\n" +
	"        \"firstName\" : \"John\",\n" +
	"        \"lastName\" : \"Doe\"\n" +
	"    }\n" +
	"}";

rsocketRequesterBuilder.tcp("localhost", 55555)
	.route("hire")        // target function
	.data(payload).       // data we're sending
	.send()

预期的输出应该与之前的用例类似,因为目标目的地相同。

结论

正如您所见,在 Spring 的上下文中处理 Cloud Events 时,您有多种选择

  • 您可以选择只关心 Cloud Event 的内容,同时完全控制出站 Cloud Event 的外观。
  • 您可以通过 Message 处理 Cloud Event 本身,并依赖提供的实用工具来简化对 Cloud Event 特定数据的访问。
  • 您可以选择执行上下文而不影响您的业务逻辑(用户代码),同时委托框架确保某些 Cloud Event 特性的正确性,例如属性前缀。

这些只是与本文上下文相关的几个选项,但还有更多。

既定和经过验证的模式、实现这些模式的框架以及分层和有主见的 Spring Boot 自动配置使其成为可能。分层很重要,因为它们使您可以将问题分解为可以在存在相同问题的其他项目和集成中重用的解决方案。这有效地使当前的 Cloud Event 集成变得相当简单,因为大多数与 Cloud Event 无关的非功能性方面(即连接、发送、接收、转换、重试等)已经由 Spring Cloud Function 和 Spring Cloud Stream 后面的各个框架解决了。

最后但同样重要的是,还有另一种处理 Cloud Events 和 Spring 的方法,即通过Cloud Events Java SDK,您也可以在其中找到一个示例

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

抢占先机

VMware 提供培训和认证,加速您的进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部