领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多第1部分 - 编程模型 第2部分 - 编程模型续 第3部分 - 数据反序列化和序列化 第4部分 - 错误处理 第5部分 - 应用程序定制
在本部分(本系列的第六部分也是最后一部分)中,我们将探讨 Spring Cloud Stream Binder for Kafka Streams 如何支持 Kafka Streams 中的状态存储和交互式查询。
当您需要在应用程序中维护状态时,Kafka Streams 允许您将该状态信息物化为一个命名状态存储。Kafka Streams 中的几个操作需要它来跟踪状态,例如count
、aggregate
、reduce
、各种windowing
操作等等。在大多数情况下,Kafka Streams 使用名为RocksDB的特殊数据库来维护此状态存储(除非您明确更改存储类型)。默认情况下,状态存储中的相同信息也会备份到 Kafka 中的一个更改日志主题,以确保容错性。
当您像这样将状态显式地物化为命名状态存储时,应用程序可以在以后阶段查询该状态存储。这是一个非常强大的功能,因为它使您可以从 Kafka Streams 应用程序中查询类似数据库的结构。
基于 Kafka Streams Binder 的应用程序可以绑定到KTable
或GlobalKTable
作为目标。GlobalKTable
是一种特殊的表类型,无论实例运行在哪里,您都可以从中获取输入主题的所有分区的数据。相比之下,KTable
只为您提供实例正在从中消费的主题的相应分区的数据。
以下是我们在本系列博客文章中前面看到的函数签名
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
如您所见,此函数具有三个输入绑定,一个KStream
,一个KTable
和另一个GlobalKTable
。Kafka Streams 允许您将像这样消费的表物化为命名状态存储,前提是这些表基于主键。您可以使用绑定级别属性将它们与消费一起物化为命名状态存储。以下示例显示了如何操作
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.materializedAs: incoming-store-2
Kafka Streams 高级 DSL 中有各种方法,它们返回表类型,例如count
、aggregate
和reduce
。还有其他操作使用状态存储来跟踪信息。例如,KStream
中的各种连接方法调用,尽管它们返回KStream
类型,但内部使用状态存储来保存连接的数据。总之,当 Kafka Streams 允许您将数据物化成表或流时,它会被物化到状态存储中,就像存储在数据库表中的数据一样。
当使用 Kafka Streams 的处理器 API 时,它可以让您更灵活地处理流,您必须预先声明状态存储并将其提供给 StreamsBuilder。Kafka Streams Binder 可以扫描应用程序以检测类型为 StoreBuilder 的 bean,然后使用它来创建状态存储,并通过 StreamsBuilderFactoryBean 将它们与底层的 StreamsBuilder 一起传递。以下是此类 bean 的外观
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
Duration.ofSeconds(3), Duration.ofSeconds(3), false), Serdes.Long(),
Serdes.Long());
}
Binder 会检测这两个 StoreBuilder bean,然后自动将它们附加到流构建器。稍后,您可以在基于处理器 API 的应用程序中访问它们,如下所示
…
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
...
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext context) {
state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
}
@Override
public void process(Object key, String value) {
// processing code
}
@Override
public void close() {
if (state1 != null) {
state1.close();
}
if (state2 != null) {
state2.close();
}
}
}, "my-store", "other-store");
}
关于在基于 Kafka Streams Binder 的应用程序中使用处理器 API 的一个快速说明。当您使用 Binder 时,使用低级处理器 API 的唯一方法是通过高级 DSL 的使用模式,然后将其与对其进行转换或处理的调用结合使用,如前面的示例所示。有关如何在基于 Binder 的应用程序中使用处理器 API 的更多详细信息,请参阅此处。
您可以使用我们在上一篇博客中看到的StreamsBuilderFactoryBean
定制器,以编程方式添加状态存储,如果您更喜欢这种方式的话。
Kafka Streams 允许您在实时流处理进行时实时交互式地查询状态存储中的数据。Binder 提供了围绕此功能的抽象,以便更轻松地使用交互式查询。InteractiveQueryService
是 Binder 提供的基本 API,用于处理状态存储查询。您通常可以将其作为 bean 注入到您的应用程序中,然后从中调用各种 API 方法。这是一个示例
@Autowired
private InteractiveQueryService interactiveQueryService;
…
...
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
然后,您可以从存储中调用各种检索方法并遍历结果。您可以根据您的用例和正在使用的状态存储类型,从这些状态存储中调用各种方法。请参阅 Kafka Streams 文档中关于交互式查询的内容,了解这些可用的各种迭代方法。
通常,您希望通过 RPC 机制公开系统状态存储中的系统状态。您可以结合 Spring Web 支持来编写强大的基于 REST 的应用程序。这是一个蓝图
@RestController
public class Controller {
@RequestMapping("/song/id")
public SongBean song(@RequestParam(value="id") Long id) {
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryService.getQueryableStore(“song-store”, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(id);
if (song == null) {
throw new IllegalArgumentException("...");
}
例如,可以从前端 Web 应用程序访问此 REST 控制器。
这种使用模式显然会引发一些问题。如果有多个 Kafka Streams 应用程序实例正在运行会发生什么?例如,如果 3 个实例中的每一个都从单个源分区中提取数据,会发生什么?哪个控制器实例将负责提供密钥X的信息?如果密钥X只托管在分区 3 中,而该分区恰好是实例 3,但请求落到了实例 1 上会怎样?这显然是一个问题,但 Kafka Streams 提供了解决方案。
当您有多个实例运行并且想要使用交互式查询时,您必须在 Binder 级别设置此属性
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
然后,在控制器方法中,您必须编写类似于以下内容的逻辑
@RequestMapping("/charts/top-five")
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
{
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
RestTemplate restTemplate = new RestTemplate();
return restTemplate.postForObject(
String.format("http://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "charts/top-five?genre=Punk"), …);
}
在本博客中,我们了解了 Kafka Streams 将状态信息物化到状态存储的各种方法。绑定器允许您将数据作为KTable
或GlobalKTable
进行消费,同时允许您将其物化到命名状态存储中。Kafka Streams 有多个操作可以将状态存储物化为命名存储。我们看到,当在 Kafka Streams 中使用处理器 API 时,应用程序需要创建绑定器检测到的状态存储构建器 bean,然后将其传递给 Kafka Streams。最后,我们了解了如何使用交互式查询来查询这些状态存储。我们还了解了与应用程序的多个实例以及针对它们的交互式查询相关的细微差别。
感谢您阅读本系列博客!
在这个由六部分组成的系列中,我们了解了 Spring Cloud Stream 中 Kafka Streams 绑定器的许多功能,例如其编程模型、数据序列化、错误处理、自定义以及交互式查询状态存储。本系列中未涵盖更多功能,因为我们希望关注介绍在3.0.0
版本中添加或增强的此绑定器的主要功能这一主题。有关这些附加功能或与 Spring Cloud Stream 背后的工程团队互动,请查看下面资源部分中提供的各种链接。
核心 Spring Cloud Stream GitHub Spring Cloud Stream Kafka 绑定器 GitHub Spring Cloud Stream 示例