Spring Batch 在 Kubernetes 上:高效、大规模的批处理

工程技术 | Mahmoud Ben Hassine | 2021 年 1 月 28 日 | ...

引言

批处理自早期穿孔卡片和磁带时代诞生以来,一直是计算机科学中一个充满挑战的领域。如今,现代云计算时代为如何在云环境中高效开发和运行批处理工作负载带来了全新的挑战。在这篇博文中,我将介绍批处理开发者或架构师在设计和大规模运行批处理应用时可能面临的一些挑战,并展示 Spring Batch、Spring Boot 和 Kubernetes 如何极大地简化这项任务。

在云端设计和运行批处理工作负载的挑战

与 Web 应用相比,设计云原生批处理应用可能看起来很容易,但这并非事实。批处理开发者面临许多挑战。

1. 容错性

批处理过程通常会与其他服务(如数据库、消息代理、Web 服务等)交互,这些服务在云环境中天生就不稳定。此外,运行这些进程的节点也可能随时出现故障,并被健康节点替换。云原生批处理应用应以容错方式进行设计。

2. 健壮性

由于人为失误导致批处理作业运行两次的情况并不少见,这可能会带来重大的财务后果(例如发生在 WalgreensANZ 银行NatWest 等公司的情况)。此外,某些平台,例如 Kubernetes,对于同一作业可能运行两次的情况存在一些已知限制(参阅 Cron Job 限制)。云原生批处理应用在设计时就应该准备好处理这类问题。

3. 成本效益

云基础设施根据 CPU/内存/带宽使用量计费。如果发生故障,如果无法从上次中断的地方重新启动作业,就会“浪费”上次运行的 CPU/内存/带宽使用量(因此会被重复计费或多次计费!),这是非常低效的。

4. 可观测性

任何现代批处理架构都应能够随时了解一些关键指标,包括:

  • 当前正在运行哪些作业?
  • 有哪些作业失败了(如果有的话)?
  • 关于运行情况的其他问题。

能够在仪表板上一目了然地查看这些关键绩效指标 (KPI) 对于高效运营至关重要。

5. 可伸缩性

我们正在处理前所未有的海量数据,这已无法在单台机器上处理。正确处理大量分布式数据可能是最具挑战性的一点。云原生批处理应用在设计时就应具备可伸缩性。

在设计和开发云原生批处理应用时,应考虑所有这些方面。这对开发者而言是大量的工作。Spring Batch 负责处理这些问题中的大部分。我将在下一节中详细解释。

Spring Batch 如何让批处理开发者的生活更轻松?

Spring Batch 是 JVM 上事实上的批处理框架。关于 Spring Batch 提供的丰富功能集,已经有整本书来阐述,但我想重点介绍一些与前面提到的云原生开发挑战相关的最重要功能:

1. 容错性

Spring Batch 提供了容错功能,例如事务管理、跳过和重试机制,这些功能在批处理作业与云环境中不稳定的服务交互时非常有用。

2. 健壮性

Spring Batch 使用集中式事务性作业仓库,可防止重复的作业执行。从设计上讲,导致同一作业运行两次的人为错误和平台限制是不可能的。

3. 成本效益

Spring Batch 作业将其状态保存在外部数据库中,这使得从上次中断的地方重新启动失败的作业成为可能。与从头开始重做工作并因此被计费两次或更多次的解决方案相比,这具有成本效益!

4. 可观测性

Spring Batch 提供了与 Micrometer 的集成,这在可观测性方面非常关键。基于 Spring Batch 的批处理基础设施提供了关键指标,例如当前活动的作业、读/写速率、失败的作业等。甚至可以使用自定义指标进行扩展。

5. 可伸缩性

如前所述,Spring Batch 作业将其状态保存在外部数据库中。因此,从 12 因素方法论的角度来看,它们是无状态进程。这种无状态特性使其适用于容器化并在云环境中以可伸缩的方式执行。此外,Spring Batch 提供了多种垂直和水平伸缩技术,例如多线程步骤和数据的远程分区/分块,以高效地扩展批处理作业。

Spring Batch 还提供了其他功能,但上面提到的功能在设计和开发云原生批处理过程时非常有用。

Kubernetes 如何让批处理运维人员的生活更轻松?

Kubernetes 是云端事实上的容器编排平台。大规模运营批处理基础设施绝非易事,而 Kubernetes 在这方面无疑是游戏规则的改变者。在云时代之前,在我以前的一份工作中,我曾担任批处理运维人员,负责管理一个由 4 台专用于批处理作业的机器组成的集群。以下是我当时不得不手动完成或想办法通过 (bash!) 脚本自动完成的一些任务:

  • 通过 SSH 连接到每台机器检查当前正在运行的作业
  • 通过 SSH 连接到每台机器收集失败作业的日志
  • 通过 SSH 连接到每台机器升级作业版本或更新其配置
  • 通过 SSH 连接到每台机器终止挂起作业并重新启动它们
  • 通过 SSH 连接到每台机器编辑/更新 crontab 文件进行作业调度
  • 还有许多其他类似的任务..

所有这些任务显然效率低下且容易出错,由于资源管理不善,导致四台专用机器利用率不足。如果你在 2021 年还在做这些任务(无论是手动还是通过脚本),我相信是时候考虑将你的批处理基础设施迁移到 Kubernetes 了。原因在于 Kubernetes 允许你通过针对整个集群的**一个**命令来完成所有这些任务,这从运维角度来看是一个**巨大**的差异。迁移到 Kubernetes 后,你可以:

  • 使用一个命令查询整个集群中当前正在运行的作业
  • 提交/调度作业而无需知道它们将在哪个节点上运行
  • 透明地更新作业定义
  • 自动运行作业直至完成(Kubernetes 作业创建一个或多个 Pod,并确保指定数量的 Pod 成功终止)
  • 优化集群资源的使用(Kubernetes 会像玩俄罗斯方块一样调度集群的机器),从而优化成本!
  • 使用许多其他有趣的功能

Spring Batch 在 Kubernetes 上:完美的结合,实战演练

在本节中,我将采用 Spring Batch 入门指南中开发的同一个作业(一个将 CSV 文件中的人员数据加载到关系型数据库表中的数据摄取作业),将其容器化,并部署到 Kubernetes 上。如果你想更进一步,将此作业包装在 Spring Cloud Task 中并将其部署到 Spring Cloud Data Flow 服务器中,请参阅 使用 Data Flow 部署 Spring Batch 应用

1. 设置数据库服务器

我使用 MySQL 数据库来存储 Spring Batch 元数据。数据库位于 Kubernetes 集群之外,这是有意为之。原因是为了模拟一个现实的迁移路径,即第一步只将无状态工作负载迁移到 Kubernetes。对于许多公司来说,将数据库迁移到 Kubernetes 尚不是一个选项(这是一个合理的决定)。要启动数据库服务器,请运行以下命令:

$ git clone [email protected]:benas/spring-batch-lab.git
$ cd blog/spring-batch-kubernetes
$ docker-compose -f src/docker/docker-compose.yml up

这将创建一个预填充了 Spring Batch 技术表以及业务表 PEOPLE 的 MySQL 容器。我们可以按如下方式检查:

$ docker exec -it mysql bash
root@0a6596feb06d:/# mysql -u root test -p # the root password is "root"
Enter password:
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show tables;
+------------------------------+
| Tables_in_test               |
+------------------------------+
| BATCH_JOB_EXECUTION          |
| BATCH_JOB_EXECUTION_CONTEXT  |
| BATCH_JOB_EXECUTION_PARAMS   |
| BATCH_JOB_EXECUTION_SEQ      |
| BATCH_JOB_INSTANCE           |
| BATCH_JOB_SEQ                |
| BATCH_STEP_EXECUTION         |
| BATCH_STEP_EXECUTION_CONTEXT |
| BATCH_STEP_EXECUTION_SEQ     |
| PEOPLE                       |
+------------------------------+
10 rows in set (0.01 sec)

mysql> select * from PEOPLE;
Empty set (0.00 sec)

2. 创建一个精美的容器化 Spring Batch 作业

访问 start.spring.io,生成一个包含以下依赖的项目:Spring Batch 和 MySQL 驱动。你可以使用此 链接 创建项目。解压项目并将其导入你喜欢的 IDE 后,可以修改主类,如下所示:

package com.example.demo;

import java.net.MalformedURLException;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.UrlResource;

@SpringBootApplication
@EnableBatchProcessing
public class DemoApplication {

	public static void main(String[] args) {
		System.exit(SpringApplication.exit(
			SpringApplication.run(DemoApplication.class, args)));
	}

	@Bean
	@StepScope
	public Resource resource(@Value("#{jobParameters['fileName']}") String fileName) throws MalformedURLException {
		return new UrlResource(fileName);
	}

	@Bean
	public FlatFileItemReader<Person> itemReader(Resource resource)  {
		return new FlatFileItemReaderBuilder<Person>()
				.name("personItemReader")
				.resource(resource)
				.delimited()
				.names("firstName", "lastName")
				.targetType(Person.class)
				.build();
	}

	@Bean
	public JdbcBatchItemWriter<Person> itemWriter(DataSource dataSource) {
		return new JdbcBatchItemWriterBuilder<Person>()
				.dataSource(dataSource)
				.sql("INSERT INTO PEOPLE (FIRST_NAME, LAST_NAME) VALUES (:firstName, :lastName)")
				.beanMapped()
				.build();
	}

	@Bean
	public Job job(JobBuilderFactory jobs, StepBuilderFactory steps,
				   DataSource dataSource, Resource resource) {
		return jobs.get("job")
				.start(steps.get("step")
						.<Person, Person>chunk(3)
						.reader(itemReader(resource))
						.writer(itemWriter(dataSource))
						.build())
				.build();
	}

	public static class Person {
		private String firstName;
		private String lastName;
                // default constructor + getters/setters omitted for brevity
	}

}

@EnableBatchProcessing 注解设置了 Spring Batch 所需的所有基础设施 Bean(作业仓库、作业启动器等)以及一些实用工具,例如 JobBuilderFactoryStepBuilderFactory,以方便创建步骤和作业。在上面的代码片段中,我使用了这些实用工具创建了一个包含一个基于分块步骤的作业,该步骤定义如下:

  • 一个 Item Reader,它从 UrlResource 读取数据。在某些云环境中,文件系统可能是只读的甚至不存在,因此无需下载即可流式传输数据的能力几乎是一个基本要求。幸运的是,Spring Batch 已经为你考虑到了!所有基于文件的 Item Reader(用于纯文本文件、XML 文件和 JSON 文件)都基于强大的 Spring Framework Resource 抽象工作,因此任何 Resource 的实现都应该适用。在此示例中,我使用 UrlResource 直接从 GitHub 上的 sample-data.csv 的远程 URL 读取数据,而无需下载。文件名作为作业参数传入。
  • 一个 Item Writer,它将 Person 项写入 MySQL 中的 PEOPLE 表。

就是这样。接下来我们打包作业并使用 Spring Boot 的 Maven 插件为其创建 Docker 镜像:

$ mvn package
...
$ mvn spring-boot:build-image -Dspring-boot.build-image.imageName=benas/bootiful-job
[INFO] Scanning for projects...
[INFO]
…
[INFO] --- spring-boot-maven-plugin:2.4.1:build-image (default-cli) @ demo ---
[INFO] Building image 'docker.io/benas/bootiful-job:latest'
…
[INFO] Successfully built image 'docker.io/benas/bootiful-job:latest'
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS

镜像现在应该已经正确构建,我们来检查一下:

$ docker images
REPOSITORY             TAG           IMAGE ID               CREATED             SIZE
benas/bootiful-job     latest        52244b284f08    41 seconds ago   242MB

注意 Spring Boot 是如何在无需创建 Dockerfile 的情况下创建 Docker 镜像的!Josh Long 曾就这项出色的功能撰写过一篇完整的博文:YMNNALFT: 使用 Spring Boot Maven Plugin 和 Buildpacks 轻松创建 Docker 镜像。现在让我们在 Docker 容器中运行这个作业,以检查一切是否按预期工作:

$ docker run \
   -e SPRING_DATASOURCE_URL=jdbc:mysql://192.168.1.53:3306/test \
   -e SPRING_DATASOURCE_USERNAME=root \
   -e SPRING_DATASOURCE_PASSWORD=root \
   -e SPRING_DATASOURCE_DRIVER-CLASS-NAME=com.mysql.cj.jdbc.Driver \
   benas/bootiful-job \
   fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv

你应该会看到类似如下输出:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2021-01-08 17:03:15.009  INFO 1 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication v0.0.1-SNAPSHOT using Java 1.8.0_275 on 876da4a1cfe0 with PID 1 (/workspace/BOOT-INF/classes started by cnb in /workspace)
2021-01-08 17:03:15.012  INFO 1 --- [           main] com.example.demo.DemoApplication         : No active profile set, falling back to default profiles: default
2021-01-08 17:03:15.899  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-01-08 17:03:16.085  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-01-08 17:03:16.139  INFO 1 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-01-08 17:03:16.292  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-01-08 17:03:16.411  INFO 1 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 1.754 seconds (JVM running for 2.383)
2021-01-08 17:03:16.414  INFO 1 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv]
2021-01-08 17:03:16.536  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}]
2021-01-08 17:03:16.596  INFO 1 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2021-01-08 17:03:17.481  INFO 1 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 884ms
2021-01-08 17:03:17.501  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}] and the following status: [COMPLETED] in 934ms
2021-01-08 17:03:17.513  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-08 17:03:17.534  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

作业现已完成,我们可以检查数据是否已成功加载到数据库中:

mysql> select * from PEOPLE;
+----+------------+-----------+
| ID | FIRST_NAME | LAST_NAME |
+----+------------+-----------+
|  1 | Jill       | Doe       |
|  2 | Joe        | Doe       |
|  3 | Justin     | Doe       |
|  4 | Jane       | Doe       |
|  5 | John       | Doe       |
+----+------------+-----------+
5 rows in set (0.00 sec)

大功告成!现在我们将这个作业部署到 Kubernetes 上。然而,在继续并将此作业部署到 Kubernetes 之前,我想展示两件事:

防止同一作业实例重复执行

如果你想了解 Spring Batch 如何防止重复作业执行,可以尝试使用相同的命令重新运行该作业。应用程序应该会启动失败并显示以下错误:

2021-01-08 20:21:20.752 ERROR 1 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:785) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) [spring-boot-2.4.1.jar:2.4.1]
	at com.example.demo.DemoApplication.main(DemoApplication.java:30) [classes/:0.0.1-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_275]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_275]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_275]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_275]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:107) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) [workspace/:na]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) [workspace/:na]
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}.  If you want to run this job again, change the parameters.
…

Spring Batch 不允许在同一作业实例成功完成后再次运行。这是故意的设计,旨在防止由于人为错误或平台限制导致的重复作业执行,正如前一节所解释的。

防止同一作业实例并发执行

同样,Spring Batch 也阻止同一作业实例的并发执行。要测试这一点,可以添加一个进行 Thread.sleep 以减慢处理速度的 Item Processor,然后在第一个作业正在运行时尝试运行第二个作业执行(在另一个终端中)。第二次(并发)尝试会失败并出现以下错误:

2021-01-08 20:59:04.201 ERROR 1 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:785) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) [spring-boot-2.4.1.jar:2.4.1]
	at com.example.demo.DemoApplication.main(DemoApplication.java:31) [classes/:0.0.1-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_275]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_275]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_275]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_275]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:107) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) [workspace/:na]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) [workspace/:na]
Caused by: org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobExecution: id=1, version=1, startTime=2021-01-08 20:58:46.434, endTime=null, lastUpdated=2021-01-08 20:58:46.435, status=STARTED, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, Job=[job]], jobParameters=[{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}]
…

得益于集中式作业仓库,Spring Batch 可以检测当前正在运行的执行(基于数据库中的作业状态),并通过抛出 JobExecutionAlreadyRunningException 来阻止在同一节点或集群中任何其他节点上的并发执行。

3. 在 Kubernetes 上部署作业

设置 Kubernetes 集群超出了本文的范围,因此我假设你已经有一个正在运行的 Kubernetes 集群,并且可以使用 kubectl 与其交互。在本文中,我使用 Docker Desktop 应用程序提供的单节点本地 Kubernetes 集群。

首先,我为外部数据库创建一个服务,如 Kubernetes 最佳实践:映射外部服务 中的“场景 1:集群外部带有 IP 地址的数据库”所述。以下是服务定义:

kind: Service
apiVersion: v1
metadata:
  name: mysql
spec:
    type: ClusterIP
    ports:
      - port: 3306
        targetPort: 3306
---
kind: Endpoints
apiVersion: v1
metadata:
  name: mysql
subsets:
  - addresses:
      - ip: 192.168.1.53 # This is my local IP, you might need to change it if needed
    ports:
      - port: 3306
---
apiVersion: v1
kind: Secret
metadata:
  name: db-secret
type: Opaque
data:
  # base64 of "root" ($>echo -n "root" | base64)
  db.username: cm9vdA==
  db.password: cm9vdA==

这个服务可以应用到 Kubernetes,如下所示:

$ kubectl apply -f src/kubernetes/database-service.yaml

现在,由于我们已经为作业创建了 Docker 镜像,将其部署到 Kubernetes 只需要使用以下清单文件定义一个 Job 资源即可:

apiVersion: batch/v1
kind: Job
metadata:
  name: bootiful-job-$JOB_NAME
spec:
  template:
    spec:
      restartPolicy: OnFailure
      containers:
        - name: bootiful-job
          image: benas/bootiful-job
          imagePullPolicy: Never
          args: ["fileName=$FILE_NAME"]
          env:
            - name: SPRING_DATASOURCE_DRIVER-CLASS-NAME
              value: com.mysql.cj.jdbc.Driver
            - name: SPRING_DATASOURCE_URL
              value: jdbc:mysql://mysql/test
            - name: SPRING_DATASOURCE_USERNAME
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: db.username
            - name: SPRING_DATASOURCE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: db.password

这个清单文件遵循了 Kubernetes 文档建议的基于模板创建作业的相同方法。此作业模板作为基础,用于为每个要摄取的输入文件创建作业。我已经摄取了 sample1.csv 文件,因此我将使用以下命令为另一个名为 sample2.csv 的远程文件创建一个作业:

$ JOB_NAME=sample2 \
  FILE_NAME="https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv" \
  envsubst < src/k8s/job.yaml | kubectl apply -f -

这个命令会替换作业模板中的变量,为给定的文件创建作业定义,然后将其提交给 Kubernetes。让我们检查 Kubernetes 中的作业和 Pod 资源:

$ kubectl get jobs
NAME                  COMPLETIONS   DURATION   AGE
bootiful-job-sample2   0/1           97s        97s

$ kubectl get pods
NAME                             READY   STATUS      RESTARTS   AGE
bootiful-job-sample2-n8mlb   0/1     Completed   0          7s

$ kubectl logs bootiful-job-sample2-n8mlb
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2021-01-08 17:48:42.053  INFO 1 --- [           main] com.example.demo.BootifulJobApplication  : Starting BootifulJobApplication v0.1 using Java 1.8.0_275 on bootiful-job-person-n8mlb with PID 1 (/workspace/BOOT-INF/classes started by cnb in /workspace)
2021-01-08 17:48:42.056  INFO 1 --- [           main] com.example.demo.BootifulJobApplication  : No active profile set, falling back to default profiles: default
2021-01-08 17:48:43.028  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-01-08 17:48:43.180  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-01-08 17:48:43.231  INFO 1 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-01-08 17:48:43.394  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-01-08 17:48:43.541  INFO 1 --- [           main] com.example.demo.BootifulJobApplication  : Started BootifulJobApplication in 1.877 seconds (JVM running for 2.338)
2021-01-08 17:48:43.544  INFO 1 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv]
2021-01-08 17:48:43.677  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv}]
2021-01-08 17:48:43.758  INFO 1 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2021-01-08 17:48:44.632  INFO 1 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 873ms
2021-01-08 17:48:44.653  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv}] and the following status: [COMPLETED] in 922ms
2021-01-08 17:48:44.662  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-08 17:48:44.693  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

然后你可以在 PEOPLE 表中检查新添加的人员:

mysql> select * from PEOPLE;
+----+------------+-----------+
| ID | FIRST_NAME | LAST_NAME |
+----+------------+-----------+
|  1 | Jill       | Doe       |
|  2 | Joe        | Doe       |
|  3 | Justin     | Doe       |
|  4 | Jane       | Doe       |
|  5 | John       | Doe       |
|  6 | David      | Doe       |
|  7 | Damien     | Doe       |
|  8 | Danny      | Doe       |
|  9 | Dorothy    | Doe       |
|  10 | Daniel    | Doe       |

+----+------------+-----------+
10 rows in set (0.00 sec)

就是这样,我们的作业已成功运行在 Kubernetes 中!

技巧与窍门

在结束本文之前,我想分享一些将 Spring Batch 作业迁移到 Kubernetes 云端时值得考虑的技巧和窍门。

1. 作业打包与部署

在单个容器或 Pod 中运行多个 Spring Batch 作业不是一个好主意。这不符合云原生开发的最佳实践和通常的 Unix 哲学。每个容器或 Pod 运行一个作业具有以下优点:

  • 日志分离
  • 独立的生命周期(错误、功能、部署等)
  • 独立的参数和退出码
  • 可重启性(如果失败,仅重新启动失败的作业)

2. 选择合适的 Spring Batch 作业参数

成功的 Spring Batch 作业实例不能重新启动。同样,成功的 Kubernetes 作业也不能重新启动。这使得为每个 Spring Batch 作业实例设计一个 Kubernetes 作业成为完美的匹配!因此,正确选择 Spring Batch 中的标识性作业参数成为一项至关重要的任务,因为它决定了作业实例的身份,进而影响 Kubernetes 作业的设计(参见第 3 点)。框架的两个重要方面受此选择影响:

  • 作业标识:Spring Batch 根据作业实例的身份防止重复和并发的作业执行。
  • 故障场景:Spring Batch 依赖于作业实例的身份,以便从上一次中断的地方开始新的作业执行。

批处理是关于处理**固定、不可变**的数据集。如果输入数据不是固定的,那么流处理工具更合适。Spring Batch 中的标识性作业参数应代表**唯一可标识的不可变**数据集。正确选择一组标识性作业参数的一个好提示是计算它们的哈希值(或者更准确地说是它们所代表的数据的哈希值),并确保该哈希值是稳定的。以下是一些示例:

作业参数 好/坏 评论
fileName=log.txt 不断增长的日志文件不是固定的数据集
fileName=transactions-2020-08-20.csv 只要文件内容是固定的
folderName=/in/data 内容可变的文件夹不是固定的数据集
folderName=/in/data/2020/12/20 包含某一天所有订单文件的文件夹
jmsQueueName=events 消息从队列中移除,因此这不是固定的数据集
orderDate=2020-08-20 例如,如果在 D+1 天的数据库查询中使用

不幸的是,许多人在设计良好的标识性作业参数时失败了,最终添加了时间戳或随机数作为额外的标识性作业参数,充当作业实例的判别器。使用不断增长的“run.id”参数就是这种失败的表现。

3. 选择正确的 Kubernetes 作业部署模式

Kubernetes 文档提供了一个完整的章节,叫做作业模式(Job patterns),其中描述了如何选择正确的作业部署模式。在本文中,我遵循了使用扩展进行并行处理(Parallel processing using expansions)的方法,从模板为每个文件创建一个作业。虽然这种方法允许并行处理多个文件,但当需要摄入大量文件时,可能会给 Kubernetes 带来压力,因为这将导致创建大量 Kubernetes 作业对象。如果您的所有文件结构相似,并且您想创建一个单一作业一次性摄入它们,您可以使用 Spring Batch 提供的 MultiResourceItemReader 并创建一个单一的 Kubernetes 作业。另一种选择是使用一个带有分区步骤的单一作业,其中每个工作步骤处理一个文件(这可以通过使用内置的 MultiResourcePartitioner 来实现)。

4. 优雅/强制关机的影响

当 Spring Batch 作业执行失败时,如果作业实例是可重启的,您可以重新启动它。只要作业执行优雅地关机,您就可以实现自动化,因为这使得 Spring Batch 有机会正确地将作业执行的状态设置为 FAILED 并将其 END_TIME 设置为非空值。然而,如果作业执行突然失败,作业执行的状态仍将是 STARTED 且其 END_TIMEnull。当您尝试重新启动此类作业执行时,Spring Batch 将认为(因为它只查看数据库状态)此实例当前有一个作业执行正在运行,并因 JobExecutionAlreadyRunningException 而失败。在这种情况下,应更新元数据表以允许重新启动此类失败的执行——例如:

> update BATCH_JOB_EXECUTION set status = 'FAILED', END_TIME = '2020-01-15 10:10:28.235' where job_execution_id = X;
> update BATCH_STEP_EXECUTION set status = 'FAILED' where job_execution_id = X and step_name='failed step name';

Spring Batch 作业的优雅/强制关机与 Kubernetes 作业的重启策略直接相关。例如,对于 restartPolicy=OnFailure,当一个 Pod 突然失败且作业控制器紧接着创建一个新的 Pod 时,您无法及时更新数据库,新的 Spring Batch 作业执行将因 JobExecutionAlreadyRunningException 而失败。第三个 Pod 也会发生同样的情况,以此类推,直到 Pod 达到 CrashLoopBackOff 状态,并且一旦超过 backoffLimit 就会被删除。

现在,如果您按照上述代码片段所示,采用使用 System.exit(SpringApplication.exit(SpringApplication.run(MyBatchApplication.class, args))); 运行 Spring Boot Batch 应用程序的最佳实践,当 Kubernetes 启动Pod 终止过程时,Spring Boot(进而 Spring Batch)可以正确处理 SIGTERM 信号并优雅地关机您的应用程序。有了这个机制,当 Pod 优雅地关机时,Spring Batch 作业实例可以自动重启直到完成。不幸的是,Kubernetes Pod 的优雅关机并非总是能保证,您在设置重启策略和 backoffLimit 值时应考虑到这一点,以确保您有足够的时间根据需要为失败的作业更新作业仓库。

应该注意的是,docker 的 ENTRYPOINTshell 形式不会将 Unix 信号发送给容器中运行的子进程。因此,为了让在容器中运行的 Spring Batch 作业能够正确拦截 Unix 信号,ENTRYPOINT 的形式应该是 exec。这也与上面提到的 Kubernetes 的 Pod 终止过程直接相关。关于此事的更多详细信息,可以在Kubernetes 最佳实践:优雅终止(Kubernetes best practices: terminating with grace)这篇博客文章中找到。

5. 选择正确的 Kubernetes 作业并发策略

正如我前面指出的,Spring Batch 会阻止同一作业实例的并发执行。因此,如果您遵循“每个 Spring Batch 作业实例对应一个 Kubernetes 作业”的部署模式,将作业的 spec.parallelism 设置为大于 1 的值是没有意义的,因为这会并行启动两个 Pod,其中一个肯定会因 JobExecutionAlreadyRunningException 而失败。然而,对于分区作业而言,将 spec.parallelism 设置为大于 1 的值非常有意义。在这种情况下,分区可以在并行 Pod 中执行。正确选择并发策略与选择哪种作业模式紧密相关(如第 3 点所述)。

6. 作业元数据清理

删除一个 Kubernetes 作业会删除其对应的 Pod。Kubernetes 提供了一种通过使用 ttlSecondsAfterFinished 参数自动清理已完成作业的方法。然而,Spring Batch 中没有与此等效的机制:您应该手动清理作业仓库。对于任何严肃的生产批处理基础设施,您都应该考虑到这一点,因为作业实例和执行可能会增长得非常快,具体取决于部署作业的频率和数量。我认为这里有一个很好的机会来创建一个 Kubernetes 自定义资源定义(Custom Resource Definition),以便在删除相应的 Kubernetes 作业时删除 Spring Batch 的元数据。

结论

我希望这篇文章能为在云中设计、开发和运行批处理应用程序所面临的挑战,以及 Spring Batch、Spring Boot 和 Kubernetes 如何极大地简化这项任务提供一些启示。由于 Spring 生态系统的生产力,本文展示了如何通过三个简单的步骤从start.spring.io到 Kubernetes,但这仅仅是问题的表面。本文是博客系列的第一部分,在后续文章中我将涵盖在 Kubernetes 上运行 Spring Batch 作业的其他方面。在接下来的文章中,我将探讨使用MicrometerWavefront进行作业可观测性,然后是如何在 Kubernetes 上扩展 Spring Batch 作业。敬请期待!

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持连接

订阅

取得进步

VMware 提供培训和认证,助您快速进步。

了解更多

获取支持

Tanzu Spring 通过一份简单订阅,即可获得对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区的所有即将到来的活动。

查看全部