Kubernetes 上的 Spring Batch:高效大规模批处理

工程 | Mahmoud Ben Hassine | 2021年1月28日 | ...

引言

自打孔卡和磁带时代起,批处理就是计算机科学中一个充满挑战的领域。如今,现代云计算时代为如何在云环境中高效地开发和运行批处理工作负载带来了一系列新的挑战。在这篇博文中,我将介绍批处理开发者或架构师在设计和运行大规模批处理应用程序时可能面临的一些挑战,并展示 Spring Batch、Spring Boot 和 Kubernetes 如何极大地简化这项任务。

在云中设计和运行批处理工作负载的挑战

与 Web 应用程序相比,设计云原生批处理应用程序似乎很容易,但事实并非如此。批处理开发者面临许多挑战。

1. 容错性

批处理过程通常会与其他服务(例如数据库、消息代理、Web 服务等)进行交互,而这些服务在云环境中本质上是不稳定的。此外,运行这些进程的节点本身也可能随时失效并被健康的节点替换。云原生批处理应用程序应该以容错的方式进行设计。

2. 健壮性

运行批处理作业两次的人为错误导致严重财务后果的情况并不少见(例如沃尔格林澳大利亚国民银行苏格兰皇家银行等)。此外,Kubernetes 等一些平台在意外运行同一作业两次方面存在一些已知的局限性。云原生批处理应用程序应该通过设计来处理此类问题。

3. 成本效率

云基础设施按 CPU/内存/带宽使用情况计费。如果发生故障,无法从中断处重新启动作业并“损失”先前运行的 CPU/内存/带宽使用情况(因此会被计费两次或更多次!)将是低效的。

4. 可观察性

任何现代批处理架构都应该能够在任何时间点知道一些关键指标,包括

  • 当前正在运行哪些作业?
  • 如果有任何作业失败?
  • 关于事情进展的其他问题。

能够在一目了然的仪表板上查看这些 KPI 对于高效运营至关重要。

5. 可扩展性

我们正在处理前所未有的海量数据,这些数据已经不可能在一台机器上处理了。正确处理大容量分布式数据可能是最具挑战性的问题。云原生批处理应用程序应该具有可扩展性。

在设计和开发云原生批处理应用程序时,应该考虑所有这些方面。这对开发者来说是一项相当大的工作。Spring Batch 负责处理大多数这些问题。我在下一节中解释详细信息。

Spring Batch 如何让批处理开发人员的工作更轻松?

Spring Batch 是 JVM 上事实上的批处理框架。已经写了很多书来介绍 Spring Batch 提供的丰富的功能集,但我想重点介绍在云原生开发环境中解决前面提到的挑战的最相关功能。

1. 容错性

Spring Batch 提供了容错功能,例如事务管理以及跳过和重试机制,这些机制在批处理作业与云环境中不稳定的服务交互时非常有用。

2. 健壮性

Spring Batch 使用集中式事务性作业存储库,这可以防止重复执行作业。通过设计,可能导致同一作业运行两次的人为错误和平台限制是不可能的。

3. 成本效率

Spring Batch 作业将其状态保存在外部数据库中,这使得可以从中断处重新启动失败的作业。与从头开始重新执行工作并因此会被计费两次或更多次的其它解决方案相比,这是具有成本效益的!

4. 可观测性

Spring Batch 集成了 Micrometer,这对于可观测性至关重要。基于 Spring Batch 的批处理基础设施提供了关键指标,例如当前活动的作业、读/写速率、失败的作业等等。它甚至可以扩展自定义指标。

5. 可扩展性

如前所述,Spring Batch 作业将其状态保存在外部数据库中。因此,从 12 要素方法 的角度来看,它们是无状态的进程。这种无状态特性使它们适合容器化并在云环境中以可扩展的方式执行。此外,Spring Batch 提供了几种垂直和水平扩展技术,例如多线程步骤和远程数据分区/分块,以便高效地扩展批处理作业。

Spring Batch 提供了其他功能,但上面提到的功能在设计和开发云原生批处理过程时非常有用。

Kubernetes 如何简化批处理操作员的工作?

Kubernetes 是云计算领域事实上的容器编排平台。大规模运行批处理基础设施绝非易事,而 Kubernetes 在这方面确实是一个改变游戏规则的工具。在云时代之前,在我之前的一份工作中,我担任批处理操作员的角色,我必须管理一个专门用于批处理作业的 4 台机器的集群。以下是一些我必须手动完成或找到一种方法使用 (bash!) 脚本自动化的任务:

  • 通过 ssh 登录到每台机器以检查当前正在运行的作业。
  • 通过 ssh 登录到每台机器以收集失败作业的日志。
  • 通过 ssh 登录到每台机器以升级作业版本或更新其配置。
  • 通过 ssh 登录到每台机器以终止挂起的作业并重新启动它们。
  • 通过 ssh 登录到每台机器以编辑/更新 crontab 文件以进行作业调度。
  • 许多其他类似的任务……

所有这些任务显然效率低下且容易出错,由于资源管理不善,导致四台专用机器利用率不足。如果您在 2021 年仍在执行此类任务(手动或通过脚本),我认为现在是时候考虑将您的批处理基础设施迁移到 Kubernetes 了。原因是 Kubernetes 允许您对**整个**集群使用**单个**命令完成所有这些任务,从操作角度来看,这与之前有着**巨大**的不同。迁移到 Kubernetes 允许您:

  • 使用单个命令查询整个集群的当前运行作业。
  • 提交/调度作业,而无需知道它们将在哪个节点上运行。
  • 透明地更新作业定义。
  • 自动运行作业直至完成(Kubernetes 作业创建一个或多个 Pod,并确保指定数量的 Pod 成功终止)。
  • 优化集群资源的使用(Kubernetes 对集群机器进行资源优化分配),从而优化账单!
  • 使用许多其他有趣的功能。

Spring Batch 在 Kubernetes 上:完美的结合,实际应用

在本节中,我将采用在 Spring Batch 的 入门指南(这是一个将一些人员数据从 CSV 文件加载到关系数据库表中的数据导入作业)中开发的相同作业,将其容器化,并将其部署到 Kubernetes 上。如果您想通过将此作业包装在 Spring Cloud Task 中并在 Spring Cloud Data Flow 服务器中部署它来更进一步,请参阅 使用 Data Flow 部署 Spring Batch 应用程序

1. 设置数据库服务器

我使用 MySQL 数据库来存储 Spring Batch 元数据。数据库位于 Kubernetes 集群外部,这是有意的。原因是模拟一个现实的迁移路径,其中只有无状态工作负载在第一步迁移到 Kubernetes。对于许多公司来说,将数据库迁移到 Kubernetes 仍然不是一个选项(这是一个合理的决定)。要启动数据库服务器,请运行以下命令:

$ git clone [email protected]:benas/spring-batch-lab.git
$ cd blog/spring-batch-kubernetes
$ docker-compose -f src/docker/docker-compose.yml up

这将创建一个预先填充了 Spring Batch 的技术表 以及 业务表,PEOPLE 的 MySQL 容器。我们可以这样检查:

$ docker exec -it mysql bash
root@0a6596feb06d:/# mysql -u root test -p # the root password is "root"
Enter password:
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show tables;
+------------------------------+
| Tables_in_test               |
+------------------------------+
| BATCH_JOB_EXECUTION          |
| BATCH_JOB_EXECUTION_CONTEXT  |
| BATCH_JOB_EXECUTION_PARAMS   |
| BATCH_JOB_EXECUTION_SEQ      |
| BATCH_JOB_INSTANCE           |
| BATCH_JOB_SEQ                |
| BATCH_STEP_EXECUTION         |
| BATCH_STEP_EXECUTION_CONTEXT |
| BATCH_STEP_EXECUTION_SEQ     |
| PEOPLE                       |
+------------------------------+
10 rows in set (0.01 sec)

mysql> select * from PEOPLE;
Empty set (0.00 sec)

2. 创建一个优秀的、容器化的 Spring Batch 作业

访问 start.spring.io 并使用以下依赖项生成一个项目:Spring Batch 和 MySQL 驱动程序。您可以使用此 链接 创建项目。解压缩项目并在您喜欢的 IDE 中加载它之后,您可以更改主类,如下所示:

package com.example.demo;

import java.net.MalformedURLException;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.UrlResource;

@SpringBootApplication
@EnableBatchProcessing
public class DemoApplication {

	public static void main(String[] args) {
		System.exit(SpringApplication.exit(
			SpringApplication.run(DemoApplication.class, args)));
	}

	@Bean
	@StepScope
	public Resource resource(@Value("#{jobParameters['fileName']}") String fileName) throws MalformedURLException {
		return new UrlResource(fileName);
	}

	@Bean
	public FlatFileItemReader<Person> itemReader(Resource resource)  {
		return new FlatFileItemReaderBuilder<Person>()
				.name("personItemReader")
				.resource(resource)
				.delimited()
				.names("firstName", "lastName")
				.targetType(Person.class)
				.build();
	}

	@Bean
	public JdbcBatchItemWriter<Person> itemWriter(DataSource dataSource) {
		return new JdbcBatchItemWriterBuilder<Person>()
				.dataSource(dataSource)
				.sql("INSERT INTO PEOPLE (FIRST_NAME, LAST_NAME) VALUES (:firstName, :lastName)")
				.beanMapped()
				.build();
	}

	@Bean
	public Job job(JobBuilderFactory jobs, StepBuilderFactory steps,
				   DataSource dataSource, Resource resource) {
		return jobs.get("job")
				.start(steps.get("step")
						.<Person, Person>chunk(3)
						.reader(itemReader(resource))
						.writer(itemWriter(dataSource))
						.build())
				.build();
	}

	public static class Person {
		private String firstName;
		private String lastName;
                // default constructor + getters/setters omitted for brevity
	}

}

@EnableBatchProcessing 注解设置了 Spring Batch所需的所有基础设施 bean(作业存储库、作业启动器等)以及一些实用程序,例如 JobBuilderFactoryStepBuilderFactory,以方便创建步骤和作业。在上面的代码片段中,我使用了这些实用程序来创建一个具有单个分块导向步骤的作业,定义如下:

  • 一个从 UrlResource 读取数据的项目读取器。在某些云环境中,文件系统是只读的,甚至不存在,因此无需下载即可流式传输数据的能力几乎是必不可少的。幸运的是,Spring Batch 为您提供了支持!所有基于文件的项目读取器(用于平面文件、XML 文件和 JSON 文件)都针对强大的 Spring Framework Resource 抽象工作,因此任何 Resource 的实现都应该可以工作。在此示例中,我使用 UrlResource 直接从 GitHub 的 sample-data.csv 的远程 URL 读取数据,而无需下载它。文件名作为作业参数传入。
  • 一个将 Person 项目写入 MySQL 中 PEOPLE 表的项目写入器。

就是这样。让我们打包作业并使用 Spring Boot 的 Maven 插件为其创建一个 Docker 镜像:

$ mvn package
...
$ mvn spring-boot:build-image -Dspring-boot.build-image.imageName=benas/bootiful-job
[INFO] Scanning for projects...
[INFO]
…
[INFO] --- spring-boot-maven-plugin:2.4.1:build-image (default-cli) @ demo ---
[INFO] Building image 'docker.io/benas/bootiful-job:latest'
…
[INFO] Successfully built image 'docker.io/benas/bootiful-job:latest'
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS

现在应该正确构建镜像了,但让我们检查一下:

$ docker images
REPOSITORY             TAG           IMAGE ID               CREATED             SIZE
benas/bootiful-job     latest        52244b284f08    41 seconds ago   242MB

请注意 Spring Boot 如何在无需创建 Dockerfile 的情况下创建 Docker 镜像!Josh Long 撰写了一篇关于此出色功能的完整博文:YMNNALFT:使用 Spring Boot Maven 插件和 Buildpacks轻松创建 Docker 镜像。现在让我们在 Docker 容器中运行此作业,以检查一切是否按预期工作:

$ docker run \
   -e SPRING_DATASOURCE_URL=jdbc:mysql://192.168.1.53:3306/test \
   -e SPRING_DATASOURCE_USERNAME=root \
   -e SPRING_DATASOURCE_PASSWORD=root \
   -e SPRING_DATASOURCE_DRIVER-CLASS-NAME=com.mysql.cj.jdbc.Driver \
   benas/bootiful-job \
   fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv

您应该看到类似的内容:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2021-01-08 17:03:15.009  INFO 1 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication v0.0.1-SNAPSHOT using Java 1.8.0_275 on 876da4a1cfe0 with PID 1 (/workspace/BOOT-INF/classes started by cnb in /workspace)
2021-01-08 17:03:15.012  INFO 1 --- [           main] com.example.demo.DemoApplication         : No active profile set, falling back to default profiles: default
2021-01-08 17:03:15.899  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-01-08 17:03:16.085  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-01-08 17:03:16.139  INFO 1 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-01-08 17:03:16.292  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-01-08 17:03:16.411  INFO 1 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 1.754 seconds (JVM running for 2.383)
2021-01-08 17:03:16.414  INFO 1 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv]
2021-01-08 17:03:16.536  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}]
2021-01-08 17:03:16.596  INFO 1 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2021-01-08 17:03:17.481  INFO 1 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 884ms
2021-01-08 17:03:17.501  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}] and the following status: [COMPLETED] in 934ms
2021-01-08 17:03:17.513  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-08 17:03:17.534  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

作业现在已完成,我们可以检查数据是否已成功加载到数据库中:

mysql> select * from PEOPLE;
+----+------------+-----------+
| ID | FIRST_NAME | LAST_NAME |
+----+------------+-----------+
|  1 | Jill       | Doe       |
|  2 | Joe        | Doe       |
|  3 | Justin     | Doe       |
|  4 | Jane       | Doe       |
|  5 | John       | Doe       |
+----+------------+-----------+
5 rows in set (0.00 sec)

就是这样!现在让我们将此作业部署到 Kubernetes 上。但是,在继续并将此作业部署到 Kubernetes 之前,我想展示两件事:

防止同一作业实例的重复作业执行

如果您想了解 Spring Batch 如何防止重复作业执行,您可以尝试使用相同的命令重新运行作业。应用程序应该会因以下错误而无法启动:

2021-01-08 20:21:20.752 ERROR 1 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:785) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) [spring-boot-2.4.1.jar:2.4.1]
	at com.example.demo.DemoApplication.main(DemoApplication.java:30) [classes/:0.0.1-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_275]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_275]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_275]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_275]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:107) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) [workspace/:na]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) [workspace/:na]
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}.  If you want to run this job again, change the parameters.
…

Spring Batch 不允许在成功完成之后重新运行相同的作业实例。这是为了防止由于人为错误或平台限制而导致的重复作业执行,如上一节所述。

防止同一作业实例的并发作业执行

同样,Spring Batch 也防止同一作业实例的并发执行。为了测试它,添加一个包含Thread.sleep的项目处理器来减慢处理速度,并在第一个作业执行运行时(在单独的终端中)尝试运行第二个作业执行。第二次(并发)尝试将失败,并出现以下错误:

2021-01-08 20:59:04.201 ERROR 1 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:785) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) [spring-boot-2.4.1.jar:2.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) [spring-boot-2.4.1.jar:2.4.1]
	at com.example.demo.DemoApplication.main(DemoApplication.java:31) [classes/:0.0.1-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_275]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_275]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_275]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_275]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:107) [workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) [workspace/:na]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) [workspace/:na]
Caused by: org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobExecution: id=1, version=1, startTime=2021-01-08 20:58:46.434, endTime=null, lastUpdated=2021-01-08 20:58:46.435, status=STARTED, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, Job=[job]], jobParameters=[{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample1.csv}]
…

借助集中式作业存储库,Spring Batch 可以检测当前正在运行的执行(基于数据库中的作业状态),并通过抛出JobExecutionAlreadyRunningException来防止在同一节点或集群的任何其他节点上的并发执行。

3. 在 Kubernetes 上部署作业

设置 Kubernetes 集群超出了本文的范围,因此我假设您已经启动并运行了一个 Kubernetes 集群,并且可以使用kubectl与之交互。在这篇文章中,我使用了 Docker Desktop 应用程序提供的单节点本地 Kubernetes 集群。

首先,我为外部数据库创建一个服务,如 Kubernetes最佳实践:映射外部服务 中“方案 1:具有 IP 地址的集群外部数据库”中所述。以下是服务定义:

kind: Service
apiVersion: v1
metadata:
  name: mysql
spec:
    type: ClusterIP
    ports:
      - port: 3306
        targetPort: 3306
---
kind: Endpoints
apiVersion: v1
metadata:
  name: mysql
subsets:
  - addresses:
      - ip: 192.168.1.53 # This is my local IP, you might need to change it if needed
    ports:
      - port: 3306
---
apiVersion: v1
kind: Secret
metadata:
  name: db-secret
type: Opaque
data:
  # base64 of "root" ($>echo -n "root" | base64)
  db.username: cm9vdA==
  db.password: cm9vdA==

此服务可以应用于 Kubernetes,如下所示:

$ kubectl apply -f src/kubernetes/database-service.yaml

现在,由于我们已经为我们的作业创建了一个 Docker 镜像,因此将其部署到 Kubernetes 只需定义一个具有以下清单的Job资源即可:

apiVersion: batch/v1
kind: Job
metadata:
  name: bootiful-job-$JOB_NAME
spec:
  template:
    spec:
      restartPolicy: OnFailure
      containers:
        - name: bootiful-job
          image: benas/bootiful-job
          imagePullPolicy: Never
          args: ["fileName=$FILE_NAME"]
          env:
            - name: SPRING_DATASOURCE_DRIVER-CLASS-NAME
              value: com.mysql.cj.jdbc.Driver
            - name: SPRING_DATASOURCE_URL
              value: jdbc:mysql://mysql/test
            - name: SPRING_DATASOURCE_USERNAME
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: db.username
            - name: SPRING_DATASOURCE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: db.password

此清单遵循与 基于模板创建作业 相同的方法,正如 Kubernetes 文档所建议的那样。此作业模板用作为每个输入文件创建作业的基础。我已经导入了sample1.csv文件,所以我使用以下命令为名为 sample2.csv 的另一个远程文件创建作业:

$ JOB_NAME=sample2 \
  FILE_NAME="https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv" \
  envsubst < src/k8s/job.yaml | kubectl apply -f -

此命令替换作业模板中的变量以创建给定文件的作业定义,然后将其提交到 Kubernetes。让我们检查 Kubernetes 中的作业和 Pod 资源。

$ kubectl get jobs
NAME                  COMPLETIONS   DURATION   AGE
bootiful-job-sample2   0/1           97s        97s

$ kubectl get pods
NAME                             READY   STATUS      RESTARTS   AGE
bootiful-job-sample2-n8mlb   0/1     Completed   0          7s

$ kubectl logs bootiful-job-sample2-n8mlb
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2021-01-08 17:48:42.053  INFO 1 --- [           main] com.example.demo.BootifulJobApplication  : Starting BootifulJobApplication v0.1 using Java 1.8.0_275 on bootiful-job-person-n8mlb with PID 1 (/workspace/BOOT-INF/classes started by cnb in /workspace)
2021-01-08 17:48:42.056  INFO 1 --- [           main] com.example.demo.BootifulJobApplication  : No active profile set, falling back to default profiles: default
2021-01-08 17:48:43.028  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-01-08 17:48:43.180  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-01-08 17:48:43.231  INFO 1 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-01-08 17:48:43.394  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-01-08 17:48:43.541  INFO 1 --- [           main] com.example.demo.BootifulJobApplication  : Started BootifulJobApplication in 1.877 seconds (JVM running for 2.338)
2021-01-08 17:48:43.544  INFO 1 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv]
2021-01-08 17:48:43.677  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv}]
2021-01-08 17:48:43.758  INFO 1 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2021-01-08 17:48:44.632  INFO 1 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 873ms
2021-01-08 17:48:44.653  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{fileName=https://raw.githubusercontent.com/benas/spring-batch-lab/master/blog/spring-batch-kubernetes/data/sample2.csv}] and the following status: [COMPLETED] in 922ms
2021-01-08 17:48:44.662  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-08 17:48:44.693  INFO 1 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

然后,您可以检查PEOPLE表中新添加的人员。

mysql> select * from PEOPLE;
+----+------------+-----------+
| ID | FIRST_NAME | LAST_NAME |
+----+------------+-----------+
|  1 | Jill       | Doe       |
|  2 | Joe        | Doe       |
|  3 | Justin     | Doe       |
|  4 | Jane       | Doe       |
|  5 | John       | Doe       |
|  6 | David      | Doe       |
|  7 | Damien     | Doe       |
|  8 | Danny      | Doe       |
|  9 | Dorothy    | Doe       |
|  10 | Daniel    | Doe       |

+----+------------+-----------+
10 rows in set (0.00 sec)

就是这样,我们的作业已成功在 Kubernetes 中运行!

技巧与窍门

在结束本文之前,我想分享一些在将 Spring Batch 作业迁移到 Kubernetes 云端时值得考虑的技巧和窍门。

1. 作业打包和部署

在一个容器或 Pod 中运行多个 Spring Batch 作业并不是一个好主意。这通常不遵循云原生开发最佳实践和 Unix 哲学。每个容器或 Pod 运行一个作业具有以下优点:

  • 单独的日志
  • 独立的生命周期(错误、功能、部署等)
  • 单独的参数和退出代码
  • 可重启性(如果失败,只需重启失败的作业)

2. 选择正确的 Spring Batch 作业参数

成功的 Spring Batch 作业实例无法重启。同样,成功的 Kubernetes 作业也无法重启。这使得每个 Spring Batch 作业实例设计一个 Kubernetes 作业成为完美的匹配!因此,正确选择 Spring Batch 中的标识作业参数成为一项关键任务,因为这样做决定了作业实例的身份,进而决定了 Kubernetes 作业的设计(参见第 3 点)。框架的两个重要方面受此选择的影响:

  • 作业标识:Spring Batch 基于作业实例的身份来防止重复和并发作业执行。
  • 失败场景:Spring Batch 依靠作业实例的身份来启动从上一个作业执行中断的地方开始的新作业执行。

批量处理是关于处理**固定、不可变的**数据集。如果输入数据不是固定的,那么流处理工具更合适。Spring Batch 中的标识作业参数应该表示一个**唯一可标识的不可变**数据集。正确选择一组标识作业参数的一个好提示是计算它们的哈希值(更准确地说是它们表示的数据的哈希值),并确保该哈希值是稳定的。以下是一些示例:

作业参数 好/坏 评论
fileName=log.txt 不断增长的日志文件不是固定数据集
fileName=transactions-2020-08-20.csv 只要文件内容是固定的
folderName=/in/data 内容可变的文件夹不是固定数据集
folderName=/in/data/2020/12/20 包含特定日期所有收到的订单文件的文件夹
jmsQueueName=events 项目从队列中删除,因此这不是固定数据集
orderDate=2020-08-20 例如,用于 D+1 的数据库 select 查询中

不幸的是,许多人在设计良好的标识作业参数方面失败了,最终添加时间戳或随机数作为额外的标识作业参数,充当作业实例鉴别符。“run.id”参数的不断增长就是一个这样的失败症状。

3. 选择正确的 Kubernetes 作业部署模式

Kubernetes 的文档提供了一个名为 作业模式 的完整部分,其中描述了如何选择正确的作业部署模式。在这篇文章中,我遵循了 使用扩展进行并行处理 方法,从模板为每个文件创建一个作业。虽然此方法允许并行处理多个文件,但在需要导入许多文件时,它可能会给 Kubernetes 带来压力,因为这将导致创建许多 Kubernetes 作业对象。如果所有文件都具有类似的结构,并且您想创建一个作业来一次性导入它们,则可以使用 Spring Batch 提供的MultiResourceItemReader并创建一个 Kubernetes 作业。另一个选项是使用单个作业和一个分区步骤,其中每个工作步骤处理一个文件(这可以通过使用内置的MultiResourcePartitioner来实现)。

4. 良好/异常关闭的影响

当 Spring Batch 作业执行失败时,如果作业实例可重启,您可以重启它。只要作业执行正常关闭,您就可以自动化此操作,因为这使 Spring Batch 有机会将作业执行的状态正确设置为FAILED并将END_TIME设置为非空值。但是,如果作业执行意外失败,作业执行的状态仍然设置为STARTED,其END_TIMEnull。当您尝试重启此类作业执行时,Spring Batch 将认为(因为它只查看数据库状态)此实例当前正在运行作业执行,并出现JobExecutionAlreadyRunningException错误。在这种情况下,应更新元数据表以允许重启此类失败的执行,例如:

> update BATCH_JOB_EXECUTION set status = 'FAILED', END_TIME = '2020-01-15 10:10:28.235' where job_execution_id = X;
> update BATCH_STEP_EXECUTION set status = 'FAILED' where job_execution_id = X and step_name='failed step name';

Spring Batch 作业的良好/异常关闭直接与 Kubernetes 作业重启策略相关。例如,使用restartPolicy=OnFailure,当 Pod 意外失败并且作业控制器立即之后创建新 Pod 时,您无法及时更新数据库,并且新的 Spring Batch 作业执行将失败并出现JobExecutionAlreadyRunningException错误。第三个 Pod 也是如此,依此类推,直到 Pod 达到CrashLoopBackOff状态并在超过backoffLimit后被删除。

现在,如果您按照最佳实践使用System.exit(SpringApplication.exit(SpringApplication.run(MyBatchApplication.class, args)));运行 Spring Boot Batch 应用程序,如上所示的代码片段,则 Spring Boot(以及 Spring Batch)可以正确处理SIGTERM信号,并在 Kubernetes 启动 Pod 终止进程 时优雅地关闭您的应用程序。有了这个,当 Pod 优雅地关闭时,Spring Batch 作业实例可以自动重启直到完成。不幸的是,Kubernetes Pod 的优雅关闭并非完全可靠,在设置重启策略和backoffLimit值时,您应该考虑这一点,以确保您有足够的时间根据需要更新作业存储库以处理失败的作业。

需要注意的是,Docker 的ENTRYPOINTshell 形式不会向容器中运行的子进程发送 Unix 信号。因此,为了使在容器中运行的 Spring Batch 作业能够正确地拦截 Unix 信号,ENTRYPOINT 形式应为exec。这与上面提到的 Kubernetes 的 Pod 终止过程也直接相关。更多关于此事的详细信息可以在Kubernetes最佳实践:优雅终止博文中找到。

5. 选择正确的 Kubernetes Job 并发策略

正如我前面指出的,Spring Batch 阻止了对同一作业实例的并发作业执行。因此,如果您遵循“每个 Spring Batch 作业实例一个 Kubernetes 作业”的部署模式,将作业的spec.parallelism设置为大于 1 的值是没有意义的,因为这会并行启动两个 Pod,其中一个肯定会因JobExecutionAlreadyRunningException而失败。但是,对于分区作业,将spec.parallelism设置为大于 1 的值非常有意义。在这种情况下,分区可以在并行的 Pod 中执行。正确选择并发策略与选择的作业模式紧密相关(如第 3 点所述)。

6. 作业元数据清理

删除 Kubernetes 作业会删除其对应的 Pod。Kubernetes 提供了一种使用ttlSecondsAfterFinished参数自动清理已完成作业的方法。但是,Spring Batch 中没有与此等效的功能:您应该手动清理作业存储库。对于任何重要的生产批处理基础设施,您都应该考虑到这一点,因为作业实例和执行次数会根据部署作业的频率和数量快速增长。我认为这里有一个很好的机会可以创建一个 Kubernetes 自定义资源定义 (CRD),在对应的 Kubernetes 作业被删除时删除 Spring Batch 的元数据。

结论

我希望这篇文章能阐明在云中设计、开发和运行批处理应用程序的挑战,以及 Spring Batch、Spring Boot 和 Kubernetes 如何极大地简化这项任务。这篇文章展示了如何通过 Spring 生态系统的生产力,只需三个简单的步骤即可从start.spring.io迁移到 Kubernetes,但这仅仅是触及了问题的表面。这篇文章是一个博客系列的第一部分,我将在其中介绍运行 Spring Batch 作业在 Kubernetes 上的其他方面。在接下来的文章中,我将讨论使用MicrometerWavefront进行作业可观察性,以及如何在 Kubernetes 上扩展 Spring Batch 作业。敬请期待!

获取 Spring 新闻通讯

关注 Spring 新闻通讯

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部