领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多在我们介绍了使用 Spring Cloud Stream 和 Kafka Streams 编写流应用程序的基本函数式编程模型的上一篇博客之后,在本部分中,我们将进一步探讨该编程模型。
让我们来看几个场景。
如果您的应用程序从单个输入绑定消费数据并向输出绑定生成数据,则可以使用 Java 的 Function 接口来实现。请记住,此处的绑定并不一定映射到单个输入 Kafka 主题,因为主题可以多路复用并附加到单个输入绑定(在单个绑定上配置用逗号分隔的多个主题 - 请参见下面的示例)。在输出的情况下,绑定在此处映射到单个主题。
这是一个示例处理器
请注意,实际的业务逻辑实现在此处理器中以 lambda 表达式的形式给出。
@Bean
public Function<KStream<Object, String>, KStream<String, WordCount>> wordcount() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("wordcount-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
查看处理器的返回签名。它是 Function<KStream<Object, String>, KStream<String, WordCount>>
。处理器消费一个 KStream
并生成另一个 KStream
。在后台,绑定器使用传入的 Kafka 主题来消费数据,然后将其提供给此输入 KStream
。类似地,在输出端,绑定器将数据作为 KStream
生成,该数据将发送到传出的 Kafka 主题。
您可以这样向此处理器提供输入主题
spring.cloud.stream.bindings.wordcount-in-0.destination=words
对于多路复用主题,您可以使用以下配置
spring.cloud.stream.bindings.wordcount-in-0.destination=words1,words2,word3
输出主题可以如下配置
spring.cloud.stream.bindings.wordcount-out-0.destination=counts
Kafka Streams 允许您通过使用称为分支的功能向多个主题发送数据。本质上,它使用谓词作为匹配的基础来分支到多个主题。这与上面的示例非常相似,但主要区别在于输出端提供为 KStream[]。
这是一个示例处理器
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> wordcount() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
}
请注意函数的第二个参数类型。它被提供为 KStream[]。
您可以为这些绑定提供各个输出主题
spring.cloud.stream.bindings.wordcount-out-0.destination=output1
spring.cloud.stream.bindings.wordcount-out-1.destination=output2
spring.cloud.stream.bindings.wordcount-out-2.destination=output3
当您有两个输入绑定和一个输出绑定时,您可以将处理器表示为 java.util.function.BiFunction
类型的 bean。这是一个示例
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
BiFunction
具有两个输入和一个输出。第一个输入是 KStream
,第二个是 KTable
,而输出是另一个 KStream
。如果您想在输出端有多个 KStream
,您可以将类型签名更改为 KStream[]
,然后进行必要的实现更改。
场景 4:两个输入绑定且没有输出绑定
如果您只有两个输入绑定但没有输出,则可以使用 Java 的 BiConsumer
支持。可能的用例是您不想生成输出,而是更新某些状态存储。这是一个示例
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {
userClicksStream.foreach((key, value) -> latch.countDown());
userRegionsTable.toStream().foreach((key, value) -> latch.countDown());
};
}
如果您有三个、四个或 n 个输入绑定怎么办?在这种情况下,您不能依赖 Function 或 BiFunction 方法。您需要依赖部分应用函数。基本上,您从一个 Function 开始,然后在第一个函数的输出端,提供另一个 Function 或 Consumer,直到用完您的输入。这种以这种方式部分应用函数的技术通常在函数式编程术语中称为函数柯里化。这是一个使用三个输入和一个输出的示例
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
return orderStream -> (
customers -> (
products -> (
orderStream.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
仔细检查处理器的类型签名。
Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>>
我们从一个以 KStream
为输入的函数开始,但第二个参数(此函数的输出)是另一个以 GlobalKTable
为输入的 Function
。第二个 Function 的输出是另一个函数,该函数的输入是另一个 GlobalKTable
。这个第三个函数用完了我们的输入,这个函数的输出是 KStream
,将用于输出绑定。
让我们从数学的角度来看这个模型。
让我们将这三个函数称为 f(x)
、f(y)
和 f(z)
。
如果我们展开这些函数,它将如下所示
f(x) -> f(y) -> f(z) -> KStream<Long, EnrichedOrder>
.
变量 x 代表 KStream<Long, Order>
,变量 y 代表 GlobalKTable<Long, Customer>
,变量 z 代表 GlobalKTable<Long, Product>
。
第一个函数 f(x)
具有应用程序的第一个输入绑定 (KStream<Long, Order>
),其输出是函数 f(y)
。
函数 f(y)
具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>
),其输出是另一个函数 f(z)
。
函数 f(z)
的输入是应用程序的第三个输入 (GlobalKTable<Long, Product>
),其输出是 KStream<Long, EnrichedOrder>
,这是应用程序的最终输出绑定。
来自三个部分函数(分别为 KStream
、GlobalKTable
、GlobalKTable
)的输入在方法体中可用,用于实现业务逻辑作为 lambda 表达式的一部分。
请记住,如上所述,在 Java 中使用函数柯里化处理超过合理数量的输入(例如上面的示例中的三个输入)可能会导致代码可读性问题。因此,您必须仔细评估和分解您的应用程序,以查看在单个处理器中具有大量输入绑定的适当性。
在这篇博文中,我们快速浏览了您可以在基于 Spring Cloud Stream 的 Kafka Streams 应用程序中使用的各种函数式编程模型。我们看到了如何使用 java.util.function.Function
(或如我们在上一篇博客中看到的 Consumer
)、java.util.function.BiFunction
和 BiConsumer
。我们还看到了如何通过使用 Kafka Stream 的分支功能(提供一个 KStream
数组作为输出)在输出端支持多个绑定。最后,我们看到了如何通过部分应用(柯里化)函数支持两个以上输入绑定。