Kafka Streams 和 Spring Cloud Stream

工程 | Soby Chacko | 2018年4月19日 | ...

继最近发布的 Spring Cloud Stream Elmhurst.RELEASE 之后,我们很高兴地推出另一篇博文,专门介绍 Spring Cloud Stream 与 Apache Kafka Streams 库的原生集成。让我们回顾一下新的改进。

MessageChannel 绑定器

Spring Cloud Stream 框架使应用程序开发人员能够编写使用 Spring Boot 和 Spring Integration 强大基础的事件驱动应用程序。所有这些的基础是绑定器实现,它负责应用程序和消息代理之间的通信。这些绑定器是基于 MessageChannel 的实现。

进入 Kafka Streams 绑定器

虽然 Spring Cloud Stream 建立的契约从编程模型的角度来看是保持一致的,但 Kafka Streams 绑定器不使用 MessageChannel 作为目标类型。绑定器实现与 Kafka Streams 的“类型” - KStreamKTable 本地交互。应用程序可以直接使用 Kafka Streams 原语并利用 Spring Cloud Stream 和 Spring 生态系统,而不会有任何妥协。

注意:Kafka Streams 绑定器并不是使用库本身的替代品。

入门

通过 Spring Initializr 生成具有 Spring Cloud Stream Kafka Streams 应用程序必要组件的项目的快速方法,请参见下文。

kafka streams initializr

简单示例

这是一个用 Spring Cloud Stream 和 Kafka Streams 编写的简单的单词计数应用程序。

@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, WordCount> process(KStream<Object, String> input) {

     return input
           .flatMapValues(
              value -> Arrays.asList(value.toLowerCase().split("\\W+")))
           .map((key, value) -> new KeyValue<>(value, value))
           .groupByKey()
           .windowedBy(TimeWindows.of(5000)
                   .count(Materialized.as("wordcounts"))
           .toStream()
           .map((key, value) ->
             new KeyValue<>(null, new WordCount(key.key(), value));
  }
}
  • 带有 KafkaStreamsProcessor@EnableBinding 注解传达给框架,以便在 Kafka Streams 目标上执行绑定。您也可以拥有自己的接口,其中包含多个“输入”和“输出”绑定。

  • @StreamListener 指示框架允许应用程序从绑定在“输入”目标上的主题中以 KStream 的形式使用事件。

  • process() - 一个处理程序,它接收来自包含文本数据的 KStream 的事件。业务逻辑计算每个单词的数量,并在状态存储中存储一段时间窗口(在本例中为 5 秒)内的总计数。生成的 KStream 包含该时间窗口内单词及其对应的计数。

此示例的完整 版本

Josh Long@starbuxman)制作了一个屏幕录像,详细介绍了 Kafka Streams 绑定支持的各种功能。

优势

  • 熟悉 Spring Cloud Stream(例如:@EnableBinding@StreamListener)的开发人员可以使用 Kafka Streams API 将其扩展到构建有状态应用程序。

  • 开发人员可以利用框架的传入和传出内容类型转换,或切换到 Kafka 提供的原生 SerDe。

  • 将现有的 Kafka Streams 工作负载移植到独立的云原生应用程序中,并能够使用 Spring Cloud Data Flow 将它们编排为连贯的数据管道。

  • 应用程序按原样运行 - 不受任何云平台供应商的锁定。

功能

  • Kafka Streams 和 Kafka 绑定器的 MessageChannel 绑定之间的互操作性

  • 多个 Kafka Streams 类型(如 KStreamKTable)作为处理程序参数

  • 传入和传出流的内容类型转换

  • 属性切换以在框架与本机 Kafka SerDe 之间切换,用于传入和传出消息转换

  • 错误处理支持

  • 反序列化错误记录的死信队列 (DLQ) 支持

  • 分支支持

  • 交互式查询支持

多个输出绑定(又名分支)

Kafka Streams 绑定器允许您发送到多个输出主题(Kafka Streams 中的分支 API)。

以下是此类方法的概述。

@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {
...
}

请注意,方法上的返回类型是 KStream[]。有关此工作原理的更多详细信息,请参阅此 示例

多个输入绑定

Kafka Streams 绑定器还允许您绑定到 KStreamKTable 目标类型的多个输入,如下例所示

  @StreamListener
  public void process(@Input("input") KStream<String, PlayEvent> playEvents,
                         @Input("inputX") KTable<Long, Song> songTable) {
...
   }

请注意方法参数列表中使用了多个输入。在这里,您可以看到两个 @Input 注解 - 一个用于 KStream,另一个用于 KTable

示例 的工作版本。

框架内容类型与原生 Kafka SerDe

类似于基于MessageChannel的绑定实现,Kafka Streams绑定也支持传入和传出流的内容类型转换。任何其他类型的數據序列化完全由Kafka Streams本身处理。可以禁用框架提供的内容类型转换。相反,您可以使用Kafka Streams提供的SerDe功能将责任完全委托给Kafka。

当依赖Kafka Streams绑定进行内容类型转换时,它仅应用于消息中的“值”(即有效负载)。“键”始终由Kafka SerDe转换。

有关Kafka Streams绑定中如何处理内容类型协商和序列化的详细信息,请参阅文档

错误处理

Kafka Streams库内置支持处理反序列化异常(KIP-161)。除了原生反序列化错误处理支持外,Kafka Streams绑定还提供支持将错误有效负载路由到DLQ。有关详细信息,请参阅此文档部分。

这是一个示例,演示了Kafka Streams绑定中的DLQ功能。

交互式查询

Kafka Streams允许您从应用程序交互式查询状态存储,这可以用来深入了解正在进行的流数据。Kafka Streams绑定API公开了一个名为QueryableStoreRegistry的类。您可以通过注入此bean(可能通过自动装配)在您的应用程序中将其作为Spring bean访问,如下例所示

@Autowired
QueryableStoreRegistry queryableStoreRegistry;

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
	queryableStoreRegistry.getQueryableStoreType("my-store",
                         QueryableStoreTypes.keyValueStore());

以下是一些基本高级示例,演示了通过绑定进行交互式查询的功能。

混合使用Kafka Streams和基于MessageChannel的绑定

如果应用程序用例需要同时使用基于MessageChannel的Kafka绑定和Kafka Streams绑定,则它们都可以在同一个应用程序中使用。在这种情况下,您可以有多个StreamListener方法,或者源和接收器/处理器类型方法的组合。以下应用程序示例显示了如何使用多个StreamListener方法来定位各种类型的绑定

@StreamListener("binding2")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
}

@StreamListener("binding1")
public void sink(String input) {

}

interface MultipleProcessor {

	String BINDING_1 = "binding1";
	String BINDING_2 = "binding2";
	String OUTPUT = "output";

	@Input(BINDING_1)
	SubscribableChannel binding1();

	@Input(BINDING_2)
	KStream<?, ?> binding2();

	@Output(OUTPUT)
	KStream<?, ?> output();
}

在此示例中,第一个方法是Kafka Streams处理器,第二个方法是常规的基于MessageChannel的消费者。虽然您可以有多个具有不同目标类型(MessageChannel与Kafka Stream类型)的方法,但在单个方法中无法混合使用这两种类型。

结论

在本文中,我们看到了通过Spring Cloud Stream Kafka Streams绑定公开的高级结构和使用示例。除了允许使用Spring Cloud Stream的基于MessageChannel的绑定外,此绑定实现还使我们能够一致地开发、测试和生成有状态的应用程序。

查看项目页面和文档。与往常一样,我们欢迎反馈和贡献,因此请通过GitHubStack OverflowGitter与我们联系。

获取Spring时事通讯

与Spring时事通讯保持联系

订阅

领先一步

VMware提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部