Kafka Streams 和 Spring Cloud Stream

工程 | Soby Chacko | 2018 年 4 月 19 日 | ...

继最近发布的 Spring Cloud Stream Elmhurst.RELEASE 之后,我们很高兴推出另一篇博客文章,专门介绍 Spring Cloud Stream 与 Apache Kafka Streams 库的原生集成。下面我们来回顾一下新的改进。

MessageChannel 绑定器

Spring Cloud Stream 框架使应用开发者能够编写事件驱动的应用,这些应用利用了 Spring Boot 和 Spring Integration 的坚实基础。所有这些的底层核心是 binder 实现,它负责应用和消息代理之间的通信。这些 binder 是基于 MessageChannel 的实现。

Kafka Streams 绑定器登场

尽管 Spring Cloud Stream 建立的契约从编程模型角度来看得到了维护,但 Kafka Streams 绑定器不使用 MessageChannel 作为目标类型。该绑定器实现原生与 Kafka Streams 的“类型”进行交互——即 KStreamKTable。应用可以直接使用 Kafka Streams 的原生类型,并在不妥协的情况下利用 Spring Cloud Stream 和 Spring 生态系统。

注意:Kafka Streams 绑定器并不能替代直接使用 Kafka Streams 库。

开始使用

通过 Spring Initializr 是生成一个包含 Spring Cloud Stream Kafka Streams 应用所需组件的快速方法 - 见下方。

kafka streams initializr

一个简单的示例

这里是一个使用 Spring Cloud Stream 和 Kafka Streams 编写的简单词频统计应用。

@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, WordCount> process(KStream<Object, String> input) {

     return input
           .flatMapValues(
              value -> Arrays.asList(value.toLowerCase().split("\\W+")))
           .map((key, value) -> new KeyValue<>(value, value))
           .groupByKey()
           .windowedBy(TimeWindows.of(5000)
                   .count(Materialized.as("wordcounts"))
           .toStream()
           .map((key, value) ->
             new KeyValue<>(null, new WordCount(key.key(), value));
  }
}
  • 使用 @EnableBinding 注解和 KafkaStreamsProcessor 会告诉框架在 Kafka Streams 目标上执行绑定。你也可以使用自己的接口来实现多个“输入”和“输出”绑定。

  • @StreamListener 注解指示框架允许应用从绑定到“input”目标的 topic 中以 KStream 的形式消费事件。

  • process() - 一个处理器,它接收包含文本数据的 KStream 中的事件。业务逻辑计算每个单词的出现次数,并在一个时间窗口(本例中为 5 秒)内将总计数存储到状态存储中。结果 KStream 包含该时间窗口内的单词及其对应的计数。

这里是该示例的完整版本

Josh Long (@starbuxman) 制作了一个截屏视频,详细介绍了 Kafka Streams 绑定支持的各种特性。

优势

  • 熟悉 Spring Cloud Stream(例如:@EnableBinding@StreamListener)的开发者可以利用 Kafka Streams API 将其扩展到构建有状态应用。

  • 开发者可以利用框架的内容类型转换功能进行入站和出站转换,也可以切换到 Kafka 提供的原生 SerDe。

  • 将现有的 Kafka Streams 工作负载移植到独立的云原生应用中,并能够使用 Spring Cloud Data Flow 将它们编排成连贯的数据管道。

  • 应用可按原样运行 - 不受任何云平台供应商锁定。

特性

  • Kafka Streams 和 Kafka 绑定器的 MessageChannel 绑定之间的互操作性

  • 支持多种 Kafka Streams 类型(例如 KStreamKTable)作为 Handler 参数

  • 入站和出站流的内容类型转换

  • 用于在框架内容转换和原生 Kafka SerDe 之间切换的属性开关,用于入站和出站消息转换

  • 错误处理支持

  • 支持将反序列化错误的记录发送到死信队列 (DLQ)

  • 分支支持

  • 交互式查询支持

多个输出绑定(即分支)

Kafka Streams 绑定器允许你发送到多个输出 topic(这是 Kafka Streams 中的 Branching API)。

下面是这类方法的大纲。

@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {
...
}

注意,方法返回类型是 KStream[]。有关其工作原理的更多详情,请参阅此示例

多个输入绑定

Kafka Streams 绑定器还允许你绑定到多个 KStreamKTable 目标类型的输入,如下例所示

  @StreamListener
  public void process(@Input("input") KStream<String, PlayEvent> playEvents,
                         @Input("inputX") KTable<Long, Song> songTable) {
...
   }

请注意方法参数列表中使用了多个输入。这里你可以看到两个 @Input 注解 - 一个用于 KStream,另一个用于 KTable

这里是此示例的一个可运行版本。

框架内容类型 vs 原生 Kafka SerDe

与基于 MessageChannel 的绑定器实现类似,Kafka Streams 绑定器也支持入站和出站流的内容类型转换。任何其他类型的数据序列化完全由 Kafka Streams 本身处理。框架提供的边缘内容类型转换可以禁用。你可以选择完全将责任委托给 Kafka,使用 Kafka Streams 提供的 SerDe 功能。

当依赖 Kafka Streams 绑定器进行内容类型转换时,它仅应用于消息中的“value”(即 payload)。“keys”始终由 Kafka SerDe 进行转换。

关于内容类型协商和序列化如何在 Kafka Streams 绑定器中处理的详细信息,请参阅文档

错误处理

Kafka Streams 库内置了对处理反序列化异常的支持 (KIP-161)。除了原生的反序列化错误处理支持外,Kafka Streams 绑定器还提供了将错误 payload 路由到 DLQ 的支持。有关详情,请参阅此文档部分。

这里有一个示例,展示了 Kafka Streams 绑定器中的 DLQ 功能。

交互式查询

Kafka Streams 允许你从应用中交互式查询状态存储,这可用于获取关于进行中的流数据的洞察。Kafka Streams 绑定器 API 暴露了一个名为 QueryableStoreRegistry 的类。你可以通过注入此 bean(可能通过自动装配)来在应用中将其作为 Spring bean 访问,如下例所示

@Autowired
QueryableStoreRegistry queryableStoreRegistry;

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
	queryableStoreRegistry.getQueryableStoreType("my-store",
                         QueryableStoreTypes.keyValueStore());

这里有基础高级示例,演示了通过绑定器实现的交互式查询功能。

混合使用 Kafka Streams 和基于 MessageChannel 的绑定器

如果应用场景需要同时使用基于 MessageChannel 的 Kafka 绑定器和 Kafka Streams 绑定器,两者可以在同一个应用中使用。在这种情况下,你可以有多个 StreamListener 方法,或者源方法与 sink/处理器类型方法的组合。下面的应用示例展示了如何使用多个 StreamListener 方法来针对各种类型的绑定

@StreamListener("binding2")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
}

@StreamListener("binding1")
public void sink(String input) {

}

interface MultipleProcessor {

	String BINDING_1 = "binding1";
	String BINDING_2 = "binding2";
	String OUTPUT = "output";

	@Input(BINDING_1)
	SubscribableChannel binding1();

	@Input(BINDING_2)
	KStream<?, ?> binding2();

	@Output(OUTPUT)
	KStream<?, ?> output();
}

在此示例中,第一个方法是 Kafka Streams 处理器,第二个方法是常规的基于 MessageChannel 的消费者。尽管你可以拥有多个具有不同目标类型(MessageChannel vs Kafka Stream 类型)的方法,但不能在单个方法中混合使用这两种类型。

结论

在本文中,我们探讨了通过 Spring Cloud Stream Kafka Streams 绑定器暴露的高级构造和使用示例。除了允许使用 Spring Cloud Stream 基于 MessageChannel 的绑定器外,此绑定器实现还使我们能够持续地开发、测试和生产有状态应用。

请访问项目页面和文档。一如既往,我们欢迎反馈和贡献,请通过 GitHubStack OverflowGitter 与我们联系。

订阅 Spring 新闻快讯

通过 Spring 新闻快讯保持联系

订阅

领先一步

VMware 提供培训和认证,助你加速进步。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅,提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持及二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部