使用Spring Cloud Stream和Apache Kafka Streams进行流处理。第3部分 - 数据反序列化和序列化

工程 | Soby Chacko | 2019年12月4日 | ...

第1部分 - 编程模型 第2部分 - 编程模型续

在本系列关于使用 Spring Cloud Stream 和 Kafka Streams 编写流处理应用程序的前面两篇博文中,我们将探讨这些应用程序如何处理入站的反序列化和出站的序列化。

Kafka Streams 中所有三种主要的更高级别的类型——KStream<K,V>KTable<K,V>GlobalKTable<K,V>——都使用键和值。

使用 Spring Cloud Stream Kafka Streams 支持,键始终使用本机 Serde 机制进行反序列化和序列化。Serde 是一个容器对象,它提供反序列化器和序列化器。

另一方面,值是使用 Serde 或绑定器提供的消息转换进行封送处理的。从绑定器的 3.0 版本开始,使用 Serde 是默认方法。在 Spring 中使用消息转换器是一个可选功能,您只需要在特殊情况下使用它。

让我们看看这个处理器

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
  return (userClicksStream, userRegionsTable) -> (userClicksStream
        .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                    "UNKNOWN" : region, clicks),
              Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
              regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream());
}

这是我们在上一篇博文中看到的同一个处理器。它有两个输入和一个输出。第一个输入绑定是 KStream<String, Long>。键的类型为 String,值是 Long。下一个输入绑定是 KTable<String, String>。这里,键和值都是 String 类型。最后,输出绑定是 KStream<String, Long>,键为 String,值为 Long

通常,您必须告诉应用程序使用正确的 Serde 作为应用程序配置的一部分。但是,当使用 Kafka Streams 绑定器时,对于大多数标准类型,此信息都是推断出来的,您不需要提供任何特殊配置。

绑定器推断的类型是 Kafka Streams 为其提供开箱即用 Serde 实现的那些类型。这些类型是

  • 整数 (Integer)
  • 长整型 (Long)
  • 短整型 (Short)
  • 双精度浮点型 (Double)
  • 单精度浮点型 (Float)
  • 字节数组 (Byte[])
  • UUID
  • 字符串 (String)

换句话说,如果您的 KStreamKTableGlobalKTable 将这些作为键和值的类型,则您不需要提供任何特殊的 Serde 配置。

将 Serde 对象作为 Spring Bean 提供

如果类型不是来自这些类型中的任何一种,您可以提供 Serde<T> 类型的 bean,并且如果泛型类型 T 与实际类型匹配,绑定器将将其委派为 Serde

例如,假设您具有以下函数签名

@Bean
publicFunction<KStream<CustomKey, AvroIn>, KStream<CustomKey, AvroOut>> process() {

}

然后,键和值的类型与任何已知的 Serde 实现都不匹配。在这种情况下,您有两个选择。推荐的方法是提供一个 Serde bean,如下所示

@Bean
public Serde<CustomKey> customKeySerde(){ 
  	return new CustomKeySerde();
}

@Bean
public Serde<AvroIn> avroInSerde(){ 
  	final SpecificAvroSerde<AvroIn> avroInSerde = new SpecificAvroSerde<>();
avroInSerde.configure(...);
return avroInSerde;

}

@Bean
public Serde<AvroOut> avroInSerde(){ 
 	final SpecificAvroSerde<AvroOut> avroOutSerde = new SpecificAvroSerde<>();
avroOutSerde.configure(...);
return avroOutSerde;
}

通过配置提供 Serde

如果您不想以编程方式创建 Spring bean 来提供 Serde,您也可以使用配置来定义这些 bean,其中您传递 Serde 实现类的完全限定名称,如下所示

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

顺便说一句,即使您拥有匹配的 bean,这样设置 Serde 也会具有更高的优先级,因为这些配置是在实际的使用者和生产者绑定上设置的。绑定器赋予它优先级,因为用户明确请求了它。

默认 Serde 并回退到 JsonSerde

此时,如果绑定器仍然无法匹配任何 Serde,它会查找一个默认的匹配项。

如果所有方法都无法匹配一个,绑定器将回退到 Spring 为 Apache Kafka 项目提供的 JsonSerde 实现。如果您不使用上述任何机制并让绑定器回退到 JsonSerde,则必须确保这些类是 JSON 友好的。

在实际业务逻辑中使用的 Serde

Kafka Streams 有几种 API 方法需要访问 Serde 对象。例如,查看早期 BiFunction 示例处理器中的方法调用 joinedgroupBy。这实际上是应用程序开发人员的责任,因为绑定器无法在此类情况下提供任何推断。换句话说,对 Serde 推断的绑定器支持、将 Serde 与提供的 bean 匹配等等仅应用于应用程序的边缘,即输入或输出绑定。可能会出现混淆,因为当您使用绑定器开发 Kafka Streams 应用程序时,您可能会认为绑定器将完全隐藏 Serde 的复杂性,这是一个错误的印象。绑定器仅在使用和生产时帮助您使用 Serde。您的业务逻辑实现所需的任何 Serde 仍然需要由应用程序提供。

总结

在这篇博文中,我们概述了 Spring Cloud Stream 的 Kafka Streams 绑定器如何帮助您对数据进行反序列化和序列化。绑定器可以推断输入和输出绑定中使用的键和值类型。我们看到默认情况下始终使用本机 Serde 机制,但绑定器允许您禁用此功能并根据需要委托给 Spring 的消息转换器。我们还发现,您的业务逻辑实现所需的任何 Serde 仍然需要由应用程序提供。

在下一篇博文中,我们将探讨 Kafka Streams 为消息的反序列化和生产提供的各种错误处理机制以及绑定器如何支持它们。

获取 Spring Newsletter

通过 Spring Newsletter 保持联系

订阅

领先一步

VMware 提供培训和认证,以加快您的进度。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部