抢占先机
VMware 提供培训和认证,助您加速进步。
了解更多继前两篇博客文章之后,在本系列关于使用 Spring Cloud Stream 和 Kafka Streams 编写流处理应用程序的文章中,我们现在将深入探讨这些应用程序如何处理入站数据反序列化和出站数据序列化的细节。
Kafka Streams 中的所有三种主要高级类型 - KStream<K,V>
、KTable<K,V>
和 GlobalKTable<K,V>
- 都使用键和值进行处理。
借助 Spring Cloud Stream Kafka Streams 支持,键始终使用原生 Serde
机制进行反序列化和序列化。Serde
是一个容器对象,它提供一个反序列化器和一个序列化器。
另一方面,值则使用 Serde
或绑定器提供的消息转换进行封送处理。从绑定器 3.0 版本开始,使用 Serde
是默认方法。在 Spring 中使用消息转换器是一项可选功能,仅在特殊情况下才需要使用。
让我们看一下这个处理器
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
这是我们在上一篇博客中看到的同一个处理器。它有两个输入和一个输出。第一个输入绑定是 KStream<String, Long>
。键类型是 String
,值类型是 Long
。下一个输入绑定是 KTable<String, String>
。这里,键和值都是 String
类型。最后,输出绑定是 KStream<String, Long>
,键是 String
,值是 Long
。
通常,您必须告诉应用程序在应用程序配置中要使用正确的 Serde
。但是,使用 Kafka Streams 绑定器时,对于大多数标准类型,此信息会被推断出来,您无需提供任何特殊配置。
绑定器推断的类型是 Kafka Streams 提供开箱即用 Serde
实现的类型。这些类型是
换句话说,如果您的 KStream
、KTable
或 GlobalKTable
的键和值类型是这些类型之一,您无需提供任何特殊的 Serde
配置。
如果类型不属于这些类型之一,您可以提供一个类型为 Serde<T>
的 bean,并且如果泛型类型 T
与实际类型匹配,绑定器将把该 bean 作为 Serde
进行委托。
例如,假设您有以下函数签名
@Bean
publicFunction<KStream<CustomKey, AvroIn>, KStream<CustomKey, AvroOut>> process() {
}
那么,键和值的类型与任何已知的 Serde
实现都不匹配。在这种情况下,您有两种选择。推荐的方法是提供一个 Serde
bean,如下所示
@Bean
public Serde<CustomKey> customKeySerde(){
return new CustomKeySerde();
}
@Bean
public Serde<AvroIn> avroInSerde(){
final SpecificAvroSerde<AvroIn> avroInSerde = new SpecificAvroSerde<>();
avroInSerde.configure(...);
return avroInSerde;
}
@Bean
public Serde<AvroOut> avroInSerde(){
final SpecificAvroSerde<AvroOut> avroOutSerde = new SpecificAvroSerde<>();
avroOutSerde.configure(...);
return avroOutSerde;
}
如果您不想将 Serde
作为通过编程创建的 Spring beans 提供,您也可以使用配置来定义它们,在配置中传递 Serde
实现类的完全限定名,如下所示
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
顺便说一下,即使您有匹配的 beans,像这样设置 Serde 也将具有更高的优先级,因为这些配置是设置在实际的消费者和生产者绑定上的。绑定器赋予它优先权是因为用户显式请求了它。
此时,如果绑定器仍然无法匹配任何 Serde
,它会寻找一个默认的来匹配。
如果所有方法都无法匹配,绑定器将回退到 Spring for Apache Kafka 项目提供的 JsonSerde 实现。如果您不使用上述任何机制并让绑定器回退到 JsonSerde
,则必须确保类是 JSON 友好的。
Kafka Streams 有几个 API 方法需要访问 Serde
对象。例如,看看前面 BiFunction
示例处理器中的 joined
或 groupBy
方法调用。这实际上是应用程序开发者的责任,因为绑定器在这种情况下无法提供任何推断帮助。换句话说,绑定器对 Serde
推断、将 Serde
与提供的 bean 匹配等支持仅应用于应用程序的边缘,即输入或输出绑定处。可能会产生混淆,因为当您使用绑定器开发 Kafka Streams 应用程序时,您可能认为绑定器会完全隐藏 Serde
的复杂性,但这是一种错误的印象。绑定器仅在消费和生产时为您提供 Serde
方面的帮助。您的业务逻辑实现所需的任何 Serde
仍然需要由应用程序提供。
在这篇博客文章中,我们概述了 Spring Cloud Stream 的 Kafka Streams 绑定器如何帮助您进行数据反序列化和序列化。绑定器可以推断输入和输出绑定上使用的键和值类型。我们看到默认始终使用原生 Serde
机制,但绑定器提供了禁用此功能并将任务委托给 Spring 的消息转换器的选项(如果需要)。我们还了解到,您的业务逻辑实现所需的任何 Serde
仍然需要由应用程序提供。
在下一篇博客文章中,我们将探讨 Kafka Streams 提供的用于消息反序列化和生产的各种错误处理机制以及绑定器如何支持它们。