Kafka Streams 和 Spring Cloud Stream

工程 | Soby Chacko | 2018 年 4 月 19 日 | ...

继最近发布的 Spring Cloud Stream Elmhurst.RELEASE 之后,我们很高兴地推出另一篇博客,专门介绍 Spring Cloud Stream 与 Apache Kafka Streams 库的本地集成。让我们回顾一下新的改进。

MessageChannel 绑定器

Spring Cloud Stream 框架使应用程序开发人员能够编写基于 Spring Boot 和 Spring Integration 强大基础的事件驱动应用程序。所有这些的基础是 binder 的实现,它负责应用程序和消息代理之间的通信。这些 binder 是基于 MessageChannel 的实现。

引入 Kafka Streams Binder

虽然从编程模型的角度来看,Spring Cloud Stream 确定的契约得以保留,但 Kafka Streams binder 不使用 MessageChannel 作为目标类型。binder 实现直接与 Kafka Streams 的“类型”——KStreamKTable——进行交互。应用程序可以直接使用 Kafka Streams 的原始类型,并无任何妥协地利用 Spring Cloud Stream 和 Spring 生态系统。

注意:Kafka Streams binder 并非对库本身的替代。

入门

生成包含 Spring Cloud Stream Kafka Streams 应用程序所需组件的项目的一种快速方法是通过 Spring Initializr — 见下文。

kafka streams initializr

简单示例

这是一个用 Spring Cloud Stream 和 Kafka Streams 编写的简单的词频统计应用程序。

@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, WordCount> process(KStream<Object, String> input) {

     return input
           .flatMapValues(
              value -> Arrays.asList(value.toLowerCase().split("\\W+")))
           .map((key, value) -> new KeyValue<>(value, value))
           .groupByKey()
           .windowedBy(TimeWindows.of(5000)
                   .count(Materialized.as("wordcounts"))
           .toStream()
           .map((key, value) ->
             new KeyValue<>(null, new WordCount(key.key(), value));
  }
}
  • @EnableBinding 注解与 KafkaStreamsProcessor 一起,指示框架在 Kafka Streams 目标上执行绑定。你也可以拥有自己的接口,包含多个“输入”和“输出”绑定。

  • @StreamListener 指示框架允许应用程序将事件作为 KStream 从绑定到“input”目标的 topic 中进行消费。

  • process() — 这是处理来自包含文本数据的 KStream 的事件的处理器。业务逻辑计算每个单词的出现次数,并在一个时间窗口(此处为 5 秒)内将总计数存储在状态存储中。生成的 KStream 包含单词及其在该时间窗口内的相应计数。

这是一个完整 版本 的示例。

Josh Long (@starbuxman) 制作了一个屏幕录像,详细介绍了 Kafka Streams 绑定支持的各种特性。

优势

  • 熟悉 Spring Cloud Stream(例如 @EnableBinding@StreamListener)的开发人员,可以通过使用 Kafka Streams API 将其扩展到构建有状态的应用程序。

  • 开发人员可以利用框架的内容类型转换进行入站和出站转换,或者切换到 Kafka 提供的原生 SerDe。

  • 将现有的 Kafka Streams 工作负载移植到独立的云原生应用程序,并能够使用 Spring Cloud Data Flow 将它们编排成一致的数据管道。

  • 应用程序按原样运行 — 不与任何云平台供应商锁定。

特性

  • Kafka Streams 和 Kafka binder 的 MessageChannel 绑定之间的互操作性

  • 多个 Kafka Streams 类型(如 KStreamKTable)作为 Handler 参数

  • 入站和出站流的内容类型转换

  • 属性开关,用于在框架与原生 Kafka SerDe 之间切换入站和出站消息转换

  • 错误处理支持

  • 反序列化错误中记录的死信队列(DLQ)支持

  • 分支支持

  • 交互查询支持

多个输出绑定(又称分支)

Kafka Streams binder 允许你发送到多个输出 topic(Kafka Streams 中的分支 API)。

以下是此类方法的轮廓。

@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {
...
}

请注意,方法上的返回类型是 KStream[]。请参阅此 示例 以了解更多关于此如何工作的详细信息。

多个输入绑定

Kafka Streams binder 还允许你绑定到 KStreamKTable 目标类型的多个输入,如下面的示例所示。

  @StreamListener
  public void process(@Input("input") KStream<String, PlayEvent> playEvents,
                         @Input("inputX") KTable<Long, Song> songTable) {
...
   }

注意方法参数列表中使用了多个输入。在这里,你可以看到两个 @Input 注解 — 一个用于 KStream,另一个用于 KTable

这是一个 示例 的工作版本。

框架内容类型 vs. 原生 Kafka SerDe

与基于 MessageChannel 的 binder 实现类似,Kafka Streams binder 也支持对传入和传出流的内容类型转换。任何其他类型的数据序列化完全由 Kafka Streams 本身处理。框架提供的边缘内容类型转换可以被禁用。相反,你可以使用 Kafka Streams 提供的 SerDe 工具,将这些职责完全委托给 Kafka。

当依赖 Kafka Streams binder 进行内容类型转换时,它仅适用于消息中的“值”(即,payload)。“键”始终由 Kafka SerDe 进行转换。

有关内容类型协商和序列化在 Kafka Streams binder 中如何处理的详细信息,请参阅 文档

错误处理

Kafka Streams 库内置了处理反序列化异常的支持 (KIP-161)。除了原生的反序列化错误处理支持外,Kafka Streams binder 还提供了将错误 payload 路由到 DLQ 的支持。有关详细信息,请参阅 文档 部分。

这是一个演示 Kafka Streams binder 中 DLQ 功能的 示例

交互查询

Kafka Streams 允许你从应用程序中交互式地查询状态存储,这可用于深入了解正在进行的流数据。Kafka Streams binder API 公开了一个名为 QueryableStoreRegistry 的类。你可以通过注入此 bean(可能通过自动装配)将其作为 Spring bean 在你的应用程序中访问,如下面的示例所示。

@Autowired
QueryableStoreRegistry queryableStoreRegistry;

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
	queryableStoreRegistry.getQueryableStoreType("my-store",
                         QueryableStoreTypes.keyValueStore());

这里有 基础高级 示例,演示了通过 binder 进行交互式查询的功能。

混合使用 Kafka Streams 和基于 MessageChannel 的 binder

如果应用程序用例需要同时使用基于 MessageChannel 的 Kafka binder 和 Kafka Streams binder,那么它们都可以在同一个应用程序中使用。在这种情况下,你可以有多个 StreamListener 方法,或者源和宿/处理器类型方法的组合。下面的应用程序示例展示了如何使用多个 StreamListener 方法来定位各种类型的绑定。

@StreamListener("binding2")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
}

@StreamListener("binding1")
public void sink(String input) {

}

interface MultipleProcessor {

	String BINDING_1 = "binding1";
	String BINDING_2 = "binding2";
	String OUTPUT = "output";

	@Input(BINDING_1)
	SubscribableChannel binding1();

	@Input(BINDING_2)
	KStream<?, ?> binding2();

	@Output(OUTPUT)
	KStream<?, ?> output();
}

在此示例中,第一个方法是 Kafka Streams 处理器,第二个方法是普通的基于 MessageChannel 的消费者。虽然你可以有多个具有不同目标类型(MessageChannel vs Kafka Stream 类型)的方法,但不能在单个方法中混合这两种类型。

结论

在本文中,我们介绍了 Spring Cloud Stream Kafka Streams binder 公开的高级结构和用法示例。除了允许使用 Spring Cloud Stream 的基于 MessageChannel 的 binder 外,此 binder 实现还可以让我们一致地开发、测试和生成有状态的应用程序。

请查看 项目 主页和 文档。一如既往,我们欢迎反馈和贡献,因此请通过 GitHubStack OverflowGitter 联系我们。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有