使用 Spring Cloud Stream 和 Apache Kafka Streams 进行流处理。第 3 部分 - 数据反序列化和序列化

工程 | Soby Chacko | 2019 年 12 月 4 日 | ...

第 1 部分 - 编程模型 第 2 部分 - 编程模型续

继前两篇博客文章之后,在本系列关于使用 Spring Cloud Stream 和 Kafka Streams 编写流处理应用程序的文章中,我们现在将深入探讨这些应用程序如何处理入站数据反序列化和出站数据序列化的细节。

Kafka Streams 中的所有三种主要高级类型 - KStream<K,V>KTable<K,V>GlobalKTable<K,V> - 都使用键和值进行处理。

借助 Spring Cloud Stream Kafka Streams 支持,键始终使用原生 Serde 机制进行反序列化和序列化。Serde 是一个容器对象,它提供一个反序列化器和一个序列化器。

另一方面,值则使用 Serde 或绑定器提供的消息转换进行封送处理。从绑定器 3.0 版本开始,使用 Serde 是默认方法。在 Spring 中使用消息转换器是一项可选功能,仅在特殊情况下才需要使用。

让我们看一下这个处理器

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
  return (userClicksStream, userRegionsTable) -> (userClicksStream
        .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                    "UNKNOWN" : region, clicks),
              Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
              regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream());
}

这是我们在上一篇博客中看到的同一个处理器。它有两个输入和一个输出。第一个输入绑定是 KStream<String, Long>。键类型是 String,值类型是 Long。下一个输入绑定是 KTable<String, String>。这里,键和值都是 String 类型。最后,输出绑定是 KStream<String, Long>,键是 String,值是 Long

通常,您必须告诉应用程序在应用程序配置中要使用正确的 Serde。但是,使用 Kafka Streams 绑定器时,对于大多数标准类型,此信息会被推断出来,您无需提供任何特殊配置。

绑定器推断的类型是 Kafka Streams 提供开箱即用 Serde 实现的类型。这些类型是

  • Integer
  • Long
  • Short
  • Double
  • Float
  • Byte[]
  • UUID
  • String

换句话说,如果您的 KStreamKTableGlobalKTable 的键和值类型是这些类型之一,您无需提供任何特殊的 Serde 配置。

提供 Serde 对象作为 Spring Beans

如果类型不属于这些类型之一,您可以提供一个类型为 Serde<T> 的 bean,并且如果泛型类型 T 与实际类型匹配,绑定器将把该 bean 作为 Serde 进行委托。

例如,假设您有以下函数签名

@Bean
publicFunction<KStream<CustomKey, AvroIn>, KStream<CustomKey, AvroOut>> process() {

}

那么,键和值的类型与任何已知的 Serde 实现都不匹配。在这种情况下,您有两种选择。推荐的方法是提供一个 Serde bean,如下所示

@Bean
public Serde<CustomKey> customKeySerde(){ 
  	return new CustomKeySerde();
}

@Bean
public Serde<AvroIn> avroInSerde(){ 
  	final SpecificAvroSerde<AvroIn> avroInSerde = new SpecificAvroSerde<>();
avroInSerde.configure(...);
return avroInSerde;

}

@Bean
public Serde<AvroOut> avroInSerde(){ 
 	final SpecificAvroSerde<AvroOut> avroOutSerde = new SpecificAvroSerde<>();
avroOutSerde.configure(...);
return avroOutSerde;
}

通过配置提供 Serde

如果您不想将 Serde 作为通过编程创建的 Spring beans 提供,您也可以使用配置来定义它们,在配置中传递 Serde 实现类的完全限定名,如下所示

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

顺便说一下,即使您有匹配的 beans,像这样设置 Serde 也将具有更高的优先级,因为这些配置是设置在实际的消费者和生产者绑定上的。绑定器赋予它优先权是因为用户显式请求了它。

默认 Serde 并回退到 JsonSerde

此时,如果绑定器仍然无法匹配任何 Serde,它会寻找一个默认的来匹配。

如果所有方法都无法匹配,绑定器将回退到 Spring for Apache Kafka 项目提供的 JsonSerde 实现。如果您不使用上述任何机制并让绑定器回退到 JsonSerde,则必须确保类是 JSON 友好的。

在实际业务逻辑内部使用的 Serde

Kafka Streams 有几个 API 方法需要访问 Serde 对象。例如,看看前面 BiFunction 示例处理器中的 joinedgroupBy 方法调用。这实际上是应用程序开发者的责任,因为绑定器在这种情况下无法提供任何推断帮助。换句话说,绑定器对 Serde 推断、将 Serde 与提供的 bean 匹配等支持仅应用于应用程序的边缘,即输入或输出绑定处。可能会产生混淆,因为当您使用绑定器开发 Kafka Streams 应用程序时,您可能认为绑定器会完全隐藏 Serde 的复杂性,但这是一种错误的印象。绑定器仅在消费和生产时为您提供 Serde 方面的帮助。您的业务逻辑实现所需的任何 Serde 仍然需要由应用程序提供。

总结

在这篇博客文章中,我们概述了 Spring Cloud Stream 的 Kafka Streams 绑定器如何帮助您进行数据反序列化和序列化。绑定器可以推断输入和输出绑定上使用的键和值类型。我们看到默认始终使用原生 Serde 机制,但绑定器提供了禁用此功能并将任务委托给 Spring 的消息转换器的选项(如果需要)。我们还了解到,您的业务逻辑实现所需的任何 Serde 仍然需要由应用程序提供。

在下一篇博客文章中,我们将探讨 Kafka Streams 提供的用于消息反序列化和生产的各种错误处理机制以及绑定器如何支持它们。

订阅 Spring 邮件列表

订阅 Spring 邮件列表,保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举办的活动

查看 Spring 社区的所有即将举办的活动。

查看全部