领先一步
VMware 提供培训和认证,助您快速提升。
了解更多第 1 部分 - 编程模型 第 2 部分 - 编程模型续篇 第 3 部分 - 数据反序列化和序列化 第 4 部分 - 错误处理 第 5 部分 - 应用程序定制
在本部分(本系列的第六篇也是最后一篇),我们将深入探讨 Spring Cloud Stream Binder for Kafka Streams 如何支持 Kafka Streams 中的状态存储和交互式查询。
当您需要在应用程序中维护状态时,Kafka Streams 允许您将状态信息具体化为一个命名状态存储。Kafka Streams 中有几个操作需要跟踪状态,例如 count
、aggregate
、reduce
、各种 windowing
操作等等。在大多数情况下,Kafka Streams 使用一个名为 RocksDB 的特殊数据库来维护此状态存储(除非您明确更改存储类型)。默认情况下,状态存储中的相同信息也会备份到变更日志主题和 Kafka 内部,以实现容错。
当您像这样明确地将状态具体化为一个命名状态存储时,这使得应用程序能够在后期查询该状态存储。这是一个非常强大的功能,因为它使您能够从 Kafka Streams 应用程序内部查询类似数据库的结构。
基于 Kafka Streams 绑定的应用程序可以将目标绑定为 KTable
或 GlobalKTable
。GlobalKTable
是一种特殊的表类型,无论它运行在哪一个实例上,您都可以从输入主题的所有分区获取数据。相比之下,KTable
仅提供该实例正在消费的主题相应分区的数据。
以下是我们在本系列博客文章中之前看到的函数签名
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
正如您所见,此函数有三个输入绑定:一个 KStream
、一个 KTable
和另一个 GlobalKTable
。Kafka Streams 允许您将像这样消费的表具体化为命名状态存储,前提是这些表基于主键。您可以使用绑定级别的属性在消费时将它们具体化为命名状态存储。以下示例展示了如何操作
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.materializedAs: incoming-store-2
Kafka Streams 高级 DSL 中有多种方法返回表类型,例如 count
、aggregate
和 reduce
。还有其他操作使用状态存储来跟踪信息。例如,KStream
中的各种 join 方法调用,虽然它们返回 KStream
类型,但内部使用状态存储来保存连接的数据。总而言之,当 Kafka Streams 允许您将数据具体化为表或流时,它会具体化为状态存储,非常类似于存储在数据库表中的数据。
当使用 Kafka Streams 的 Processor API 时(它让您在如何处理流方面有更大的灵活性),您必须事先声明一个状态存储,并将其提供给 StreamsBuilder。Kafka Streams 绑定器可以扫描应用程序以检测 StoreBuilder 类型的 bean,然后使用它们创建状态存储,并通过 StreamsBuilderFactoryBean 将它们连同底层的 StreamsBuilder 一起传递。以下是此类 bean 的示例
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
Duration.ofSeconds(3), Duration.ofSeconds(3), false), Serdes.Long(),
Serdes.Long());
}
这两个 StoreBuilder bean 会被绑定器检测到,然后绑定器会自动将它们附加到 streams builder。之后,您可以在基于 Processor API 的应用程序中访问它们,如下所示
…
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
...
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext context) {
state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
}
@Override
public void process(Object key, String value) {
// processing code
}
@Override
public void close() {
if (state1 != null) {
state1.close();
}
if (state2 != null) {
state2.close();
}
}
}, "my-store", "other-store");
}
关于在基于 Kafka Streams 绑定的应用程序中使用 Processor API 的一个简要说明。当您使用绑定器时,使用低级 Processor API 的唯一方式是通过使用高级 DSL,然后结合对其进行 transform 或 process 调用,如前面的示例所示。有关如何在基于绑定器的应用程序中使用 Processor API 的更多详细信息,请参阅此处。
除了在应用程序中创建 StoreBuilder
bean 外,您还可以使用我们在上一篇博客中看到的 StreamsBuilderFactoryBean
自定义器,以编程方式添加状态存储,如果这是您的偏好。
Kafka Streams 允许您在实时流处理进行时交互式地查询状态存储中的数据。绑定器为此功能提供了抽象,以使交互式查询更容易使用。InteractiveQueryService
是绑定器提供的用于状态存储查询的基本 API。通常可以将其作为 bean 注入您的应用程序,然后调用其各种 API 方法。以下是一个示例
@Autowired
private InteractiveQueryService interactiveQueryService;
…
...
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
然后您可以调用存储中的各种检索方法并迭代结果。根据您的用例和正在使用的状态存储类型,您可以从这些状态存储中调用各种方法。请参阅 Kafka Streams 文档中关于交互式查询的部分,了解这些可用的迭代方法。
通常,您希望通过 RPC 机制从状态存储中暴露系统的状态。您可以结合 Spring 的 Web 支持,以这种方式编写强大的基于 REST 的应用程序。以下是一个蓝图
@RestController
public class Controller {
@RequestMapping("/song/id")
public SongBean song(@RequestParam(value="id") Long id) {
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryService.getQueryableStore(“song-store”, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(id);
if (song == null) {
throw new IllegalArgumentException("...");
}
例如,前端 Web 应用程序可以访问此 REST 控制器。
这种使用模式显然会引发担忧。如果运行多个 Kafka Streams 应用程序实例会发生什么?例如,如果存在 3 个实例,每个实例都从单个源分区拉取数据怎么办?哪个控制器实例将负责提供键 X 的信息?如果键 X 仅托管在分区 3(恰好是实例 3),但请求落在实例 1 上怎么办?这显然是一个问题,但 Kafka Streams 提供了一个解决方案。
当您有多个实例正在运行并希望使用交互式查询时,您必须在绑定器级别设置此属性
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
然后,在控制器方法中,您必须编写类似于以下内容的逻辑
@RequestMapping("/charts/top-five")
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
{
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
RestTemplate restTemplate = new RestTemplate();
return restTemplate.postForObject(
String.format("http://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "charts/top-five?genre=Punk"), …);
}
在这篇博客中,我们探讨了 Kafka Streams 允许您将状态信息具体化为状态存储的各种方式。绑定器允许您将数据消费为 KTable
或 GlobalKTable
,同时允许将其具体化为命名状态存储。Kafka Streams 有多个操作可以将状态存储具体化为命名存储。我们看到,在使用 Kafka Streams 中的 Processor API 时,应用程序需要创建状态存储构建器 bean,绑定器会检测到这些 bean 并将其传递给 Kafka Streams。最后,我们看到了如何使用交互式查询来查询这些状态存储。我们还探讨了涉及应用程序多个实例及其交互式查询的细微差别。
感谢您阅读本系列博客文章!
在这个包含六个部分的系列中,我们探讨了 Spring Cloud Stream 中 Kafka Streams 绑定器的许多功能,例如其编程模型、数据序列化、错误处理、定制以及交互式查询状态存储。本系列未涵盖更多功能,因为我们希望重点介绍该绑定器在版本 3.0.0
中添加或增强的主要功能。有关这些额外功能或与 Spring Cloud Stream 背后的工程团队交流,请查看下面资源部分提供的各种链接。
Spring Cloud Stream 核心 GitHub Spring Cloud Stream Kafka 绑定器 GitHub Spring Cloud Stream 示例