领先一步
VMware提供培训和认证,以加速您的进步。
了解更多继最近发布的 Spring Cloud Stream Elmhurst.RELEASE 之后,我们很高兴地推出另一篇博文,专门介绍 Spring Cloud Stream 与 Apache Kafka Streams 库的原生集成。让我们回顾一下新的改进。
Spring Cloud Stream 框架使应用程序开发人员能够编写使用 Spring Boot 和 Spring Integration 强大基础的事件驱动应用程序。所有这些的基础是绑定器实现,它负责应用程序和消息代理之间的通信。这些绑定器是基于 MessageChannel
的实现。
虽然 Spring Cloud Stream 建立的契约从编程模型的角度来看是保持一致的,但 Kafka Streams 绑定器不使用 MessageChannel
作为目标类型。绑定器实现与 Kafka Streams 的“类型” - KStream
或 KTable
本地交互。应用程序可以直接使用 Kafka Streams 原语并利用 Spring Cloud Stream 和 Spring 生态系统,而不会有任何妥协。
注意:Kafka Streams 绑定器并不是使用库本身的替代品。
通过 Spring Initializr 生成具有 Spring Cloud Stream Kafka Streams 应用程序必要组件的项目的快速方法,请参见下文。
这是一个用 Spring Cloud Stream 和 Kafka Streams 编写的简单的单词计数应用程序。
@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey()
.windowedBy(TimeWindows.of(5000)
.count(Materialized.as("wordcounts"))
.toStream()
.map((key, value) ->
new KeyValue<>(null, new WordCount(key.key(), value));
}
}
带有 KafkaStreamsProcessor
的 @EnableBinding
注解传达给框架,以便在 Kafka Streams 目标上执行绑定。您也可以拥有自己的接口,其中包含多个“输入”和“输出”绑定。
@StreamListener
指示框架允许应用程序从绑定在“输入”目标上的主题中以 KStream
的形式使用事件。
process()
- 一个处理程序,它接收来自包含文本数据的 KStream
的事件。业务逻辑计算每个单词的数量,并在状态存储中存储一段时间窗口(在本例中为 5 秒)内的总计数。生成的 KStream
包含该时间窗口内单词及其对应的计数。
此示例的完整 版本。
Josh Long(@starbuxman)制作了一个屏幕录像,详细介绍了 Kafka Streams 绑定支持的各种功能。
熟悉 Spring Cloud Stream(例如:@EnableBinding
和 @StreamListener
)的开发人员可以使用 Kafka Streams API 将其扩展到构建有状态应用程序。
开发人员可以利用框架的传入和传出内容类型转换,或切换到 Kafka 提供的原生 SerDe。
将现有的 Kafka Streams 工作负载移植到独立的云原生应用程序中,并能够使用 Spring Cloud Data Flow 将它们编排为连贯的数据管道。
应用程序按原样运行 - 不受任何云平台供应商的锁定。
Kafka Streams 和 Kafka 绑定器的 MessageChannel
绑定之间的互操作性
多个 Kafka Streams 类型(如 KStream
和 KTable
)作为处理程序参数
传入和传出流的内容类型转换
属性切换以在框架与本机 Kafka SerDe 之间切换,用于传入和传出消息转换
错误处理支持
反序列化错误记录的死信队列 (DLQ) 支持
分支支持
交互式查询支持
Kafka Streams 绑定器允许您发送到多个输出主题(Kafka Streams 中的分支 API)。
以下是此类方法的概述。
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {
...
}
请注意,方法上的返回类型是 KStream[]
。有关此工作原理的更多详细信息,请参阅此 示例。
Kafka Streams 绑定器还允许您绑定到 KStream
和 KTable
目标类型的多个输入,如下例所示
@StreamListener
public void process(@Input("input") KStream<String, PlayEvent> playEvents,
@Input("inputX") KTable<Long, Song> songTable) {
...
}
请注意方法参数列表中使用了多个输入。在这里,您可以看到两个 @Input
注解 - 一个用于 KStream
,另一个用于 KTable
。
此 示例 的工作版本。
类似于基于MessageChannel
的绑定实现,Kafka Streams绑定也支持传入和传出流的内容类型转换。任何其他类型的數據序列化完全由Kafka Streams本身处理。可以禁用框架提供的内容类型转换。相反,您可以使用Kafka Streams提供的SerDe功能将责任完全委托给Kafka。
当依赖Kafka Streams绑定进行内容类型转换时,它仅应用于消息中的“值”(即有效负载)。“键”始终由Kafka SerDe转换。
有关Kafka Streams绑定中如何处理内容类型协商和序列化的详细信息,请参阅文档。
Kafka Streams库内置支持处理反序列化异常(KIP-161)。除了原生反序列化错误处理支持外,Kafka Streams绑定还提供支持将错误有效负载路由到DLQ。有关详细信息,请参阅此文档部分。
这是一个示例,演示了Kafka Streams绑定中的DLQ功能。
Kafka Streams允许您从应用程序交互式查询状态存储,这可以用来深入了解正在进行的流数据。Kafka Streams绑定API公开了一个名为QueryableStoreRegistry
的类。您可以通过注入此bean(可能通过自动装配)在您的应用程序中将其作为Spring bean访问,如下例所示
@Autowired
QueryableStoreRegistry queryableStoreRegistry;
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("my-store",
QueryableStoreTypes.keyValueStore());
以下是一些基本和高级示例,演示了通过绑定进行交互式查询的功能。
如果应用程序用例需要同时使用基于MessageChannel
的Kafka绑定和Kafka Streams绑定,则它们都可以在同一个应用程序中使用。在这种情况下,您可以有多个StreamListener
方法,或者源和接收器/处理器类型方法的组合。以下应用程序示例显示了如何使用多个StreamListener
方法来定位各种类型的绑定
@StreamListener("binding2")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
}
@StreamListener("binding1")
public void sink(String input) {
}
interface MultipleProcessor {
String BINDING_1 = "binding1";
String BINDING_2 = "binding2";
String OUTPUT = "output";
@Input(BINDING_1)
SubscribableChannel binding1();
@Input(BINDING_2)
KStream<?, ?> binding2();
@Output(OUTPUT)
KStream<?, ?> output();
}
在此示例中,第一个方法是Kafka Streams处理器,第二个方法是常规的基于MessageChannel
的消费者。虽然您可以有多个具有不同目标类型(MessageChannel
与Kafka Stream类型)的方法,但在单个方法中无法混合使用这两种类型。
在本文中,我们看到了通过Spring Cloud Stream Kafka Streams绑定公开的高级结构和使用示例。除了允许使用Spring Cloud Stream的基于MessageChannel
的绑定外,此绑定实现还使我们能够一致地开发、测试和生成有状态的应用程序。
查看项目页面和文档。与往常一样,我们欢迎反馈和贡献,因此请通过GitHub、Stack Overflow和Gitter与我们联系。