抢先一步
VMware 提供培训和认证,以加速您的进步。
了解更多这是博客系列的第 3 部分,我们将在其中介绍 Spring Cloud Stream 应用程序的 Java 函数。
系列中的其他部分。
在本系列的前两个博客中,我们提供了 一般介绍 关于将所有现有的 Spring Cloud Stream 应用启动器 迁移到函数以及我们可以 组合它们 的各种方法。在本篇博文中,我们将继续该系列,展示如何开发、测试这些函数以及如何使用它们生成 Spring Cloud Stream 应用程序。特别是,这里我们将重点介绍如何编写一个供应商函数(实现 java.util.function.Supplier
),然后为 Spring Cloud Stream 生成相应的源应用程序。
为了更好地理解这个概念,我们将以一个用例为例并实现一个解决方案来满足它。
我们需要一个函数,当以正确的配置调用时,它可以以 atom、rss 等格式提供博客 Feed 的内容。我们需要支持两种供应商调用模型 - 一种是通过编程方式调用函数(例如,在 FaaS 环境中调用的 REST 端点),另一种是流式供应商,在该供应商中,一旦 Feed 可用,我们就会获得持续的 Feed 流。我们希望基于 ROME 库(一个流行的 Feed 聚合库)构建这些供应商。我们将从非 Spring 开发人员和 Spring 开发人员的角度来看待这一点。
让我们假设你并非 Spring 开发人员,也不熟悉 Spring Integration(它已经为 ROME 提供了抽象)。在这种情况下,我们当然可以直接使用 ROME 来生成 Feed 记录。例如,这是此场景下有效的 Supplier。
public Supplier<SyndEntry> feedSupplier()
{
return () -> {
//Use the ROME framework directly to produce syndicated entries.
}
}
这里的优势在于,我们可以开发 Supplier 而无需任何 Spring 知识,并且可以使用该环境提供的抽象或依赖于 Spring Cloud Function 等框架,将其直接部署到无服务器环境中。
这本质上意味着,如果您是 Java 开发人员,但 Spring 框架技能不多,您仍然可以使用 java.util.function
包中定义的接口(如 Function
、Supplier
和 Consumer
)编写函数,并提供业务逻辑。然后,我们可以获取我们开发的此工件,并通过添加 Spring Cloud Stream 绑定器依赖项并将其转换为 SpringBootApplication
来将其转换为 Spring Cloud Stream 应用程序。通过提供一些配置属性(如中间件目标),我们可以在像 Spring Cloud Data Flow 这样的平台上部署应用程序(该平台将应用程序作为数据管道的一部分进行编排),从而获得即时的增值。这样,我们就可以完全独立于任何 Spring 依赖项编写函数,并在部署过程的最后阶段引入 Spring 组件(如 Spring Cloud Stream、Spring Cloud Function 和 Spring Cloud Data Flow 等)。下图捕捉了这个想法。
我们可以观察到,函数组件可以独立调用,也可以在将其转换为 Spring Cloud Stream 应用程序后作为 Spring Cloud Data Flow 管道的一部分进行调用。
虽然上述模型可能是一个良好的起点,但当我们开始深入直接使用 ROME 框架时,我们可能会很快意识到它涉及大量繁重的工作和更深入的库知识。出错的可能性很高,因此我们需要编写大量测试以验证我们的自定义实现是否按预期工作,以及所有异常情况是否都已涵盖。我们开始想知道是否已经有更简单的抽象可用。这样,我们就不需要编写任何 ROME 特定的代码,因为抽象层会处理所有复杂性。幸运的是,我们有一个解决方案。Spring Integration 为许多企业技术提供了许多入站和出站适配器。Feed 适配器 就是其中之一,其实现基于 ROME。事实上,我们在预打包的 stream-applications 中提供的许多函数组件都基于 Spring Integration 适配器。这些适配器已被广泛使用并经过大量测试,适用于大量企业用例。但是,我们可能想要为其编写 Supplier 的技术在 Spring Integration 中不可用。在这种情况下,正如我们上面所看到的,我们当然可以自己编写代码并从 Supplier 中调用它。
如果您尚未这样做,请分叉并克隆 stream applications 存储库。然后在 functions/supplier
下创建一个新的 feed-supplier
模块。使用现有的 Supplier 之一作为模板进行指导。
在项目中添加以下 Spring Integration Feed 适配器依赖项。这将引入来自 Spring Integration 的 Feed 适配器以及任何其他传递依赖项。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
现在我们有了核心依赖项,让我们开始编写一些代码。由于期望在 Spring Boot 上下文中使用函数,因此我们需要创建一个 ConfigurationProperties
类来驱动 Supplier 函数的配置。以下是它的样子。
package org.springframework.cloud.fn.supplier.feed;
@ConfigurationProperties("feed.supplier")
public class FeedSupplierProperties {
/**
* Key used in metadata store to avoid duplicate read from the feed
*/
private String metadataKey;
/**
* Feed url.
*/
private URL feedUrl;
// rest is omitted
}
我们可以看到,我们在所有属性上都使用了 feed.supplier
前缀。
接下来,让我们创建一个基于 Spring 的配置类,在其中提供所有必要的组件。我们将逐步构建它。以下是该类的基本结构。
package org.springframework.cloud.fn.supplier.feed;
...
@Configuration
@EnableConfigurationProperties(FeedSupplierProperties.class)
public class FeedSupplierConfiguration {
}
将这些字段添加到类中。
private final ConcurrentMetadataStore metadataStore;
private final Resource resource;
private final FeedSupplierProperties feedSuppplierProperties;
关于这些字段的简要说明。Spring Integration 中的 Feed 适配器提供了一种功能,用于不读取我们已从 Feed 中读取的相同条目。我们上面定义的 metadataKey
属性用于此目的。它的工作方式是使用元数据存储。有 各种元数据存储可用于流行的数据库。包含以下依赖项以用于内存中的简单元数据存储。
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>metadata-store-common</artifactId>
<version>${project.version}</version>
</dependency>
请注意,此要求特定于此 Supplier,并非所有 Supplier 都可能需要它。
如果不可用基于 HTTP(或 HTTPS)的 URL(可以通过配置属性设置),则用户可以提供一个用于读取 Feed 的 Resource
bean。
让我们添加一个构造函数来使用这些字段。
FeedSupplierConfiguration(FeedSupplierProperties feedSupplierProperties,
ConcurrentMetadataStore metadataStore,
@Nullable Resource resource) {
this.feedSuppplierProperties = feedSupplierProperties;
this.metadataStore = metadataStore;
this.resource = resource;
}
Resource
可为空,因为大多数情况下,我们只需将 URL 字符串作为配置属性传递,而不提供 Resource
bean。
Spring Integration Feed 适配器提供了一个名为 FeedEntryMessageSource 的类,它实现了 MessageSource
接口。我们将在 Supplier 中使用这个消息源。让我们将其配置为一个 Spring Bean。下面的代码非常容易理解。
@Bean
public FeedEntryMessageSource feedEntryMessageSource() {
final FeedEntryMessageSource feedEntryMessageSource = this.resource == null ? new FeedEntryMessageSource(this.feedSuppplierProperties.getFeedUrl(),
this.feedSuppplierProperties.getMetadataKey()) :
...
return feedEntryMessageSource;
}
现在我们已经准备好了 MessageSource Bean,编写一个简单的 Supplier 并通过调用 Supplier 的 get
方法以编程方式调用它就相对简单了。如下所示。
@Bean
public Supplier<Message<SyndEntry>> feedSupplier() {
return () -> feedEntryMessageSource().receive();
}
我们可以将这个 Supplier Bean 注入到我们的应用程序中,并以编程方式调用 get
方法。当这个 Supplier
在 Spring Cloud Stream 应用程序中使用时(我们稍后会看到),它将使用 Spring Cloud Stream 提供的 默认轮询器,默认情况下,该轮询器将每秒触发一次 Supplier。这个调度可以在轮询器中进行更改。
非响应式轮询解决方案看起来还不错,但是我们可能会问,如果我不想定期显式轮询,而是希望数据在消息源中可用时立即以流式方式获取,该怎么办?好吧,我们有一个解决方案——开发一个响应式 Supplier,它会在数据可用时立即传递接收到的数据。让我们看看细节。
同样,Spring Integration 提供了一些抽象,我们可以使用它们将 FeedEntryMessageSource
转换为响应式发布者,如下所示。
@Bean
public Supplier<Flux<Message<SyndEntry>>> feedSupplier() {
return () -> IntegrationReactiveUtils.messageSourceToFlux(feedEntryMessageSource());
}
您可能会注意到,此 Supplier 返回的是 Flux<Message<SyndEntry>>
,而不是像初始非响应式 Supplier 中那样返回 Message<SyndEntry>
。在初始非响应式 Supplier 中,我们依赖于以编程方式调用 Supplier 或其他一些轮询机制。
好的,从 Spring Integration 获取 MessageSource
并将其转换为 Flux
非常不错。如果没有这样的 MessageSource
,而我们必须手工编写要为其编写响应式 Supplier 的系统的基本数据检索,该怎么办?对于这些情况,我们可以使用 Project Reactor 提供的各种功能,然后以编程方式将数据馈送到它们。最重要的是,当我们编写响应式流式 Supplier 时,必须将数据作为 Flux
返回。
让我们为这个响应式 Supplier 添加一个单元测试。我们可以使用 RFC 4287 - Atom 联合格式 中描述的 Atom Feed 示例作为我们的测试数据。将其包含在 src/test/resources
中。
以下是测试类。
@SpringBootTest(properties = {"feed.supplier.feedUrl=classpath:atom.xml",
"feed.supplier.metadataKey=feedTest" })
@DirtiesContext
public class FeedSupplierTests {
@Autowired
Supplier<Flux<Message<SyndEntry>>> feedSupplier;
@Test
public void testFromSampleRssFile() {
final Flux<Message<SyndEntry>> messageFlux = feedSupplier.get();
StepVerifier.create(messageFlux)
.assertNext((message) -> {
assertThat(message.getPayload().getTitle().trim()).isEqualTo("Atom draft-07 snapshot");
assertThat(message.getPayload().getContents().size()).isEqualTo(1);
assertThat(message.getPayload().getContents().get(0).getValue().contains("The Atom draft is finished.")).isTrue();
})
.thenCancel()
.verify();
}
@SpringBootApplication
static class FeedSupplierTestApplication {
}
}
functions 项目在 Maven BOM 中聚合了所有可用的函数。将 feed-supplier
添加到此 BOM 中。如果您是基于此函数生成 Spring Cloud Stream 应用程序,则主要需要此操作。
在流程的这一点上,我们可以向存储库提交包含 Supplier 的拉取请求,但如果我们想从 Supplier 生成基于 Spring Cloud Stream 绑定的应用程序,请继续阅读。生成后,这些应用程序可以独立运行,也可以作为 Spring Cloud Data Flow 中的数据编排管道的组成部分运行。
继续并在 applications/source
下创建一个名为 feed-source
的新模块。正如我们在之前的博文中提到的,java.util.function.Supplier
被映射为 Spring Cloud Stream 源。
我们不需要在 Feed Supplier 的基础上添加任何自定义代码,因为它可以按原样使用。但是,既然我们现在正在讨论 Spring Cloud Stream 应用程序,我们需要使用 测试绑定器 和 Supplier 函数来查看 Supplier 如何与 Spring Cloud Stream 配合使用。
我们可以使用其中一个 现有的源 作为模板来指导我们完成此过程。我们甚至可以复制其中一个并逐步进行更改。
所有应用程序都使用父 POM stream-applications-core,它引入了所有必要的测试依赖项,例如上面提到的测试绑定器。它还为应用程序生成器插件提供了基础结构,该插件负责生成基于绑定的应用程序。
我们想强调的一点是,除非应用程序模块包含自定义代码,否则此模块仅仅成为生成基于绑定的应用程序的应用程序生成器。换句话说,我们不会向其中添加带有 @SpringBootApplication
的类,而是会为我们生成它。
添加以下依赖项以使用测试绑定器进行测试
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
现在,我们可以添加一个测试来验证 feed-supplier
是否与 Spring Cloud Stream 中的测试绑定器一起工作。基本上,我们需要确保 Supplier 通过测试绑定器生成数据,并且将其传递到测试绑定器上的目标位置。
以下是测试背后的总体思路
public class FeedSourceTests {
@Test
public void testFileSource() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(FeedSourceTestApplication.class))
.web(WebApplicationType.NONE)
.run("--spring.cloud.function.definition=feedSupplier", "--feed.supplier.feedUrl=classpath:atom.xml", "--feed.supplier.metadataKey=feedTest")) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
Object title = JsonPath.parse(new String(sourceMessage.getPayload())).read("$.title");
assertThat(title).isEqualTo("Atom draft-07 snapshot");
}
}
@SpringBootApplication
@Import(FeedSupplierConfiguration.class)
public static class FeedSourceTestApplication {
}
}
该测试与我们为 Supplier 添加的单元测试非常相似,但有一个很大的区别。在 Supplier 中,我们直接调用它并验证生成的数据。在这里,我们没有直接调用 Supplier,而是 Spring Cloud Stream 中的绑定机制自动为我们执行此操作。我们从输出目标接收数据,然后进行验证。
测试通过后,就该生成应用程序了。
默认情况下,插件会为 Spring Cloud Stream 中的 Kafka 和 Rabbit 绑定器生成应用程序。这在 stream-applications-core
中的父 POM 中配置。如果我们需要自定义不同绑定器的生成,则需要在那里进行更改。以下是应用程序生成器插件的配置。
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedApp>
<name>feed</name>
<type>source</type>
<version>${project.version}</version>
<configClass>org.springframework.cloud.fn.supplier.feed.FeedSupplierConfiguration.class</configClass>
</generatedApp>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-composite-function-support</artifactId>
<version>${stream-apps-core.version}</version>
</dependency>
</dependencies>
</configuration>
</plugin>
让我们快速回顾一下这里的一些细节。我们请求插件创建一个名为 feed-source
的应用程序,并希望它使用我们上面开发的 Supplier
作为主配置类。在插件的依赖项部分中,我们还需要添加应用程序所需的任何依赖项,在本例中为 feed-supplier
。我们需要在所有生成的源应用程序中添加所有处理器函数。这是因为我们可以将源与其他处理器组合,而无需像我们在 之前的博文 中看到的那样,将它们作为单独的微服务运行。有关使用处理器进行函数组合的更多详细信息,也可以在 此处 找到。这就是我们在插件的依赖项部分添加依赖项 stream-applications-composite-function-support
的原因。
构建应用程序模块,我们将在 apps
文件夹中看到基于绑定的应用程序。它们将命名为 feed-source-kafka
和 feed-source-rabbit
。我们可以转到其中任何一个应用程序,构建它,然后将其用作独立应用程序或 Spring Cloud Data Flow 中管道的一部分。
在这篇博文中,我们看到了开发、测试和贡献供应商/Spring Cloud Stream 应用程序组合的整个过程。请按照此处列出的步骤编写您自己的供应商和数据源。如果您已这样做,请考虑将它们贡献回存储库。
这篇博文是系列文章中的第三篇,将涵盖许多相关主题。在接下来的几周内,请期待更多深入探讨和聚焦的主题。在本系列的下一篇博文中,类似于我们在本文中关于编写新的 Supplier 和 Source 的内容,我们将编写一个 Consumer 函数,然后从中生成一个 Sink 应用程序。