案例研究:Elasticsearch 输出

工程 | Soby Chacko | 2020年11月16日 | ...

本文是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用。在本篇文章中,我们将研究允许我们将记录索引到 Elasticsearch 的 Elasticsearch 输出以及其对应的消费者函数。

以下是本博客系列的所有先前部分。

Elasticsearch 消费者

在我们查看 Elasticsearch 输出应用程序之前,让我们看看为输出提供动力的消费者函数。正如我们在之前的其他输出应用程序中看到的那样,消费者是一个标准的 `java.util.function.Consumer`,它接受一个 `Message<?>`。消费者依赖于 Spring Boot 对 Elasticsearch 的支持,该支持会自动配置来自 Elasticsearch 的 `RestHighLevelClient`。消费者支持具有以下类型有效负载的消息。

使用消费者时,要使用的 Elasticsearch 索引由属性 `elasticsearch.consumer.index` 给出。

您可以通过设置 `INDEX_ID` 消息头来设置要为每条消息使用的 Elasticsearch ID。或者,您可以设置 `elasticsearch.consumer.id` 属性,该属性接受 SpEL 表达式。如果两者都没有设置,Elasticsearch 将自动生成 ID。

通过将属性 `elasticsearch.consumer.async` 设置为 `true`,我们可以使索引操作异步。

我们可以在应用程序中注入消费者函数,并直接调用其 `accept` 方法将记录索引到 ElasticSearch。

例如,假设我们在应用程序中注入消费者 bean,如下所示。

@Autowired
ElasticsearchConsumer elasticsearchConsumer

然后我们可以使用下面的 `java.util.Map` 来索引记录。

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();

elasticsearchConsumer.accept(message);

上述地图上的相同信息可以作为纯 `JSON` 或使用 Elasticsearch 的 XContentBuilder 提供。

Elasticsearch 输出

正如我们在之前的博客中看到的,当与 Spring Cloud Stream 结合使用以使其成为输出应用程序时,消费者函数变得更加强大。它具有以无缝方式与中间件技术通信的固有能力。输出应用程序从 Apache Kafka 或 RabbitMQ 等中间件系统中消费数据,并发送到 Elasticsearch。我们已经为 Kafka 和 RabbitMQ 提供了现成的 Elasticsearch 变体。

让我们逐步了解如何运行 Apache Kafka 的独立 Elasticsearch 输出应用程序。

首先,下载输出应用程序。由于输出目前尚不可用,让我们使用最新的里程碑版本。

wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar

运行应用程序之前,请确保 Elasticsearch 正在运行。以下是在 docker 容器中启动单节点 Elasticsearch 集群的快速方法。

docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

我们还需要确保 Kafka 正在运行。

然后按如下方式运行应用程序:

java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
 --spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing

通过提供输入目标属性,我们要求输出从 Kafka 主题 `data-in` 接收数据并将数据发送到 Elasticsearch 索引 `testing`。

向 Kafka 主题发送一些测试数据。例如,如果您在端口 `9092` 上本地运行 Kafka,则可以使用 Kafka 控制台生产者脚本发送数据。

kafka-console-producer.sh --broker-list localhost:9092 --topic data-in

然后发送以下 `JSON` 数据:

{"key1":"value1"}

我们可以通过调用以下端点来验证数据是否已索引。

curl localhost:9200/testing/_search

类似地,我们还可以下载 RabbitMQ 版本的 Elasticsearch 输出应用程序,并针对 RabbitMQ 集群运行它。

输出的进一步使用

正如我们在本系列中多次看到的那样,当这些 Spring Cloud Stream 应用程序作为 Spring Cloud Data Flow 上的数据管道的一部分运行时,它们会变得更加强大和弹性。

上面看到的 Elasticsearch 可以与许多其他应用程序结合使用。例如,TCP 源应用程序可以从源接收数据,然后将数据转储到中间件目标,Elasticsearch 输出从那里消费和索引数据。然后,分析应用程序可以使用此索引来生成仪表板。这只是一个示例,还有许多此类用例。Spring Cloud Data Flow 使用户可以无缝地编排这些管道。我们鼓励您查看我们在之前的博客中概述的关于如何在 Spring Cloud Data Flow 上部署应用程序的步骤。使用相同的步骤,也可以部署 Elasticsearch 输出应用程序。

结论

在本博客中,我们了解了 Elasticsearch 消费者函数及其相应的 Spring Cloud Stream 输出的工作原理。消费者函数可以注入到自定义应用程序中,以与其他业务逻辑相结合。输出应用程序为 Kafka 和 RabbitMQ 中间件变体提供现成可用。

在本博客系列中,我们还有一些后续文章即将推出。敬请关注。

获取 Spring 新闻通讯

关注 Spring 新闻通讯

订阅

领先一步

VMware 提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部