领先一步
VMware 提供培训和认证,以加快您的进度。
了解更多本文是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用。在本篇文章中,我们将研究允许我们将记录索引到 Elasticsearch 的 Elasticsearch 输出以及其对应的消费者函数。
以下是本博客系列的所有先前部分。
在我们查看 Elasticsearch 输出应用程序之前,让我们看看为输出提供动力的消费者函数。正如我们在之前的其他输出应用程序中看到的那样,消费者是一个标准的 `java.util.function.Consumer`,它接受一个 `Message<?>`。消费者依赖于 Spring Boot 对 Elasticsearch 的支持,该支持会自动配置来自 Elasticsearch 的 `RestHighLevelClient`。消费者支持具有以下类型有效负载的消息。
字符串
java.util.Map
使用消费者时,要使用的 Elasticsearch 索引由属性 `elasticsearch.consumer.index` 给出。
您可以通过设置 `INDEX_ID` 消息头来设置要为每条消息使用的 Elasticsearch ID。或者,您可以设置 `elasticsearch.consumer.id` 属性,该属性接受 SpEL 表达式。如果两者都没有设置,Elasticsearch 将自动生成 ID。
通过将属性 `elasticsearch.consumer.async` 设置为 `true`,我们可以使索引操作异步。
我们可以在应用程序中注入消费者函数,并直接调用其 `accept` 方法将记录索引到 ElasticSearch。
例如,假设我们在应用程序中注入消费者 bean,如下所示。
@Autowired
ElasticsearchConsumer elasticsearchConsumer
然后我们可以使用下面的 `java.util.Map` 来索引记录。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();
elasticsearchConsumer.accept(message);
上述地图上的相同信息可以作为纯 `JSON` 或使用 Elasticsearch 的 XContentBuilder 提供。
正如我们在之前的博客中看到的,当与 Spring Cloud Stream 结合使用以使其成为输出应用程序时,消费者函数变得更加强大。它具有以无缝方式与中间件技术通信的固有能力。输出应用程序从 Apache Kafka 或 RabbitMQ 等中间件系统中消费数据,并发送到 Elasticsearch。我们已经为 Kafka 和 RabbitMQ 提供了现成的 Elasticsearch 变体。
让我们逐步了解如何运行 Apache Kafka 的独立 Elasticsearch 输出应用程序。
首先,下载输出应用程序。由于输出目前尚不可用,让我们使用最新的里程碑版本。
wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar
运行应用程序之前,请确保 Elasticsearch 正在运行。以下是在 docker 容器中启动单节点 Elasticsearch 集群的快速方法。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
我们还需要确保 Kafka 正在运行。
然后按如下方式运行应用程序:
java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
--spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing
通过提供输入目标属性,我们要求输出从 Kafka 主题 `data-in` 接收数据并将数据发送到 Elasticsearch 索引 `testing`。
向 Kafka 主题发送一些测试数据。例如,如果您在端口 `9092` 上本地运行 Kafka,则可以使用 Kafka 控制台生产者脚本发送数据。
kafka-console-producer.sh --broker-list localhost:9092 --topic data-in
然后发送以下 `JSON` 数据:
{"key1":"value1"}
我们可以通过调用以下端点来验证数据是否已索引。
curl localhost:9200/testing/_search
类似地,我们还可以下载 RabbitMQ 版本的 Elasticsearch 输出应用程序,并针对 RabbitMQ 集群运行它。
正如我们在本系列中多次看到的那样,当这些 Spring Cloud Stream 应用程序作为 Spring Cloud Data Flow 上的数据管道的一部分运行时,它们会变得更加强大和弹性。
上面看到的 Elasticsearch 可以与许多其他应用程序结合使用。例如,TCP 源应用程序可以从源接收数据,然后将数据转储到中间件目标,Elasticsearch 输出从那里消费和索引数据。然后,分析应用程序可以使用此索引来生成仪表板。这只是一个示例,还有许多此类用例。Spring Cloud Data Flow 使用户可以无缝地编排这些管道。我们鼓励您查看我们在之前的博客中概述的关于如何在 Spring Cloud Data Flow 上部署应用程序的步骤。使用相同的步骤,也可以部署 Elasticsearch 输出应用程序。
在本博客中,我们了解了 Elasticsearch 消费者函数及其相应的 Spring Cloud Stream 输出的工作原理。消费者函数可以注入到自定义应用程序中,以与其他业务逻辑相结合。输出应用程序为 Kafka 和 RabbitMQ 中间件变体提供现成可用。
在本博客系列中,我们还有一些后续文章即将推出。敬请关注。