领先一步
VMware 提供培训和认证,以加快您的进度。
了解更多在本系列关于使用 Spring Cloud Stream 和 Kafka Streams 编写流处理应用程序的前面两篇博文中,我们将探讨这些应用程序如何处理入站的反序列化和出站的序列化。
Kafka Streams 中所有三种主要的更高级别的类型——KStream<K,V>
、KTable<K,V>
和 GlobalKTable<K,V>
——都使用键和值。
使用 Spring Cloud Stream Kafka Streams 支持,键始终使用本机 Serde
机制进行反序列化和序列化。Serde
是一个容器对象,它提供反序列化器和序列化器。
另一方面,值是使用 Serde
或绑定器提供的消息转换进行封送处理的。从绑定器的 3.0 版本开始,使用 Serde
是默认方法。在 Spring 中使用消息转换器是一个可选功能,您只需要在特殊情况下使用它。
让我们看看这个处理器
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
这是我们在上一篇博文中看到的同一个处理器。它有两个输入和一个输出。第一个输入绑定是 KStream<String, Long>
。键的类型为 String
,值是 Long
。下一个输入绑定是 KTable<String, String>
。这里,键和值都是 String
类型。最后,输出绑定是 KStream<String, Long>
,键为 String
,值为 Long
。
通常,您必须告诉应用程序使用正确的 Serde
作为应用程序配置的一部分。但是,当使用 Kafka Streams 绑定器时,对于大多数标准类型,此信息都是推断出来的,您不需要提供任何特殊配置。
绑定器推断的类型是 Kafka Streams 为其提供开箱即用 Serde
实现的那些类型。这些类型是
换句话说,如果您的 KStream
、KTable
或 GlobalKTable
将这些作为键和值的类型,则您不需要提供任何特殊的 Serde
配置。
如果类型不是来自这些类型中的任何一种,您可以提供 Serde<T>
类型的 bean,并且如果泛型类型 T
与实际类型匹配,绑定器将将其委派为 Serde
。
例如,假设您具有以下函数签名
@Bean
publicFunction<KStream<CustomKey, AvroIn>, KStream<CustomKey, AvroOut>> process() {
}
然后,键和值的类型与任何已知的 Serde
实现都不匹配。在这种情况下,您有两个选择。推荐的方法是提供一个 Serde
bean,如下所示
@Bean
public Serde<CustomKey> customKeySerde(){
return new CustomKeySerde();
}
@Bean
public Serde<AvroIn> avroInSerde(){
final SpecificAvroSerde<AvroIn> avroInSerde = new SpecificAvroSerde<>();
avroInSerde.configure(...);
return avroInSerde;
}
@Bean
public Serde<AvroOut> avroInSerde(){
final SpecificAvroSerde<AvroOut> avroOutSerde = new SpecificAvroSerde<>();
avroOutSerde.configure(...);
return avroOutSerde;
}
如果您不想以编程方式创建 Spring bean 来提供 Serde
,您也可以使用配置来定义这些 bean,其中您传递 Serde
实现类的完全限定名称,如下所示
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
顺便说一句,即使您拥有匹配的 bean,这样设置 Serde 也会具有更高的优先级,因为这些配置是在实际的使用者和生产者绑定上设置的。绑定器赋予它优先级,因为用户明确请求了它。
此时,如果绑定器仍然无法匹配任何 Serde
,它会查找一个默认的匹配项。
如果所有方法都无法匹配一个,绑定器将回退到 Spring 为 Apache Kafka 项目提供的 JsonSerde 实现。如果您不使用上述任何机制并让绑定器回退到 JsonSerde
,则必须确保这些类是 JSON 友好的。
Kafka Streams 有几种 API 方法需要访问 Serde
对象。例如,查看早期 BiFunction
示例处理器中的方法调用 joined
或 groupBy
。这实际上是应用程序开发人员的责任,因为绑定器无法在此类情况下提供任何推断。换句话说,对 Serde
推断的绑定器支持、将 Serde
与提供的 bean 匹配等等仅应用于应用程序的边缘,即输入或输出绑定。可能会出现混淆,因为当您使用绑定器开发 Kafka Streams 应用程序时,您可能会认为绑定器将完全隐藏 Serde
的复杂性,这是一个错误的印象。绑定器仅在使用和生产时帮助您使用 Serde
。您的业务逻辑实现所需的任何 Serde
仍然需要由应用程序提供。
在这篇博文中,我们概述了 Spring Cloud Stream 的 Kafka Streams 绑定器如何帮助您对数据进行反序列化和序列化。绑定器可以推断输入和输出绑定中使用的键和值类型。我们看到默认情况下始终使用本机 Serde
机制,但绑定器允许您禁用此功能并根据需要委托给 Spring 的消息转换器。我们还发现,您的业务逻辑实现所需的任何 Serde
仍然需要由应用程序提供。
在下一篇博文中,我们将探讨 Kafka Streams 为消息的反序列化和生产提供的各种错误处理机制以及绑定器如何支持它们。