领先一步
VMware 提供培训和认证,助您加速进步。
了解更多这是我们一系列博客文章的第一篇,我们将在此探讨如何使用 Spring Cloud Stream 和 Kafka Streams 编写流处理应用程序。
Spring Cloud Stream Horsham 版本 (3.0.0) 引入了多项变更,以改进应用程序利用 Apache Kafka 的方式,通过 Kafka 和 Kafka Streams 绑定器实现。此版本的主要增强功能之一是支持使用功能性编程范式来编写应用程序。这篇博文将介绍如何使用 Spring Cloud Stream 和 Kafka Streams 的这种函数式编程模型来开发流处理应用程序。在本系列的后续博文中,我们将深入探讨更多细节。
这是一个常常令人困惑的问题:如果我想基于 Apache Kafka 编写应用程序,应该使用哪种绑定器?Spring Cloud Stream 提供了两个独立的 Kafka 绑定器:spring-cloud-stream-binder-kafka 和 spring-cloud-stream-binder-kafka-streams。顾名思义,第一个绑定器适用于您想编写标准事件驱动型应用程序,并使用普通 Kafka 生产者和消费者的情况。另一方面,如果您想使用 Kafka Streams 库开发流处理应用程序,则应使用第二个绑定器。在此博文中,我们将再次专注于第二个 Kafka Streams 绑定器。
关于本系列博客的一般说明。此系列主要关注 Spring Cloud Stream 和 Kafka Streams 之间的交互点,而不深入探讨 Kafka Streams 本身。要编写使用 Kafka Streams 的非平凡流处理应用程序,强烈建议深入理解 Kafka Streams 库。本系列仅停留在实际 Kafka Streams 库的外围,主要关注如何从 Spring Cloud Stream 的角度与它进行交互。
本质上,所有 Spring Cloud Stream 应用程序都是 Spring Boot 应用程序。要引导一个新项目,请访问 Spring Initializr,然后创建一个新项目。选择 “Cloud Stream” 和 “Spring for Apache Kafka Streams” 作为依赖项。这将生成一个包含您开始开发应用程序所需的所有组件的项目。这是 Initializr 的一个屏幕截图,其中选择了基本依赖项。

这是一个非常基础但功能齐全的 Kafka Streams 应用程序,它使用 Spring Cloud Stream 的函数式编程支持编写而成。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
如您所见,这是一个非常简单的应用程序,它只向标准输出打印内容,但它是一个功能齐全的 Kafka Streams 应用程序。在外层,我们使用 @SpringBootApplication 注解来表明这是一个 Spring Boot 应用程序。然后,我们提供一个 java.util.function.Consumer Bean,通过 lambda 表达式封装应用程序的逻辑。该 Consumer 以 KStream 作为输入,其中键和值都表示为 String 类型。
就是这样。您可以针对 Kafka Broker 运行此应用程序,并查看其运行情况。在后台,Spring Cloud Stream 的 Kafka Streams 绑定器会将其转换为一个标准的 Kafka Streams 应用程序,包含 StreamsBuilder、Kafka Streams 拓扑等。Spring Cloud Stream 的主要宗旨之一就是隐藏复杂性和样板代码,让应用程序开发人员能够专注于手头的业务问题。绑定器将负责创建 Kafka Streams 拓扑、连接到 Kafka 集群、绑定到主题以及从该 Kafka 主题中消费数据,在本例中,它被绑定为 KStream。通常,如果应用程序开发人员不使用 Spring Cloud Stream 这样的框架,他们需要负责完成所有这些工作。
如果您了解 Kafka Streams 的内部机制,您可能会怀疑上面展示的内容是否有效。我们没有提供 Kafka Streams 所需的一些基本内容(例如集群信息、应用程序 ID、要消费的主题、要使用的 Serdes 等)。简而言之,在不提供任何配置属性的情况下,这确实能够正常工作。这是因为绑定器将使用许多合理的默认值,并自行决定要从哪些主题进行消费等。尽管如此,对于生产环境,我们建议在绑定器的默认值不适用时提供所有适用的属性。
让我们看一下 Kafka Streams 所需的一些基本内容,以及绑定器如何为它们提供默认值。
默认情况下,绑定器将尝试连接到运行在 localhost:9092 的集群。如果不是这样,您可以通过 Spring Cloud Stream 可用的配置属性进行覆盖。请参阅 Spring Cloud Stream 参考指南。
在 Kafka Streams 应用程序中,application.id 是一个必需字段。没有它,您就无法启动 Kafka Streams 应用程序。默认情况下,绑定器将生成一个应用程序 ID 并将其分配给处理器。它使用函数 Bean 名称作为前缀。例如,如果您有如上的 Consumer,绑定器将生成应用程序 ID 为 process-applicationId。您可以使用此处概述的策略来覆盖此设置。请参阅 Spring Cloud Stream 参考指南。
对于上面的处理器,您可以如下提供要从中消费的主题:
spring.cloud.stream.bindings.process-in-0.destination: my-input-topic
在这种情况下,我们指定对于函数 Bean(process)及其第一个输入(in-0),它将绑定到一个名为 my-input-topic 的 Kafka 主题。如果您不提供像这样显式的目标,绑定器将假定您正在使用与绑定名称(在本例中为 process-in-0)相同的主题。
Kafka Streams 使用一种称为 Serde 的特殊类来处理数据编组。它本质上是入站反序列化器和出站序列化器的包装器。通常,您必须告诉 Kafka Streams 为每个 Consumer 使用哪个 Serde。然而,绑定器通过使用 Kafka Streams 中提供的参数化类型来推断此信息。例如,对于 KStream<String, String>,绑定器会假定它需要使用 String 反序列化器。一如既往,您可以通过多种方式覆盖这些设置。
是的,您可以。Spring Cloud Stream 的 Kafka Streams 绑定器可以方便地在一个应用程序中提供多个处理器,这些处理器表示为 java.util.function.Function 或 java.util.function.Consumer Bean。绑定器会将每个处理器隔离到自己的应用程序 ID 和 StreamsBuilder 中。它确保它们之间不会发生任何干扰。从 Kafka Streams 的角度来看,它们是具有各自专用拓扑的多个处理器。虽然这在进行测试和快速尝试某些功能时是一个合法的用例,但在单个应用程序中拥有多个处理器可能会使其成为一个难以维护的整体。
在这篇博文中,我们简要介绍了 Spring Cloud Stream 的函数式编程支持如何用于编写使用 Kafka Streams 的流处理应用程序。我们看到绑定器处理了大量的基础设施和配置细节,让您可以专注于手头的业务逻辑。在下一篇博文中,我们将进一步探讨这种编程模型,看看如何使用 Spring Cloud Stream 和 Kafka Streams 开发更多非平凡的流处理应用程序。