高效解析响应式缓冲流

工程 | Arjen Poutsma | 2021 年 9 月 14 日 | ...

Spring Framework 5.3 发布已有一段时间。该版本的一个特性是 对响应式 Multipart 支持进行了重大改进。在这篇博文中,我们将分享在开发此特性时获得的一些知识。具体来说,我们将重点介绍如何在字节缓冲流中查找特定标记。

多部分表单数据 (Multipart Form Data)

每当你上传文件时,浏览器会将其(以及表单中的其他字段)作为 multipart/form-data 消息发送到服务器。这些消息的具体格式在 RFC 7578 中有描述。如果你提交一个包含名为 foo 的文本字段和名为 file 的文件选择器的简单表单,multipart/form-data 消息看起来像这样:

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary" (1)

--boundary (2)
Content-Disposition: form-data; name="foo" (3)

bar
--boundary (4)
Content-Disposition: form-data; name="file"; filename="lorum.txt" (5)
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary-- (6)
  1. 消息的 Content-Type 头部包含 boundary 参数。

  2. 边界用于开始第一个部分。它前面加上 --

  3. 第一部分包含文本字段 foo 的值,如部分头部所示。该字段的值是 bar

  4. 边界用于分隔第一部分和第二部分。同样,它前面加上 --

  5. 第二部分包含提交的文件 lorum.txt 的内容。

  6. 消息的结束由边界指示。它前面和后面都加上 --

查找边界

multipart/form-data 消息中的边界非常重要。它被指定为 Content-Type 头部的一个参数。当其前面加上两个连字符 (--) 时,边界表示新部分的开始。当其后面也加上 -- 时,边界表示消息的结束。

在传入的字节缓冲流中查找边界是解析 multipart 消息的关键。这样做看起来非常简单

private int indexOf(DataBuffer source, byte[] target) {
  int max = source.readableByteCount() - target.length + 1;
  for (int i = 0; i < max; i++) {
    boolean found = true;
    for (int j = 0; j < target.length; j++) {
      if (source.getByte(i + j) != target[j]) {
        found = false;
        break;
      }
    }
    if (found) {
      return i;
    }
  }
  return -1;
}

然而,存在一个复杂性:边界可能跨越两个缓冲区,这在响应式环境中可能不会同时到达。例如,给定前面显示的示例 multipart 消息,第一个缓冲区可能包含以下内容

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary"

--boundary
Content-Disposition: form-data; name="foo"

bar
--bou

而下一个缓冲区包含剩余部分

ndary
Content-Disposition: form-data; name="file"; filename="lorum.txt"
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary--

如果我们一次只检查一个缓冲区,就无法找到像这样的分隔边界。相反,我们需要跨越多个缓冲区查找边界。

解决这个问题的一种方法是等待接收到所有缓冲区,将它们 合并,然后定位边界。以下示例使用一个示例流和前面定义的 indexOf 方法来完成此操作

Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
  .map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);

Mono<Integer> result = DataBufferUtils.join(stream)
  .map(joined -> indexOf(joined, boundary));

StepVerifier.create(result)
  .expectNext(6)
  .verifyComplete();

使用 Reactor 的 StepVerifier,我们可以看到边界从索引 6 开始。

这种方法有一个主要的缺点:将多个缓冲区合并成一个,实际上会将整个 multipart 消息存储在内存中。Multipart 消息主要用于上传(大型)文件,因此这不是一个可行的选择。相反,我们需要一种更智能的方法来定位边界。

Knuth 来救援!

幸运的是,Knuth–Morris–Pratt 算法提供了这样一种方法。该算法的核心思想是,如果我们已经匹配了边界的几个字节但下一个字节不匹配,我们无需从头开始。为此,该算法维护一个状态,表现为一个预计算表中的位置,该表包含在不匹配后可以跳过的字节数。

在 Spring Framework 中,我们在 Matcher 接口中实现了 Knuth-Morris–Pratt 算法,您可以通过 DataBufferUtils::matcher 获取其实例。您还可以查看 源代码

在这里,我们使用 Matcher 来获取 boundarystream 中的结束索引,使用与之前相同的示例输入

Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
  .map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);

DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(boundary);
Flux<Integer> result = stream.map(matcher::match);

StepVerifier.create(result)
  .expectNext(-1)
  .expectNext(-1)
  .expectNext(-1)
  .expectNext(3)
  .expectNext(-1)
  .verifyComplete();

请注意,Knuth-Morris–Pratt 算法给出的是边界的结束索引,这解释了测试结果:边界直到倒数第二个缓冲区的索引 3 才结束。

正如预期的那样,Spring Framework 的 MultipartParser 大量使用了 Matcher,用于

如果您需要在字节缓冲流中查找一系列字节,请试试 Matcher

订阅 Spring 资讯

通过 Spring 资讯保持连接

订阅

先行一步

VMware 提供培训和认证,助力您加速前进。

了解更多

获取支持

Tanzu Spring 提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,仅需一个简单的订阅。

了解更多

即将举办的活动

查看 Spring 社区所有即将举办的活动。

查看全部