案例研究:HTTP 请求函数与处理器

工程 | David Turanski | 2020年8月17日 | ...

引言

我们以介绍新的基于 Java 函数的流应用以及函数组合来开始本系列。上一篇提供了一个教程,介绍如何构建简单的流应用并在 Spring Cloud Data Flow 中运行。今天我们探索HTTP 请求函数并提供如何使用它的示例。

如果您错过了,本系列之前的文章如下

HTTP 请求函数

这是遗留的HTTP Client Processor流应用 Starter 的更新实现,基于响应式的 Spring WebClient。该函数是一个通用的 web 客户端,向 URL 提交 HTTP 请求并返回响应。它主要为流应用设计,能够使用根据每条输入 Message 评估的配置的SpEL 表达式来提取 URL、HTTP 方法、请求体、期望的响应类型和内容。此外,为了支持高效的流处理,该函数使用了 reactive streams。其签名是

Function<Flux<Message>, Flux>

>>也就是说,它接受一个Flux (流) 的 Messages,并返回一个任何类型的 Flux。

配置属性

HttpRequestFunction 通过以下配置属性进行配置

http.request.body-expression
用于从输入消息中派生请求体的 SpEL 表达式。(表达式,默认值)

http.request.expected-response-type
用于解释响应的类型。(Class<?>,默认值:String)

http.request.headers-expression
用于派生要使用的 http headers map 的 SpEL 表达式。(表达式,默认值)

http.request.http-method-expression
用于从输入消息中派生请求方法的 SpEL 表达式。(表达式,默认值:GET)

http.request.maximum-buffer-size
为输入流缓冲区分配的最大缓冲区大小(以字节为单位)。默认为 256k。对于 posting 或 getting 大型二进制内容,如有必要,请增大此值。(Integer,默认值:256 * 1024)

http.request.reply-expression
用于计算最终结果的 SpEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:ResponseEntity::getBody)

http.request.timeout
请求超时时间(毫秒)。(Long,默认值:30000)

http.request.url-expression
根据输入消息确定要使用的 URL 的 SpEL 表达式。(表达式,默认值)

SpEL 表达式应用于输入 Message。因此,诸如 bodyheaders[name] 之类的字段可用于评估消息内容。我说“可以…”是因为有时使用静态值更可取。在这种情况下,字面量值必须用单引号括起来,例如

http.request.url-expression='https://start.spring.io' http.request.http-method-expression='POST'

示例 1:在独立应用中使用 HTTP 请求函数

让我们看看如何在简单的 Spring Boot Web 应用中使用此函数的示例。在此示例中,我们将在一个应用中使用它,该应用从 URL 检索图像并渲染图像的缩略图。此示例的完整代码在此处

我们将使用 Spring Boot 和 Spring Web Flux 构建此应用,同时使用我们的函数来检索图像,以及一些用于生成缩略图的代码。

相关的依赖项是

  • org.springframework.cloud.fn:http-request-function - HTTP 请求函数间接包含 spring-boot-starter-webflux

  • io.spring.example:image-thumbnail-processor - 一个简单的 Java 函数,包含在此示例中,用于创建缩略图。我们在此不深入探讨细节,只需注意它是一个独立的组件,我们将在后面的示例中重用它。

我们首先需要为我们的函数设置一些配置属性

http.request.url-expression=payload http.request.expected-response-type=byte[] http.request.maximum-buffer-size=2097152

因此,消息有效载荷包含目标 URL,图像(响应体)将以字节数组形式返回。并且由于这些图像可能相当大,我们将把存放响应体的缓冲区大小增加到 2MB (2 * 1024 * 1024)。

代码如下

@SpringBootApplication
@Controller
@Import(HttpRequestFunctionConfiguration.class)
public class ThumbnailStandaloneApplication {
  private static Logger logger = LoggerFactory.getLogger(ThumbnailStandaloneApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(ThumbnailStandaloneApplication.class, args);
  }

  private ThumbnailProcessor thumbnailProcessor = new ThumbnailProcessor();

  @Autowired
  private HttpRequestFunction httpRequestFunction;

  @Bean
  RouterFunction<?> routes() {
    return RouterFunctions.route()
        .GET("/thumbnail", this::createThumbnail)
        .build();
  }

  private Mono<ServerResponse> createThumbnail(ServerRequest serverRequest) {
    String url = serverRequest.queryParam("url").orElseThrow(
                           () -> new RuntimeException("URL required"));

    return Mono.from(httpRequestFunction.apply(Flux.just(new GenericMessage<>(url)))
        .flatMap(image -> {
          Map<String, Object> model = new HashMap<>();
          byte[] thumbnail = thumbnailProcessor.apply((byte[]) image);
          logger.info("creating thumbnail for {}", url);
          model.put("url", url);
          model.put("thumb", new String(Base64.getEncoder().encode(thumbnail)));
          Mono<ServerResponse> serverResponse = ServerResponse.ok()
              .render("thumbnail", model);
          return serverResponse;
        }));
  }

我们应用 HttpRequestFunction 来检索图像。然后我们将 thumbnailProcessor 应用于返回的字节数组,并将其编码为 base 64,以便可以在页面上渲染。

standalone

示例 2:在流应用中使用 HTTP 请求处理器

既然我们知道了函数的工作原理,让我们使用 Spring Cloud Stream 构建一个类似的流应用。在此示例中,我们将使用预打包的HTTP 请求处理器File Source流应用。此处理器将 HTTP 请求函数包装在 Spring Cloud Stream 处理器应用中,该应用只是简单地调用该函数,将输入和输出绑定到消息代理目标(例如,Kafka 主题或 Rabbit MQ 交换)。我们的应用,用流定义 DSL 表示,看起来像

file-source | http-request-processor | image-thumbnail-sink

其中 | 表示使用消息代理进行 I/O。

在这里,我们使用了一个用户开发的 sink,它使用file-consumer函数将每个缩略图写入文件。该 sink 使用 Spring Cloud Function 的声明性组合,将上一示例中的thumbnail-processor与一个 header enricher 以及最后的标准fileConsumer进行组合。因此,我们组合的函数定义为

spring.cloud.function.definition=thumbnailProcessor|filenameEnricher|fileConsumer

application.properties中。

我们的复合函数定义在概念和语法上与上述流定义相似。但在此示例中,| 表示进程内通信。

我们将在未来的文章中探讨 File Source 的详细信息。目前,我们将使用它来轮询源目录,并在目录中添加新文件时生成消息。在此示例中,我们想要处理一个文本文件,其中每行包含一个图像 URL。我们将配置源以每行生成一条消息,消息的有效载荷中包含 URL。我们已经知道 HTTP 请求处理器做什么。sink 生成缩略图并将其写入文件。

完整配置的流定义如下

file-source --file.consumer.mode=lines --file.consumer.mode=lines --file.supplier.directory=| http-request-processor --http.request.url-expression=payload --http.request.expected-response-type=byte[] --http.request.maximum-buffer-size=2097152| image-thumbnail-sink --file.consumer.directory=

如果我们运行此应用并将一个文本文件放入源目录,我们将看到缩略图写入目标目录

thumbnail files

如果您想在本地机器上运行此应用,完整的说明在此处

总结

我们刚才深入探讨了 HTTP 请求函数,演示了如何在独立 Web 应用和流式管道中使用它来处理图像。我们还使用了函数组合,将用户编写的函数与现成的函数进行组合,取得了很好的效果。

敬请关注…​

在接下来的几周,我们将展示更多关于 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每个案例都将重点介绍不同的流应用和功能。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速前进。

了解更多

获取支持

Tanzu Spring 通过一份简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区的所有即将到来的活动。

查看全部