先行一步
VMware 提供培训和认证,助力您快速进步。
了解更多本文是探讨基于 Java Functions 的重新设计的 Spring Cloud Stream 应用系列博客的一部分。在本文中,我们将介绍 Elasticsearch sink,它允许我们在 Elasticsearch 中索引记录,以及它对应的 Consumer 函数。
以下是本系列博客的所有先前部分。
在我们了解 Elasticsearch sink 应用之前,先来看看驱动该 sink 的 consumer 函数。正如我们在之前的其他 sink 应用中看到的那样,consumer 是一个标准的 java.util.function.Consumer
,它接受一个 Message<?>
。该 consumer 依赖于 Spring Boot 对 Elasticsearch 的支持,Spring Boot 会自动配置一个来自 Elasticsearch 的 RestHighLevelClient
。该 consumer 支持以下载荷类型的消息。
String
java.util.Map
来自 Elasticsearch 的 XContentBuilder。
使用该 consumer 时,要使用的 Elasticsearch 索引由属性 elasticsearch.consumer.index
指定
您可以通过设置 INDEX_ID
消息头来设置要用于每条消息的 Elasticsearch ID。另外,您也可以设置 elasticsearch.consumer.id
属性,该属性接受一个 SpEL 表达式。如果两者都没有设置,Elasticsearch 将自动生成一个 ID。
通过将属性 elasticsearch.consumer.async
设置为 true
,我们可以使索引操作异步化。
我们可以将 consumer 函数注入到应用中,并直接调用其 accept
方法将记录索引到 ElasticSearch。
例如,假设我们在应用中注入 consumer bean,如下所示。
@Autowired
ElasticsearchConsumer elasticsearchConsumer
然后我们可以使用以下 java.util.Map
来索引一条记录。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();
elasticsearchConsumer.accept(message);
上面的 map 中的信息可以作为纯文本 JSON
提供,或者使用来自 Elasticsearch 的 XContentBuilder 提供。
正如我们在之前的博客中看到的那样,当 consumer 函数与 Spring Cloud Stream 结合使其成为一个 sink 应用时,它会变得更加强大。它具有无缝地与中间件技术通信的固有能力。该 sink 应用从 Apache Kafka 或 RabbitMQ 等中间件系统消费数据并发送到 Elasticsearch。我们已经为 Kafka 和 RabbitMQ 提供了开箱即用的 Elasticsearch 变体。
让我们看看运行 Apache Kafka 独立 Elasticsearch sink 应用的步骤。
首先,下载 sink 应用。由于该 sink 尚未正式发布,我们使用最新的里程碑版本。
wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar
在运行应用之前,请确保您已运行 Easticsearch。这里提供一个在 docker 容器中启动单节点 Elasticsearch 集群的快速方法。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
我们还需要确保 Kafka 正在运行。
然后如下运行应用
java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
--spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing
通过提供 input destination 属性,我们要求该 sink 从 Kafka 主题 data-in
接收数据并将其发送到 Elasticsearch 索引 testing
。
向 Kafka 主题发送一些测试数据。例如,如果 Kafka 在本地端口 9092
运行,您可以使用 Kafka 控制台生产者脚本发送数据,如下所示。
kafka-console-producer.sh --broker-list localhost:9092 --topic data-in
然后发送以下 JSON
数据。
{"key1":"value1"}
我们可以通过调用以下端点来验证数据是否已被索引。
curl localhost:9200/testing/_search
类似地,我们也可以下载 Elasticsearch sink 应用的 RabbitMQ 变体,并在 RabbitMQ 集群上运行它。
正如我们在本系列之前多次看到的那样,当这些 Spring Cloud Stream 应用作为数据管道的一部分在 Spring Cloud Data Flow 上运行时,它们会变得更加强大和有弹性。
上面我们看到的 Elasticsearch 可以与其他许多应用结合使用。例如,一个 TCP source 应用可以从一个源接收数据,然后将数据转储到中间件目标,Elasticsearch sink 再从那里消费并索引数据。然后,分析应用可以使用这个索引来生成仪表盘。这只是一个例子,还有许多这样的用例。Spring Cloud Data Flow 使这些管道的编排对用户来说是无缝的。我们鼓励您查阅我们之前博客中关于如何在 Spring Cloud Data Flow 上部署应用的步骤。使用同样的步骤,Elasticsearch sink 应用也可以部署。
在本博客中,我们了解了 Elasticsearch consumer 函数及其对应的 Spring Cloud Stream sink 的工作原理。consumer 函数可以注入到自定义应用中,与其他业务逻辑结合使用。该 sink 应用提供了开箱即用的 Kafka 和 RabbitMQ 中间件变体。
本系列博客还将有几篇后续文章。请继续关注。