使用 Spring Cloud Stream 和 Apache Kafka Streams 进行流处理。第 6 部分 - 状态存储和交互式查询

工程 | Soby Chacko | 2019年12月09日 | ...

第 1 部分 - 编程模型 第 2 部分 - 编程模型续篇 第 3 部分 - 数据反序列化和序列化 第 4 部分 - 错误处理 第 5 部分 - 应用程序定制

在本部分(本系列的第六篇也是最后一篇),我们将深入探讨 Spring Cloud Stream Binder for Kafka Streams 如何支持 Kafka Streams 中的状态存储和交互式查询。

命名状态存储

当您需要在应用程序中维护状态时,Kafka Streams 允许您将状态信息具体化为一个命名状态存储。Kafka Streams 中有几个操作需要跟踪状态,例如 countaggregatereduce、各种 windowing 操作等等。在大多数情况下,Kafka Streams 使用一个名为 RocksDB 的特殊数据库来维护此状态存储(除非您明确更改存储类型)。默认情况下,状态存储中的相同信息也会备份到变更日志主题和 Kafka 内部,以实现容错。

当您像这样明确地将状态具体化为一个命名状态存储时,这使得应用程序能够在后期查询该状态存储。这是一个非常强大的功能,因为它使您能够从 Kafka Streams 应用程序内部查询类似数据库的结构。

将数据消费为 KTable 或 GlobalKTable

基于 Kafka Streams 绑定的应用程序可以将目标绑定为 KTableGlobalKTableGlobalKTable 是一种特殊的表类型,无论它运行在哪一个实例上,您都可以从输入主题的所有分区获取数据。相比之下,KTable 仅提供该实例正在消费的主题相应分区的数据。

以下是我们在本系列博客文章中之前看到的函数签名

@Bean
public Function<KStream<Long, Order>,
     Function<KTable<Long, Customer>,
           Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {

正如您所见,此函数有三个输入绑定:一个 KStream、一个 KTable 和另一个 GlobalKTable。Kafka Streams 允许您将像这样消费的表具体化为命名状态存储,前提是这些表基于主键。您可以使用绑定级别的属性在消费时将它们具体化为命名状态存储。以下示例展示了如何操作

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.materializedAs: incoming-store-2

具体化为状态存储的 Kafka Streams DSL 操作

Kafka Streams 高级 DSL 中有多种方法返回表类型,例如 countaggregatereduce。还有其他操作使用状态存储来跟踪信息。例如,KStream 中的各种 join 方法调用,虽然它们返回 KStream 类型,但内部使用状态存储来保存连接的数据。总而言之,当 Kafka Streams 允许您将数据具体化为表或流时,它会具体化为状态存储,非常类似于存储在数据库表中的数据。

在低级处理器中使用的显式状态存储

当使用 Kafka Streams 的 Processor API 时(它让您在如何处理流方面有更大的灵活性),您必须事先声明一个状态存储,并将其提供给 StreamsBuilder。Kafka Streams 绑定器可以扫描应用程序以检测 StoreBuilder 类型的 bean,然后使用它们创建状态存储,并通过 StreamsBuilderFactoryBean 将它们连同底层的 StreamsBuilder 一起传递。以下是此类 bean 的示例

@Bean
public StoreBuilder myStore() {
  return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
        Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
  return Stores.windowStoreBuilder(
        Stores.persistentWindowStore("other-store",
              Duration.ofSeconds(3), Duration.ofSeconds(3),  false), Serdes.Long(),
        Serdes.Long());
}

这两个 StoreBuilder bean 会被绑定器检测到,然后绑定器会自动将它们附加到 streams builder。之后,您可以在基于 Processor API 的应用程序中访问它们,如下所示

…
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
...
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
  return input ->
        input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
           @Override
            public void init(ProcessorContext context) {
              state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
              state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
           }

           @Override
           public void process(Object key, String value) {
              // processing code
           }

           @Override
           public void close() {
              if (state1 != null) {
                 state1.close();
              }
              if (state2 != null) {
                 state2.close();
              }
           }
        }, "my-store", "other-store");
}

关于在基于 Kafka Streams 绑定的应用程序中使用 Processor API 的一个简要说明。当您使用绑定器时,使用低级 Processor API 的唯一方式是通过使用高级 DSL,然后结合对其进行 transform 或 process 调用,如前面的示例所示。有关如何在基于绑定器的应用程序中使用 Processor API 的更多详细信息,请参阅此处

除了在应用程序中创建 StoreBuilder bean 外,您还可以使用我们在上一篇博客中看到的 StreamsBuilderFactoryBean 自定义器,以编程方式添加状态存储,如果这是您的偏好。

使用交互式查询从状态存储中查询数据

Kafka Streams 允许您在实时流处理进行时交互式地查询状态存储中的数据。绑定器为此功能提供了抽象,以使交互式查询更容易使用。InteractiveQueryService 是绑定器提供的用于状态存储查询的基本 API。通常可以将其作为 bean 注入您的应用程序,然后调用其各种 API 方法。以下是一个示例

@Autowired
private InteractiveQueryService interactiveQueryService;
 …
 ...
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
                                                interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

然后您可以调用存储中的各种检索方法并迭代结果。根据您的用例和正在使用的状态存储类型,您可以从这些状态存储中调用各种方法。请参阅 Kafka Streams 文档中关于交互式查询的部分,了解这些可用的迭代方法。

通过 RPC 机制进行交互式查询

通常,您希望通过 RPC 机制从状态存储中暴露系统的状态。您可以结合 Spring 的 Web 支持,以这种方式编写强大的基于 REST 的应用程序。以下是一个蓝图

@RestController
public class Controller {

		@RequestMapping("/song/id")
		public SongBean song(@RequestParam(value="id") Long id) {
			final ReadOnlyKeyValueStore<Long, Song> songStore =
					interactiveQueryService.getQueryableStore(“song-store”, QueryableStoreTypes.<Long, Song>keyValueStore());

			final Song song = songStore.get(id);
			if (song == null) {
				throw new IllegalArgumentException("...");
}

例如,前端 Web 应用程序可以访问此 REST 控制器。

这种使用模式显然会引发担忧。如果运行多个 Kafka Streams 应用程序实例会发生什么?例如,如果存在 3 个实例,每个实例都从单个源分区拉取数据怎么办?哪个控制器实例将负责提供键 X 的信息?如果键 X 仅托管在分区 3(恰好是实例 3),但请求落在实例 1 上怎么办?这显然是一个问题,但 Kafka Streams 提供了一个解决方案。

从正确的实例检索键

当您有多个实例正在运行并希望使用交互式查询时,您必须在绑定器级别设置此属性

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

然后,在控制器方法中,您必须编写类似于以下内容的逻辑

@RequestMapping("/charts/top-five")
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
{

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
                                                key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
RestTemplate restTemplate = new RestTemplate();
	return restTemplate.postForObject(
						String.format("http://%s:%d/%s", hostInfo.host(),
								hostInfo.port(), "charts/top-five?genre=Punk"), …);

}

总结

在这篇博客中,我们探讨了 Kafka Streams 允许您将状态信息具体化为状态存储的各种方式。绑定器允许您将数据消费为 KTableGlobalKTable,同时允许将其具体化为命名状态存储。Kafka Streams 有多个操作可以将状态存储具体化为命名存储。我们看到,在使用 Kafka Streams 中的 Processor API 时,应用程序需要创建状态存储构建器 bean,绑定器会检测到这些 bean 并将其传递给 Kafka Streams。最后,我们看到了如何使用交互式查询来查询这些状态存储。我们还探讨了涉及应用程序多个实例及其交互式查询的细微差别。

结束系列和后续步骤...

感谢您阅读本系列博客文章!

在这个包含六个部分的系列中,我们探讨了 Spring Cloud Stream 中 Kafka Streams 绑定器的许多功能,例如其编程模型数据序列化错误处理定制以及交互式查询状态存储。本系列未涵盖更多功能,因为我们希望重点介绍该绑定器在版本 3.0.0 中添加或增强的主要功能。有关这些额外功能或与 Spring Cloud Stream 背后的工程团队交流,请查看下面资源部分提供的各种链接。

资源

Kafka Streams 绑定器文档

Spring Cloud Stream

Spring Cloud Stream 核心 GitHub Spring Cloud Stream Kafka 绑定器 GitHub Spring Cloud Stream 示例

Stack Overflow Gitter

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将举办的活动

查看 Spring 社区所有即将举办的活动。

查看全部