Spring Batch和Spring Integration的实际应用

工程 | Dave Syer | 2010年2月15日 | ...

`Spring Batch` 和 `Spring Integration` 的用户有一些共同的疑问,我们经常被问到它们如何结合使用。最近发布了 `Spring Batch Admin` 1.0.0.M2,它大量使用了 Spring Integration,因此它是研究一些具体用例的好工具,这也是我们在本文中计划要做的事情。

Spring Batch 集成

1.0.0.M2 版本的一部分是 `Spring Batch Integration` 模块,最近从 Spring Batch 迁移出来,并在 Batch Admin 中有了新的归宿。许多 Batch-Integration 的交叉用例已在 Spring Batch Integration 中实现或演示。之所以要迁移到新的位置,是因为 Batch Admin 使用了大量 Batch Integration 的功能,因此调整这些项目的发布周期更有意义。

Spring Batch Admin

Spring Batch Admin 是 SpringSource 的一个开源项目。它旨在为开发人员提供 Web UI 和工具,用于构建他们自己的 UI 来与 Spring Batch 作业进行交互(启动、停止、调查失败原因等)。就 1.0 的计划功能而言,最近的里程碑版本相当完整,但如果您有任何想法或贡献,请访问 论坛问题跟踪器,并参与社区。

开箱即用的目标运行时是 servlet 容器的单个实例(例如 SpringSource tc Server),在该容器中,系统无需或几乎无需配置即可运行。但是我们希望能够支持对基本用例的定制和扩展,包括将部署扩展到服务器集群,而 Spring Integration 正被证明是许多扩展点的关键。

结合 Batch 和 Integration

Spring Batch 和 Spring Integration 之间的界限并不总是清晰的,但是可以遵循一些指导原则。主要是:考虑粒度,并应用常用模式。本文介绍了一些常用模式。更多模式在 Spring Batch Integration 和 Spring Batch Admin 中实现(并可能成为未来文章的主题)。

向批处理流程添加消息传递功能可以实现操作自动化,还可以分离和制定关键问题的策略。例如,消息可能会触发作业执行,然后可以以多种方式公开消息的发送。或者,当作业完成或失败时,可能会触发发送消息,而这些消息的使用者可能具有与应用程序本身无关的操作问题。

反过来也适用:消息传递也可以嵌入到作业中,但这不在本文的讨论范围之内。例如:通过通道读取或写入要处理的项。

以下是一些使用 Spring Integration 和 Spring Batch Integration 在 Batch Admin 中实现的用例。

模式:消息触发

Spring Integration 的优点在于消息生产者和消息消费者之间关注点的分离,一个很好的具体示例是消息能够触发作业执行的能力。在这种情况下,消费者是完全通用的,并且是标准 Spring Batch 的一个非常薄的包装器JobLauncher(此处代码来自 Spring Batch Integration `JobLaunchingMessageHandler`)

@ServiceActivator
public JobExecution launch(JobLaunchRequest request) {

    Job job = request.getJob();
    JobParameters jobParameters = request.getJobParameters();

    return jobLauncher.run(job, jobParameters);
    
}

从上面的代码片段可以看出,包装器非常薄,几乎不值得一提,但它的优点是它具有非常清晰且明显的责任,并且易于隔离测试。JobLaunchRequest对象是输入参数的特殊包装器JobLauncher,以便它们可以构成 Spring Integration 中消息的有效负载。

JobLaunchingMessageHandler连接到MessageChannel在 Spring Batch Admin 中(在 Manager jar/META-INF/bootstrap/integration/launch-context.xml):

<service-activator input-channel="job-requests">
    <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
        <beans:constructor-arg ref="jobLauncher" />
    </beans:bean>
</service-activator>

中)。这就是该集成模式的消费者端完成。这是一种本地方法,因为它并不特别适合远程调用,因为JobLaunchRequest故意不是Serializable(因为Job不是)。

要本地启动一个Job,我们只需要创建一个生产者并使用它来发送一个JobLaunchRequestjob-requests通道。Batch Admin Manager 模块中有一个集成测试就是这样做的,但是此处集成方法的真正强大之处在于能够制定请求策略并让它们来自各种不同的生产者。

模式:通道重用

可以以多种方式将消息发送到 Batch Admin 中的job-requests通道。为了摆脱本地调用并远程公开作业,只需将其他形式的传入请求适配到JobLaunchRequest,而 Spring Integration 使这变得非常容易。这是我们称为通道重用的模式的基本场景。示例包括

HTTP:浏览器

Spring Integration HTTP 适配器模块可用于通过 HTTP 接受输入消息

<http:inbound-channel-adapter name="/job-requests" channel="job-launches" 
    request-mapper="bodyInboundRequestMapper" view="reload-job-executions" />

此代码段可在 Batch Admin Manager 模块中找到(META-INF/servlet/integration-servlet.xml)。它公开一个端点 URLhttp://.../batch/job-requests,我们可以使用它通过在浏览器中提交表单来发送作业执行请求。

原则上,请求可以采用我们喜欢的任何形式,因为我们可以转换此适配器下游和JobLaunchingMessageHandler上游的消息。在 Spring Batch Admin 中,转换是由另一个 POJO 消息处理程序(`StringToJobLaunchRequestAdapter`)对适配器的输出进行的。

HTTP:命令行

上面使用的相同 HTTP 适配器可用于从 UN*X 命令行远程启动作业。这是使用 Spring Integration HTTP 适配器的一种非常好的方法:您可以使用简单的 shell 脚本自动执行许多操作。例如,如果应用程序在本地部署并具有名为“staging”的作业,则此方法有效

$ echo staging[input.file=foo] | curl -v -d @- -H "Content-Type: text/plain" \
  https://127.0.0.1:8080/springone-web-demo/batch/job-requests

名为“staging”的作业使用一个参数启动(input.file=foo),其中foo是要读取为输入的文件的绝对路径。作业使用这样的项目读取器配置

<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="linesToSkip" value="1" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
    </property>
    <property name="resource" value="#{jobParameters[input.file]}" />
</bean>

(此代码片段不在 Spring Batch Admin 示例中,但在我们在 2009 年 Spring One Americas 上演示的示例中。)

文件轮询

Spring Integration 可以轮询目录中的文件(使用文件适配器模块)。生成的的消息只需要适应job-requests通道即可。Spring Batch Admin 在一个简单的消息处理程序中执行此操作(FileToJobLaunchRequestAdapte):
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
    JobParameters jobParameters = new JobParametersBuilder().addString(
            "input.file", file.getAbsolutePath()).toJobParameters();
    return new JobLaunchRequest(job, jobParameters);
}

此简单的 POJO 方法声明为@ServiceActivator(它可能是@Transformer),因此它可以在消息处理链中插入到JobLaunchingMessageHandler之前,以将File转换为JobLaunchRequest.

Restart

通常可以在 Spring Batch 中重新启动失败的作业,并且此功能可通过 Spring Batch Admin UI 中的 Web 浏览器使用。它也可以在命令行上使用,或者对于可以向名为job-restarts:

<channel id="job-restarts" />
    <service-activator input-channel="job-restarts" output-channel="job-requests">
    <beans:bean class="org.springframework.batch.admin.integration.JobNameToJobRestartRequestAdapter">
        <beans:property name="jobLocator" ref="jobRegistry" />
        <beans:property name="jobExplorer" ref="jobExplorer" />
    </beans:bean>
</service-activator>

的 Spring Integration 通道发送消息的任何人来说,都可以使用。此通道只需要作业名称,并且它已被公开为 HTTP 入站端点,因此从 UN*X 命令行,您可以执行以下操作

$ echo staging | curl -v -d @- -H "Content-Type: text/plain" \
  https://127.0.0.1:8080/springone-web-demo/batch/job-restarts

重试

如果作业由于可恢复的错误(例如对远程服务的调用中的超时或网络故障)而反复失败,您可能希望自动重新启动它。可以使用 Spring Batch 的一些功能在作业内部的低级别处理重试,但是要重试整个作业需要对运行时进行一些操作。现在job-requests通道接受启动请求,这可以使用 Spring Integration 来简单地完成。此端点将充当过滤器,查找已知可重试的作业中的故障条件,然后充当重新启动转换器(如上例所示)。因此,这样的链将有效

<chain input-channel="input-files" output-channel="job-requests" 
        xmlns="http://www.springframework.org/schema/integration">
    <filter>
        <bean class="...RetryableJobExecutionFilter" 
            xmlns="http://www.springframework.org/schema/beans">
            <property name="pattern" value="(&amp;s).*TimeoutException.*" />
        </bean>
	</filter>
    <service-activator>
        <bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter" 
            xmlns="http://www.springframework.org/schema/beans">
            <property name="job" ref="job1" />
        </bean>
    </service-activator>
</chain>

其中RetryableJobExecutionFilter可能实现如下

public boolean isRetryable(JobExecution jobExecution) {
    boolean retryable = false;
    for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
        if (stepExecution.getStatus().isLessThan(BatchStatus.STOPPED)) {
            continue;
        }
        if (stepExecution.getExitStatus().getExitDescription().matches(pattern)) {
            retryable = true;
            break;
        }
    }
    return retryable;
}

此示例在我们 Spring One 演示中;它不在 Spring Batch Admin 中,尽管对于您需要的任何特定过滤器,实现起来都很简单。

输入文件上传

可以直接通过 Spring Batch Admin UI 将文件上传到应用程序。不建议使用 HTTP POST 上传大型文件,主要是因为应用程序必须在内存中缓冲内容,但这对于上传小型或中型数据集以供 Spring Batch 处理来说是一个不错的功能。

示例应用程序实际上并没有使用 Spring Integration 的文件轮询器(但对于想要按照上述说明配置它的客户端来说它是可用的);它使用的是文件上传后一次性直接消息触发器。策略是让 Manager 模块上传文件,然后向发布-订阅通道(input-files).

任何可以使用输入文件的作业只需要有一个上游组件订阅该通道,并在感兴趣时传递文件即可。示例中是通过按其父目录名称过滤文件来完成的。

<chain input-channel="input-files" output-channel="job-requests" 
        xmlns="http://www.springframework.org/schema/integration">
    <filter expression="payload.parent.name=='sample'" />
    <service-activator>
        <bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter" 
            xmlns="http://www.springframework.org/schema/beans">
            <property name="job" ref="job1" />
        </bean>
    </service-activator>
</chain>

如果输入文件的父目录(可以在 Web UI 中设置)是“sample”,则它将被传入服务激活器,该激活器将其转换为JobLaunchRequest并将其发送给JobLaunchingMessageHandler进行处理(如前所述)。

模式:POJO 消息处理

将文件发送到input-files通道是在 Batch Admin Manager 中完成的,遵循 Spring 应用程序的最佳传统,通过简单的 POJO 和基于接口的组件实现。有一个FileService接口和一个本地实现,该实现使用临时目录来编排通过 HTTP 传入的文件。文件上传后,服务通过具有自定义接口的简单消息网关发送文件。
public interface FileSender {
    
    void send(File file);

}

该接口没有实现(单元测试中除了存根外),因为 Spring Integration 可以提供一个。

<gateway id="fileSender" 
    service-interface="org.springframework.batch.admin.service.FileSender"
    default-request-channel="input-files" />

<beans:bean class="org.springframework.batch.admin.service.LocalFileService">
    <beans:property name="fileSender" ref="fileSender" />
</beans:bean>

配置文件上传

Spring Batch Admin 允许用户上传 Spring 配置文件,以便从 UI 启动和管理作业。这对于在运行时重新参数化作业非常有用,例如,当运行一组性能测试以测量各种性能调整(例如更改步骤中的提交间隔)的效果时。

为了接受输入的配置文件,我们使用了一个消息通道,以便多个不同的输入方法可以重用它。配置通过名为job-configurations:

<service-activator input-channel="job-configurations" output-channel="job-registrations">
	<beans:bean class="org.springframework.batch.admin.integration.JobConfigurationResourceLoader">
		<beans:property name="jobRegistry" ref="jobRegistry" />
	</beans:bean>
</service-activator>

的通道传入。这里的服务激活器只接受一个 SpringResource并将其视为配置文件:加载一个ApplicationContext,扫描其中的Job组件并在提供的注册表中注册它们。注册后,可以从 UI 中的主作业菜单或通过job-requests通道启动作业,如上所述。

与输入文件一样,配置文件可以通过 HTTP 传入,在这种情况下可以作为文件附件或纯文本参数,也可以通过文件轮询传入。轮询用例在 Manager 模块中实现,因此值得快速查看一下它的工作原理。在META-INF/bootstrap/integration/configuration-context.xml中,我们找到了这个

<file:inbound-channel-adapter directory="target/config" channel="job-configuration-files"
	filename-pattern=".*\.xml">
	<poller max-messages-per-poll="1">
		<cron-trigger expression="5/1 * * * * *" />
	</poller>
</file:inbound-channel-adapter>

适配器将轮询一个目录(此处为演示目的硬编码为“target/config”,但在实际应用程序中将被参数化),并查找名称以“.xml”结尾的文件。当匹配该模式的文件到达时,它将(作为java.io.File)发送到job-configuration-files通道。消息从那里被转换,以便File变成Resource,并且它可以被发送到job-configurations通道。

模式:信息消息

一旦开始使用 Spring Integration 消息来驱动许多应用程序功能,通常能够利用消息流来进行信息或报告目的。例如,在作业启动、停止(完成或失败)时发送消息将很有用。这很容易使用 Spring Integration 的MessagePublishingInterceptor来完成。在 Spring Batch Admin Manager 中,拦截器被配置为发送作业执行消息。


<aop:config>
	<aop:advisor advice-ref="jobMessagePublishingInterceptor" pointcut="execution(* *..Job+.execute(..))" />
</aop:config>

<bean id="jobMessagePublishingInterceptor" class="org.springframework.integration.aop.MessagePublishingInterceptor"
	xmlns="http://www.springframework.org/schema/beans">
	<constructor-arg index="0">
		<bean class="org.springframework.batch.admin.integration.TrivialExpressionSource" p:payload="#args[execution]" />
	</constructor-arg>
	<property name="defaultChannel" ref="job-operator" />
</bean>

每次执行Job时,AOP 顾问都会传递参数值(一个JobExecution)发送到通道。然后,相关方可以订阅该通道并获取有关最近执行的消息的信息。Spring Batch Admin 本身不会对这些消息做任何处理,只是在控制台中记录它们,并在 UI 中列出它们以便检查。在 Spring Batch Admin 之上构建自己应用程序的客户端可能会发现这些消息对于通知操作员或报告系统作业结果很有用。

设置信息消息可能会产生打开新应用程序功能的副作用:上面描述的作业重试功能是通过连接一个端点来监听通道。

下一步是什么?

我们希望本文能让你了解如何在批处理应用程序中使用 Spring Integration 的一些方法。上面几乎所有代码示例都以某种形式存在于 Spring Batch Admin 中,但这绝不是故事的结尾,Spring Batch Integration 和 Spring Batch Admin 项目中还有更多示例。访问Batch Admin网站了解更多信息,并了解在哪里可以获取代码进行操作。InfoQ 上还有一个视频,介绍了 Spring One 上由 Spring Batch 和 Spring Integration 负责人(Dave Syer 和 Mark Fisher)介绍的本文中的一些主题。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部