案例研究:HTTP 请求函数和处理器

工程 | David Turanski | 2020年8月17日 | ...

简介

我们从介绍新的基于 Java 函数的流应用程序函数组合开始这个系列。在上一篇文章中,我们提供了一个构建简单的流应用程序并在 Spring Cloud Data Flow 中运行它的教程。今天,我们探讨HTTP 请求函数,并提供一些使用示例。

如果您错过了本系列中的先前文章,以下是链接:

HTTP 请求函数

这是基于响应式 Spring WebClient 的旧版HTTP 客户端处理器流应用程序启动器的更新实现。该函数是一个通用的 Web 客户端,它向 URL 提交 HTTP 请求并返回响应。它主要用于流应用程序,能够使用针对每个传入消息计算的配置的SpEL 表达式提取 URL、HTTP 方法、请求体、所需的响应类型和内容。此外,为了支持高效的流处理,该函数使用响应式流。其签名为

Function<Flux<Message>, Flux>

也就是说,它接受一个Flux(流)的消息并返回任何类型的 Flux。

配置属性

HttpRequestFunction 通过以下配置属性进行配置

http.request.body-expression
一个 SpEL 表达式,用于从传入的消息中派生请求体。(表达式,默认值)

http.request.expected-response-type
用于解释响应的类型。(Class<?>,默认值:String)

http.request.headers-expression
一个 SpEL 表达式,用于派生要使用的 http 标头映射。(表达式,默认值)

http.request.http-method-expression
一个 SpEL 表达式,用于从传入的消息中派生请求方法。(表达式,默认值:GET)

http.request.maximum-buffer-size
为输入流缓冲区分配的以字节为单位的最大缓冲区大小。默认为 256k。根据需要增加,用于发布或获取大型二进制内容。(整数,默认值:256 * 1024)

http.request.reply-expression
一个 SpEL 表达式,用于计算最终结果,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:ResponseEntity::getBody)

http.request.timeout
请求超时时间(毫秒)。(长整型,默认值:30000)

http.request.url-expression
针对传入消息的 SpEL 表达式,用于确定要使用的 URL。(表达式,默认值)

SpEL 表达式应用于传入的消息。因此,可以使用诸如 bodyheaders[name] 之类的字段来计算消息内容。我说“可以…”,因为有时使用静态值更可取。在这种情况下,文字值必须用单引号括起来,例如

http.request.url-expression='https://start.spring.io' http.request.http-method-expression='POST'

示例 1:在独立应用程序中使用 HTTP 请求函数

让我们看看如何在简单的 Spring Boot Web 应用程序中使用此函数的示例。在此示例中,我们将在一个从 URL 检索图像并呈现图像缩略图的应用程序中使用它。此示例的完整代码在此处

我们将使用 Spring Boot 和 Spring Web Flux 以及我们的函数来检索图像,以及一些生成缩略图的代码来构建应用程序。

相关依赖项为

  • org.springframework.cloud.fn:http-request-function - HTTP 请求函数传递地包含 spring-boot-starter-webflux

  • io.spring.example:image-thumbnail-processor - 一个简单的 Java 函数,包含在此示例中,用于创建缩略图。我们在这里不会详细介绍,只需注意它是一个我们将稍后在另一个示例中重用的独立组件。

首先,我们需要为我们的函数设置一些配置属性

http.request.url-expression=payload http.request.expected-response-type=byte[] http.request.maximum-buffer-size=2097152

因此,消息有效负载包含目标 URL,图像(响应体)将作为字节数组返回。并且由于这些图像可能相当大,因此我们将持有响应体的缓冲区大小增加到 2GB(2 * 1024 * 1024)。

以下是代码

@SpringBootApplication
@Controller
@Import(HttpRequestFunctionConfiguration.class)
public class ThumbnailStandaloneApplication {
  private static Logger logger = LoggerFactory.getLogger(ThumbnailStandaloneApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(ThumbnailStandaloneApplication.class, args);
  }

  private ThumbnailProcessor thumbnailProcessor = new ThumbnailProcessor();

  @Autowired
  private HttpRequestFunction httpRequestFunction;

  @Bean
  RouterFunction<?> routes() {
    return RouterFunctions.route()
        .GET("/thumbnail", this::createThumbnail)
        .build();
  }

  private Mono<ServerResponse> createThumbnail(ServerRequest serverRequest) {
    String url = serverRequest.queryParam("url").orElseThrow(
                           () -> new RuntimeException("URL required"));

    return Mono.from(httpRequestFunction.apply(Flux.just(new GenericMessage<>(url)))
        .flatMap(image -> {
          Map<String, Object> model = new HashMap<>();
          byte[] thumbnail = thumbnailProcessor.apply((byte[]) image);
          logger.info("creating thumbnail for {}", url);
          model.put("url", url);
          model.put("thumb", new String(Base64.getEncoder().encode(thumbnail)));
          Mono<ServerResponse> serverResponse = ServerResponse.ok()
              .render("thumbnail", model);
          return serverResponse;
        }));
  }

我们应用 HttpRequestFunction 来检索图像。然后我们将 thumbnailProcessor 应用于返回的字节数组并将其编码为 base 64,以便我们可以在页面上呈现它。

standalone

示例 2:在流应用程序中使用 HTTP 请求处理器

既然我们知道了函数的工作原理,那么让我们使用 Spring Cloud Stream 构建一个流应用程序来执行类似的操作。在这种情况下,我们将使用预打包的HTTP 请求处理器文件源流应用程序。此处理器将 HTTP 请求函数包装在 Spring Cloud Stream 处理器应用程序中,该应用程序只需调用该函数,并将输入和输出绑定到消息代理目标(例如 Kafka 主题或 Rabbit MQ 交换机)。我们的应用程序用流定义 DSL 表示,如下所示:

file-source | http-request-processor | image-thumbnail-sink

其中 | 表示使用消息代理进行 I/O。

这里,我们使用了一个用户开发的 Sink,它使用file-consumer 函数将每个缩略图写入文件。该 Sink 使用 Spring Cloud Function 的声明式组合,将前一个示例中的thumbnail-processor 与一个头部增强器组合,最后是标准的fileConsumer。因此,我们组合后的函数由以下定义:

spring.cloud.function.definition=thumbnailProcessor|filenameEnricher|fileConsumer

application.properties 中定义。

我们的复合函数定义在概念上和语法上都类似于上面的流定义。但在这里,| 表示进程内通信。

我们将在以后的文章中详细探讨文件源的内部工作原理。现在,我们将使用它来轮询源目录,并在新文件添加到目录时生成消息。在本例中,我们希望处理一个文本文件,每行包含一个图像 URL。我们将配置源以每行生成一条消息,消息有效负载包含 URL。我们已经知道 HTTP 请求处理器的工作原理。Sink 生成缩略图并将其写入文件。

完全配置的流定义如下:

file-source --file.consumer.mode=lines --file.consumer.mode=lines --file.supplier.directory=| http-request-processor --http.request.url-expression=payload --http.request.expected-response-type=byte[] --http.request.maximum-buffer-size=2097152| image-thumbnail-sink --file.consumer.directory=

如果我们运行此程序并将文本文件放入源目录,我们将看到缩略图被写入目标目录。

thumbnail files

如果您想在本地机器上运行此程序,完整的说明在这里

总结

我们刚刚深入研究了 HTTP 请求函数,演示了如何在独立的 Web 应用程序和流式管道中使用它来处理图像。我们还使用了函数组合,将用户编写的函数和现成的函数组合起来,取得了良好的效果。

敬请期待…​

在接下来的几周里,我们将展示更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每个案例研究都将重点介绍不同的流应用程序和功能。

获取 Spring 时事通讯

与 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部