使用 Spring Cloud Stream 和 Apache Kafka Streams 进行流处理。第二部分 - 编程模型续篇

工程 | Soby Chacko | 2019年12月03日 | ...

紧随上一篇博客的脚步,我们在其中介绍了使用 Spring Cloud Stream 和 Kafka Streams 编写流应用程序的基本函数式编程模型,在这一部分,我们将进一步探索该编程模型。

让我们看几个场景。

场景 1:单个输入绑定和单个输出绑定

如果您的应用程序从单个输入绑定消费数据并将其生产到输出绑定,则可以使用 Java 的 Function 接口来实现。请记住,这里的绑定不一定映射到单个输入 Kafka 主题,因为主题可以多路复用并连接到单个输入绑定(在单个绑定上配置逗号分隔的多个主题 - 请参阅下面的示例)。对于出站情况,绑定此处映射到一个主题。

这是一个处理器示例

请注意,此处理器中的实际业务逻辑实现以 lambda 表达式的形式给出。

@Bean
public Function<KStream<Object, String>, KStream<String, WordCount>> wordcount() {

  return input -> input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(5000))
        .count(Materialized.as("wordcount-store"))
        .toStream()
        .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
              new Date(key.window().start()), new Date(key.window().end()))));
}

查看处理器的返回签名。它是一个 Function<KStream<Object, String>, KStream<String, WordCount>>。该处理器消费一个 KStream 并生成另一个 KStream。在底层,绑定器使用传入的 Kafka 主题来消费数据,然后将其提供给此输入 KStream。类似地,在出站端,绑定器将数据作为 KStream 生成,这些数据将被发送到传出的 Kafka 主题。

以下是如何为该处理器提供输入主题

spring.cloud.stream.bindings.wordcount-in-0.destination=words

对于多路复用主题的情况,您可以使用此配置

spring.cloud.stream.bindings.wordcount-in-0.destination=words1,words2,word3

输出主题可以配置如下

spring.cloud.stream.bindings.wordcount-out-0.destination=counts

场景 2:通过 Kafka Streams 分支实现多个输出绑定

Kafka Streams 允许您使用称为分支(branching)的功能将数据发送到出站端的多个主题。本质上,它使用谓词(predicate)作为分支到多个主题的基础。这与上面的示例基本相同,但主要区别在于出站端以 KStream[] 的形式提供。

这是一个处理器示例

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>[]> wordcount() {

     Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
     Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
     Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

     return input -> input
           .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
           .groupBy((key, value) -> value)
           .windowedBy(TimeWindows.of(5000))
           .count(Materialized.as("WordCounts-branch"))
           .toStream()
           .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                 new Date(key.window().start()), new Date(key.window().end()))))
           .branch(isEnglish, isFrench, isSpanish);
  }
}

请注意函数的第二个参数类型。它以 KStream[] 的形式提供。

您可以为这些绑定提供单独的输出主题

spring.cloud.stream.bindings.wordcount-out-0.destination=output1
spring.cloud.stream.bindings.wordcount-out-1.destination=output2
spring.cloud.stream.bindings.wordcount-out-2.destination=output3

场景 3:两个输入绑定和一个输出绑定。

当您有两个输入绑定和一个输出绑定时,可以将处理器表示为 java.util.function.BiFunction 类型的 bean。这是一个示例

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
  return (userClicksStream, userRegionsTable) -> (userClicksStream
        .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                    "UNKNOWN" : region, clicks),
              Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
              regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream());
}

BiFunction 有两个输入和一个输出。第一个输入是 KStream,第二个输入是 KTable,而输出是另一个 KStream。如果您想在出站端拥有多个 KStream,可以将类型签名更改为 KStream[] ,然后进行必要的实现更改。

场景 4:两个输入绑定且没有输出绑定

如果您只有两个输入绑定但没有输出,则可以使用 Java 的 BiConsumer 支持。可能的用例是您不想生成输出,而是更新一些状态存储。这是一个示例

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
  return (userClicksStream, userRegionsTable) -> {
     userClicksStream.foreach((key, value) -> latch.countDown());
     userRegionsTable.toStream().foreach((key, value) -> latch.countDown());
  };
}

场景 5:多于两个输入绑定。

如果您有三个、四个或 n 个输入绑定怎么办?在这种情况下,您不能依赖 Function 或 BiFunction 方法。您需要依赖部分应用的函数(partially applied functions)。基本上,您从一个 Function 开始,然后在此第一个函数的出站端,您提供另一个 Function 或 Consumer,直到您用完输入。这种部分应用函数的技术在函数式编程术语中通常称为函数柯里化(function currying)。这是一个使用三个输入和一个输出的示例

@Bean
public Function<KStream<Long, Order>,
     Function<GlobalKTable<Long, Customer>,
           Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {

  return orderStream -> (
        customers -> (
              products -> (
                    orderStream.join(customers,
                          (orderId, order) -> order.getCustomerId(),
                          (order, customer) -> new CustomerOrder(customer, order))
                          .join(products,
                                (orderId, customerOrder) -> customerOrder
                                      .productId(),
                                (customerOrder, product) -> {
                                   EnrichedOrder enrichedOrder = new EnrichedOrder();
                                   enrichedOrder.setProduct(product);
                                   enrichedOrder.setCustomer(customerOrder.customer);
                                   enrichedOrder.setOrder(customerOrder.order);
                                   return enrichedOrder;
                                })
              )
        )
  );
}

仔细检查处理器的类型签名。

Function<KStream<Long, Order>,
     Function<GlobalKTable<Long, Customer>,
           Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>>

我们从一个将 KStream 作为输入的函数开始,但第二个参数(此函数的输出)是另一个以 GlobalKTable 作为输入的 Function。这个第二个 Function 的输出是另一个函数,该函数以另一个 GlobalKTable 作为输入。这个第三个函数用完了我们的输入,并且该函数的输出是一个 KStream,它将用于输出绑定。

让我们从数学角度看这个模型。

我们将这三个函数称为 f(x)f(y)f(z)

如果我们展开这些函数,它将看起来像这样

f(x) -> f(y) -> f(z) -> KStream<Long, EnrichedOrder>.

x 变量代表 KStream<Long, Order>y 变量代表 GlobalKTable<Long, Customer>z 变量代表 GlobalKTable<Long, Product>

第一个函数 f(x) 对应应用程序的第一个输入绑定(KStream<Long, Order>),其输出是函数 f(y)

函数 f(y) 对应应用程序的第二个输入绑定(GlobalKTable<Long, Customer>),其输出是另一个函数 f(z)

函数 f(z) 的输入是应用程序的第三个输入(GlobalKTable<Long, Product>),其输出是 KStream<Long, EnrichedOrder>,这是应用程序的最终输出绑定。

来自三个部分函数(分别是 KStreamGlobalKTableGlobalKTable)的输入在方法体内可用,用于实现作为 lambda 表达式一部分的业务逻辑。

请记住,如上所述在 Java 中使用函数柯里化处理超过合理数量的输入(例如上面示例中的三个)可能会导致代码可读性问题。因此,您必须仔细评估和分解您的应用程序,以确定在单个处理器中拥有更多输入绑定的适当性。

总结

在这篇博客文章中,我们快速回顾了可以在基于 Spring Cloud Stream 的 Kafka Streams 应用程序中使用的各种函数式编程模型。我们看到了如何使用 java.util.function.Function(或在上一篇博客中看到的 Consumer)、java.util.function.BiFunctionBiConsumer. 我们还看到了如何通过使用 Kafka Stream 的分支功能(它提供一个 KStream 数组作为输出)来支持出站端的多个绑定。最后,我们看到了如何通过部分应用(柯里化)函数来支持多于两个的输入绑定。在下一篇博客文章中,我们将了解 Kafka Streams 绑定器如何执行数据反序列化和序列化。

订阅 Spring 邮件列表

订阅 Spring 邮件列表,保持连接

订阅

先行一步

VMware 提供培训和认证,助力您的飞速发展。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅即可为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部