领先一步
VMware 提供培训和认证,助你加速进步。
了解更多继最近发布的 Spring Cloud Stream Elmhurst.RELEASE 之后,我们很高兴推出另一篇博客文章,专门介绍 Spring Cloud Stream 与 Apache Kafka Streams 库的原生集成。下面我们来回顾一下新的改进。
Spring Cloud Stream 框架使应用开发者能够编写事件驱动的应用,这些应用利用了 Spring Boot 和 Spring Integration 的坚实基础。所有这些的底层核心是 binder 实现,它负责应用和消息代理之间的通信。这些 binder 是基于 MessageChannel
的实现。
尽管 Spring Cloud Stream 建立的契约从编程模型角度来看得到了维护,但 Kafka Streams 绑定器不使用 MessageChannel
作为目标类型。该绑定器实现原生与 Kafka Streams 的“类型”进行交互——即 KStream
或 KTable
。应用可以直接使用 Kafka Streams 的原生类型,并在不妥协的情况下利用 Spring Cloud Stream 和 Spring 生态系统。
注意:Kafka Streams 绑定器并不能替代直接使用 Kafka Streams 库。
通过 Spring Initializr 是生成一个包含 Spring Cloud Stream Kafka Streams 应用所需组件的快速方法 - 见下方。
这里是一个使用 Spring Cloud Stream 和 Kafka Streams 编写的简单词频统计应用。
@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey()
.windowedBy(TimeWindows.of(5000)
.count(Materialized.as("wordcounts"))
.toStream()
.map((key, value) ->
new KeyValue<>(null, new WordCount(key.key(), value));
}
}
使用 @EnableBinding
注解和 KafkaStreamsProcessor
会告诉框架在 Kafka Streams 目标上执行绑定。你也可以使用自己的接口来实现多个“输入”和“输出”绑定。
@StreamListener
注解指示框架允许应用从绑定到“input”目标的 topic 中以 KStream
的形式消费事件。
process()
- 一个处理器,它接收包含文本数据的 KStream
中的事件。业务逻辑计算每个单词的出现次数,并在一个时间窗口(本例中为 5 秒)内将总计数存储到状态存储中。结果 KStream
包含该时间窗口内的单词及其对应的计数。
这里是该示例的完整版本。
Josh Long (@starbuxman) 制作了一个截屏视频,详细介绍了 Kafka Streams 绑定支持的各种特性。
熟悉 Spring Cloud Stream(例如:@EnableBinding
和 @StreamListener
)的开发者可以利用 Kafka Streams API 将其扩展到构建有状态应用。
开发者可以利用框架的内容类型转换功能进行入站和出站转换,也可以切换到 Kafka 提供的原生 SerDe。
将现有的 Kafka Streams 工作负载移植到独立的云原生应用中,并能够使用 Spring Cloud Data Flow 将它们编排成连贯的数据管道。
应用可按原样运行 - 不受任何云平台供应商锁定。
Kafka Streams 和 Kafka 绑定器的 MessageChannel
绑定之间的互操作性
支持多种 Kafka Streams 类型(例如 KStream
和 KTable
)作为 Handler 参数
入站和出站流的内容类型转换
用于在框架内容转换和原生 Kafka SerDe 之间切换的属性开关,用于入站和出站消息转换
错误处理支持
支持将反序列化错误的记录发送到死信队列 (DLQ)
分支支持
交互式查询支持
Kafka Streams 绑定器允许你发送到多个输出 topic(这是 Kafka Streams 中的 Branching API)。
下面是这类方法的大纲。
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {
...
}
注意,方法返回类型是 KStream[]
。有关其工作原理的更多详情,请参阅此示例。
Kafka Streams 绑定器还允许你绑定到多个 KStream
和 KTable
目标类型的输入,如下例所示
@StreamListener
public void process(@Input("input") KStream<String, PlayEvent> playEvents,
@Input("inputX") KTable<Long, Song> songTable) {
...
}
请注意方法参数列表中使用了多个输入。这里你可以看到两个 @Input
注解 - 一个用于 KStream
,另一个用于 KTable
。
这里是此示例的一个可运行版本。
与基于 MessageChannel
的绑定器实现类似,Kafka Streams 绑定器也支持入站和出站流的内容类型转换。任何其他类型的数据序列化完全由 Kafka Streams 本身处理。框架提供的边缘内容类型转换可以禁用。你可以选择完全将责任委托给 Kafka,使用 Kafka Streams 提供的 SerDe 功能。
当依赖 Kafka Streams 绑定器进行内容类型转换时,它仅应用于消息中的“value”(即 payload)。“keys”始终由 Kafka SerDe 进行转换。
关于内容类型协商和序列化如何在 Kafka Streams 绑定器中处理的详细信息,请参阅文档。
Kafka Streams 库内置了对处理反序列化异常的支持 (KIP-161)。除了原生的反序列化错误处理支持外,Kafka Streams 绑定器还提供了将错误 payload 路由到 DLQ 的支持。有关详情,请参阅此文档部分。
这里有一个示例,展示了 Kafka Streams 绑定器中的 DLQ 功能。
Kafka Streams 允许你从应用中交互式查询状态存储,这可用于获取关于进行中的流数据的洞察。Kafka Streams 绑定器 API 暴露了一个名为 QueryableStoreRegistry
的类。你可以通过注入此 bean(可能通过自动装配)来在应用中将其作为 Spring bean 访问,如下例所示
@Autowired
QueryableStoreRegistry queryableStoreRegistry;
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("my-store",
QueryableStoreTypes.keyValueStore());
这里有基础和高级示例,演示了通过绑定器实现的交互式查询功能。
如果应用场景需要同时使用基于 MessageChannel
的 Kafka 绑定器和 Kafka Streams 绑定器,两者可以在同一个应用中使用。在这种情况下,你可以有多个 StreamListener
方法,或者源方法与 sink/处理器类型方法的组合。下面的应用示例展示了如何使用多个 StreamListener
方法来针对各种类型的绑定
@StreamListener("binding2")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
}
@StreamListener("binding1")
public void sink(String input) {
}
interface MultipleProcessor {
String BINDING_1 = "binding1";
String BINDING_2 = "binding2";
String OUTPUT = "output";
@Input(BINDING_1)
SubscribableChannel binding1();
@Input(BINDING_2)
KStream<?, ?> binding2();
@Output(OUTPUT)
KStream<?, ?> output();
}
在此示例中,第一个方法是 Kafka Streams 处理器,第二个方法是常规的基于 MessageChannel
的消费者。尽管你可以拥有多个具有不同目标类型(MessageChannel
vs Kafka Stream 类型)的方法,但不能在单个方法中混合使用这两种类型。
在本文中,我们探讨了通过 Spring Cloud Stream Kafka Streams 绑定器暴露的高级构造和使用示例。除了允许使用 Spring Cloud Stream 基于 MessageChannel
的绑定器外,此绑定器实现还使我们能够持续地开发、测试和生产有状态应用。
请访问项目页面和文档。一如既往,我们欢迎反馈和贡献,请通过 GitHub、Stack Overflow 和 Gitter 与我们联系。