先行一步
VMware 提供培训和认证,助力您的飞速发展。
了解更多紧随上一篇博客的脚步,我们在其中介绍了使用 Spring Cloud Stream 和 Kafka Streams 编写流应用程序的基本函数式编程模型,在这一部分,我们将进一步探索该编程模型。
让我们看几个场景。
如果您的应用程序从单个输入绑定消费数据并将其生产到输出绑定,则可以使用 Java 的 Function 接口来实现。请记住,这里的绑定不一定映射到单个输入 Kafka 主题,因为主题可以多路复用并连接到单个输入绑定(在单个绑定上配置逗号分隔的多个主题 - 请参阅下面的示例)。对于出站情况,绑定此处映射到一个主题。
这是一个处理器示例
请注意,此处理器中的实际业务逻辑实现以 lambda 表达式的形式给出。
@Bean
public Function<KStream<Object, String>, KStream<String, WordCount>> wordcount() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("wordcount-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
查看处理器的返回签名。它是一个 Function<KStream<Object, String>, KStream<String, WordCount>>
。该处理器消费一个 KStream
并生成另一个 KStream
。在底层,绑定器使用传入的 Kafka 主题来消费数据,然后将其提供给此输入 KStream
。类似地,在出站端,绑定器将数据作为 KStream
生成,这些数据将被发送到传出的 Kafka 主题。
以下是如何为该处理器提供输入主题
spring.cloud.stream.bindings.wordcount-in-0.destination=words
对于多路复用主题的情况,您可以使用此配置
spring.cloud.stream.bindings.wordcount-in-0.destination=words1,words2,word3
输出主题可以配置如下
spring.cloud.stream.bindings.wordcount-out-0.destination=counts
Kafka Streams 允许您使用称为分支(branching)的功能将数据发送到出站端的多个主题。本质上,它使用谓词(predicate)作为分支到多个主题的基础。这与上面的示例基本相同,但主要区别在于出站端以 KStream[] 的形式提供。
这是一个处理器示例
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> wordcount() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
}
请注意函数的第二个参数类型。它以 KStream[] 的形式提供。
您可以为这些绑定提供单独的输出主题
spring.cloud.stream.bindings.wordcount-out-0.destination=output1
spring.cloud.stream.bindings.wordcount-out-1.destination=output2
spring.cloud.stream.bindings.wordcount-out-2.destination=output3
当您有两个输入绑定和一个输出绑定时,可以将处理器表示为 java.util.function.BiFunction
类型的 bean。这是一个示例
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
BiFunction
有两个输入和一个输出。第一个输入是 KStream
,第二个输入是 KTable
,而输出是另一个 KStream
。如果您想在出站端拥有多个 KStream
,可以将类型签名更改为 KStream[]
,然后进行必要的实现更改。
场景 4:两个输入绑定且没有输出绑定
如果您只有两个输入绑定但没有输出,则可以使用 Java 的 BiConsumer
支持。可能的用例是您不想生成输出,而是更新一些状态存储。这是一个示例
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {
userClicksStream.foreach((key, value) -> latch.countDown());
userRegionsTable.toStream().foreach((key, value) -> latch.countDown());
};
}
如果您有三个、四个或 n 个输入绑定怎么办?在这种情况下,您不能依赖 Function 或 BiFunction 方法。您需要依赖部分应用的函数(partially applied functions)。基本上,您从一个 Function 开始,然后在此第一个函数的出站端,您提供另一个 Function 或 Consumer,直到您用完输入。这种部分应用函数的技术在函数式编程术语中通常称为函数柯里化(function currying)。这是一个使用三个输入和一个输出的示例
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
return orderStream -> (
customers -> (
products -> (
orderStream.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
仔细检查处理器的类型签名。
Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>>
我们从一个将 KStream
作为输入的函数开始,但第二个参数(此函数的输出)是另一个以 GlobalKTable
作为输入的 Function
。这个第二个 Function 的输出是另一个函数,该函数以另一个 GlobalKTable
作为输入。这个第三个函数用完了我们的输入,并且该函数的输出是一个 KStream
,它将用于输出绑定。
让我们从数学角度看这个模型。
我们将这三个函数称为 f(x)
、f(y)
和 f(z)
。
如果我们展开这些函数,它将看起来像这样
f(x) -> f(y) -> f(z) -> KStream<Long, EnrichedOrder>
.
x 变量代表 KStream<Long, Order>
,y 变量代表 GlobalKTable<Long, Customer>
,z 变量代表 GlobalKTable<Long, Product>
。
第一个函数 f(x)
对应应用程序的第一个输入绑定(KStream<Long, Order>
),其输出是函数 f(y)
。
函数 f(y)
对应应用程序的第二个输入绑定(GlobalKTable<Long, Customer>
),其输出是另一个函数 f(z)
。
函数 f(z)
的输入是应用程序的第三个输入(GlobalKTable<Long, Product>
),其输出是 KStream<Long, EnrichedOrder>
,这是应用程序的最终输出绑定。
来自三个部分函数(分别是 KStream
、GlobalKTable
、GlobalKTable
)的输入在方法体内可用,用于实现作为 lambda 表达式一部分的业务逻辑。
请记住,如上所述在 Java 中使用函数柯里化处理超过合理数量的输入(例如上面示例中的三个)可能会导致代码可读性问题。因此,您必须仔细评估和分解您的应用程序,以确定在单个处理器中拥有更多输入绑定的适当性。
在这篇博客文章中,我们快速回顾了可以在基于 Spring Cloud Stream 的 Kafka Streams 应用程序中使用的各种函数式编程模型。我们看到了如何使用 java.util.function.Function
(或在上一篇博客中看到的 Consumer
)、java.util.function.BiFunction
和 BiConsumer.
我们还看到了如何通过使用 Kafka Stream 的分支功能(它提供一个 KStream
数组作为输出)来支持出站端的多个绑定。最后,我们看到了如何通过部分应用(柯里化)函数来支持多于两个的输入绑定。在下一篇博客文章中,我们将了解 Kafka Streams 绑定器如何执行数据反序列化和序列化。