领先一步
VMware 提供培训和认证,以加快您的进度。
了解更多这是博客文章系列的第一篇,我们将探讨如何使用Spring Cloud Stream和Kafka Streams编写流处理应用程序。
Spring Cloud Stream Horsham 版本 (3.0.0) 对应用程序如何使用Kafka和Kafka Streams绑定器利用Apache Kafka进行了多项更改。此版本带来的主要改进之一是通过使用完全函数式编程范式编写应用程序的一流支持。本博文介绍了如何使用这种函数式编程模型来开发使用Spring Cloud Stream和Kafka Streams的流处理应用程序。在本系列后续的博客文章中,我们将更详细地探讨。
这是一个经常令人困惑的问题:如果我想编写基于Apache Kafka的应用程序,我应该使用哪个绑定器?Spring Cloud Stream为Kafka提供了两个独立的绑定器——spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams。顾名思义,如果您想编写想要使用普通Kafka生产者和消费者的标准事件驱动应用程序,则可以使用第一个绑定器。另一方面,如果您想使用Kafka Streams库开发流处理应用程序,请使用第二个绑定器。同样,在本博文中,我们将重点关注Kafka Streams的第二个绑定器。
关于此博客系列的一般说明。这主要关注Spring Cloud Stream和Kafka Streams之间的接触点,并没有深入探讨Kafka Streams本身的细节。为了编写使用Kafka Streams的非平凡流处理应用程序,强烈建议深入了解Kafka Streams库。本系列仅停留在实际Kafka Streams库的边缘,主要关注如何从Spring Cloud Stream的角度与之交互。
从本质上讲,所有Spring Cloud Stream应用程序都是Spring Boot应用程序。为了启动一个新项目,请访问Spring Initializr,然后创建一个新项目。选择“Cloud Stream”和“Spring for Apache Kafka Streams”作为依赖项。这将生成一个包含开始开发应用程序所需所有组件的项目。以下是Initializr的屏幕截图,其中选择了基本依赖项。
这是一个非常基本但功能齐全的Kafka Streams应用程序,它是使用Spring Cloud Stream的功能性编程支持编写的
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
如您所见,这是一个非常简单的应用程序,它只是打印到标准输出,但它仍然是一个完整的Kafka Streams应用程序。在外部层,我们使用@SpringBootApplication
注解表明这是一个引导应用程序。然后,我们提供一个java.util.function.Consumer
bean,我们在其中通过lambda表达式封装应用程序的逻辑。消费者以KStream作为输入,键和值都表示为字符串类型。
就是这样。您可以针对Kafka代理运行此应用程序并查看其运行情况。在幕后,Spring Cloud Stream的Kafka Streams绑定器会将其转换为一个具有StreamsBuilder
、Kafka Streams拓扑等的适当Kafka Streams应用程序。Spring Cloud Stream的主要原则之一是向用户隐藏复杂性和样板代码,以便应用程序开发人员可以专注于手头的业务问题。绑定器将负责创建Kafka Streams拓扑,连接到Kafka集群,绑定到主题并从该Kafka主题(在本例中绑定为KStream)消费数据。通常,如果他们不使用Spring Cloud Stream等框架,则应用程序开发人员有责任执行所有这些操作。
如果您了解Kafka Streams内部机制,您可能想知道上面介绍的内容是否有效。我们没有提供Kafka Streams所需的一些基本内容(例如集群信息、应用程序ID、要消费的主题、要使用的Serdes等)。简短的答案是,这将在无需提供任何配置属性的情况下工作。这是因为绑定器将使用许多合理的默认值并就从哪些主题消费数据等方面提出意见。但是,对于生产环境使用,如果绑定器使用的默认值不合理,我们建议提供所有适用的属性。
让我们来看一下Kafka Streams需要的一些基本内容以及绑定器如何为它们提供默认值。
默认情况下,绑定器将尝试连接到在localhost:9092上运行的集群。如果不是这种情况,您可以使用Spring Cloud Stream提供的配置属性来覆盖它。请参阅Spring Cloud Stream参考指南。
在Kafka Streams应用程序中,application.id是必填字段。没有它,您将无法启动Kafka Streams应用程序。默认情况下,绑定器将生成一个应用程序ID并将其分配给处理器。它使用函数bean名称作为前缀。例如,如果您有如上的消费者,绑定器将生成应用程序ID为process-applicationId。您可以使用此处概述的策略来覆盖此设置。请参阅Spring Cloud Stream参考指南。
对于上述处理器,您可以按如下方式提供要消费的主题:
spring.cloud.stream.bindings.process-in-0.destination: my-input-topic
在本例中,我们指的是,对于函数 bean(process)及其第一个输入 (in-0),它应该绑定到名为my-input-topic的 Kafka 主题。如果您没有提供这样的显式目标,绑定器将假设您使用的是与绑定名称相同的主题(在本例中为process-in-0)。
Kafka Streams 使用一个名为 Serde 的特殊类来处理数据编组。它本质上是对入站的反序列化器和出站的序列化器的封装。通常,您必须告诉 Kafka Streams 对每个消费者使用什么 Serde。但是,Binder 通过使用作为 Kafka Streams 部分提供的参数类型来推断此信息。例如,在KStream<String, String> 的情况下,绑定器假设它需要使用 String 反序列化器。与往常一样,您可以通过多种方式覆盖这些设置。
可以。Spring Cloud Stream Kafka Streams 绑定器可以轻松地提供多个处理器,这些处理器在单个应用程序中表示为java.util.function.Function或java.util.function.Consumer bean。绑定器会将每个这样的处理器隔离到其自己的应用程序 ID 和 StreamsBuilder。它确保它们之间不会相互干扰。从 Kafka Streams 的角度来看,它们是具有自己专用拓扑的多个处理器。虽然在测试和快速尝试某些功能时,这是一个合理的用例,但在单个应用程序中拥有多个处理器可能会导致其成为难以维护的单体应用。
在这篇博文中,我们快速介绍了如何使用 Spring Cloud Stream 的函数式编程支持来编写使用 Kafka Streams 的流处理应用程序。我们看到绑定器负责许多基础设施和配置细节,这使您可以专注于手头的业务逻辑。在下一篇文章中,我们将进一步探讨这种编程模型,以了解如何使用 Spring Cloud Stream 和 Kafka Streams 开发更复杂的流处理应用程序。