Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
创建批处理服务
本指南将指导你完成创建基本批处理驱动的解决方案的过程。
你将构建什么
你将构建一个从 CSV 电子表格导入数据、使用自定义代码对其进行转换并将最终结果存储在数据库中的服务。
你需要什么
-
大约 15 分钟
-
一个喜爱的文本编辑器或 IDE
-
Java 17 或更高版本
-
你还可以将代码直接导入你的 IDE
如何完成本指南
与大多数 Spring 入门指南 一样,你可以从头开始完成每一步,或者你可以绕过你已经熟悉的设置步骤。无论哪种方式,你最终都会得到可用的代码。
要从头开始,请转到 从 Spring Initializr 开始。
要跳过基础知识,请执行以下操作
-
下载并解压本指南的源代码仓库,或使用 Git 克隆它:
git clone https://github.com/spring-guides/gs-batch-processing.git
-
cd 到
gs-batch-processing/initial
-
跳转到 业务数据。
完成后,你可以将你的结果与 gs-batch-processing/complete
中的代码进行比较。
从 Spring Initializr 开始
你可以使用这个 预初始化项目,然后点击生成以下载一个 ZIP 文件。该项目已配置为适合本教程中的示例。
手动初始化项目
-
导航到 https://start.spring.io。此服务将提取应用程序所需的所有依赖项,并为你完成大部分设置。
-
选择 Gradle 或 Maven 以及你想要使用的语言。本指南假设你选择了 Java。
-
点击依赖项,然后选择Spring Batch和HyperSQL 数据库。
-
点击生成。
-
下载生成的 ZIP 文件,该文件是一个 web 应用程序的存档,已使用你的选择进行配置。
如果你的 IDE 集成了 Spring Initializr,你可以从你的 IDE 完成此过程。 |
你还可以从 Github 分叉项目,然后在你的 IDE 或其他编辑器中打开它。 |
业务数据
通常,你的客户或业务分析师会提供一个电子表格。对于这个简单的示例,你可以在 src/main/resources/sample-data.csv
中找到一些虚构的数据
此电子表格在每一行包含一个名字和一个姓氏,用逗号分隔。这是一个相当常见的模式,Spring 可以处理,无需自定义。
接下来,你需要编写一个 SQL 脚本来创建一个表以存储数据。你可以在 src/main/resources/schema-all.sql
中找到这样的脚本
DROP TABLE people IF EXISTS;
CREATE TABLE people (
person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);
Spring Boot 在启动时自动运行 schema-@@platform@@.sql 。-all 是所有平台的默认值。 |
创建业务类
现在你可以看到数据输入和输出的格式,你可以编写代码来表示一行数据,如下面的示例(来自 src/main/java/com/example/batchprocessing/Person.java
)所示
package com.example.batchprocessing;
public record Person(String firstName, String lastName) {
}
你可以通过构造函数使用名字和姓氏实例化 Person
记录。
创建中间处理器
批处理中的一个常见范例是摄取数据、转换数据,然后将其管道到其他地方。在这里,你需要编写一个简单的转换器,将名称转换为大写。以下清单(来自 src/main/java/com/example/batchprocessing/PersonItemProcessor.java
)显示了如何执行此操作
package com.example.batchprocessing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) {
final String firstName = person.firstName().toUpperCase();
final String lastName = person.lastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
return transformedPerson;
}
}
PersonItemProcessor
实现了 Spring Batch 的 ItemProcessor
接口。这使得将代码连接到您将在本指南后面定义的批处理作业变得容易。根据该接口,您会收到一个传入的 Person
对象,然后将其转换为大写的 Person
。
输入和输出类型不必相同。事实上,在读取一个数据源后,有时应用程序的数据流需要不同的数据类型。 |
组合批处理作业
现在您需要组合实际的批处理作业。Spring Batch 提供了许多实用程序类,减少了编写自定义代码的需要。相反,您可以专注于业务逻辑。
要配置您的作业,您必须首先创建一个 Spring @Configuration
类,如下面的 src/main/java/com/example/batchprocessing/BatchConfiguration.java
中的示例。此示例使用基于内存的数据库,这意味着完成后数据将消失。现在,将以下 Bean 添加到您的 BatchConfiguration
类中,以定义一个读取器、一个处理器和一个写入器
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names("firstName", "lastName")
.targetType(Person.class)
.build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.beanMapped()
.build();
}
第一段代码定义了输入、处理器和输出。
-
reader()
创建一个ItemReader
。它查找一个名为sample-data.csv
的文件,并解析每行项目,其中包含足够的信息将其转换为Person
。 -
processor()
创建一个您之前定义的PersonItemProcessor
实例,用于将数据转换为大写。 -
writer(DataSource)
创建一个ItemWriter
。此项针对 JDBC 目标,并自动获取 Spring Boot 创建的 dataSource 的副本。它包括插入单个Person
所需的 SQL 语句,由 Java 记录组件驱动。
最后一段(来自 src/main/java/com/example/batchprocessing/BatchConfiguration.java
)显示了实际的作业配置
@Bean
public Job importUserJob(JobRepository jobRepository,Step step1, JobCompletionNotificationListener listener) {
return new JobBuilder("importUserJob", jobRepository)
.listener(listener)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
return new StepBuilder("step1", jobRepository)
.<Person, Person> chunk(3, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
第一个方法定义了作业,第二个方法定义了单个步骤。作业由步骤构建,其中每个步骤可能涉及一个读取器、一个处理器和一个写入器。
然后您列出每个步骤(尽管此作业只有一个步骤)。作业结束,Java API 生成了一个配置完善的作业。
在步骤定义中,您定义一次要写入多少数据。在这种情况下,它一次最多写入三条记录。接下来,您使用前面注入的 Bean 配置读取器、处理器和写入器。
chunk() 前缀为 <Person,Person> ,因为它是一个泛型方法。这表示每个处理“块”的输入和输出类型,并与 ItemReader<Person> 和 ItemWriter<Person> 对齐。 |
批处理配置的最后一点是获取作业完成时的通知方式。以下示例(来自 src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java
)显示了这样的类
package com.example.batchprocessing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
jdbcTemplate
.query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
.forEach(person -> log.info("Found <{{}}> in the database.", person));
}
}
}
JobCompletionNotificationListener
监听作业为 BatchStatus.COMPLETED
的时间,然后使用 JdbcTemplate
检查结果。
使应用程序可执行
尽管批处理可以嵌入到 Web 应用程序和 WAR 文件中,但下面演示的更简单的方法创建了一个独立的应用程序。您将所有内容打包到一个可执行 JAR 文件中,由一个古老的 Java main()
方法驱动。
Spring Initializr 为您创建了一个应用程序类。对于此简单示例,它无需进一步修改即可工作。以下清单(来自 src/main/java/com/example/batchprocessing/BatchProcessingApplication.java
)显示了应用程序类
package com.example.batchprocessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BatchProcessingApplication {
public static void main(String[] args) {
System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
}
}
@SpringBootApplication
是一个便捷注释,它添加了以下所有内容
-
@Configuration
:将类标记为应用程序上下文的 bean 定义源。 -
@EnableAutoConfiguration
:告诉 Spring Boot 根据类路径设置、其他 bean 和各种属性设置开始添加 bean。例如,如果类路径中包含spring-webmvc
,此注释将应用程序标记为 Web 应用程序并激活关键行为,例如设置DispatcherServlet
。 -
@ComponentScan
:告诉 Spring 在com/example
包中查找其他组件、配置和服务,以便它找到控制器。
main()
方法使用 Spring Boot 的 SpringApplication.run()
方法启动应用程序。您是否注意到没有一行 XML?也没有 web.xml
文件。此 Web 应用程序是 100% 纯 Java,您不必处理配置任何管道或基础设施。
请注意,SpringApplication.exit()
和 System.exit()
确保在作业完成后 JVM 退出。有关更多详细信息,请参阅 Spring Boot 参考文档中的应用程序退出部分。
出于演示目的,有代码用于创建 JdbcTemplate
、查询数据库和打印出批处理作业插入的人员姓名。
请注意应用程序如何不使用 |
构建可执行 JAR
您可以使用 Gradle 或 Maven 从命令行运行应用程序。您还可以构建一个包含所有必需的依赖项、类和资源的可执行 JAR 文件并运行它。构建可执行 jar 使得在整个开发生命周期、不同环境中轻松地交付、版本化和部署服务作为应用程序,等等。
如果您使用 Gradle,可以使用 ./gradlew bootRun
运行应用程序。或者,您可以使用 ./gradlew build
构建 JAR 文件,然后运行 JAR 文件,如下所示
如果您使用 Maven,可以使用 ./mvnw spring-boot:run
运行应用程序。或者,您可以使用 ./mvnw clean package
构建 JAR 文件,然后运行 JAR 文件,如下所示
此处描述的步骤会创建一个可运行的 JAR。您还可以构建一个经典 WAR 文件。 |
该作业会为每个被转换的人打印一行。作业运行后,您还可以看到查询数据库的输出。它应类似于以下输出
Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
Found <{Person[firstName=JILL, lastName=DOE]}> in the database.
Found <{Person[firstName=JOE, lastName=DOE]}> in the database.
Found <{Person[firstName=JUSTIN, lastName=DOE]}> in the database.
Found <{Person[firstName=JANE, lastName=DOE]}> in the database.
Found <{Person[firstName=JOHN, lastName=DOE]}> in the database.
摘要
恭喜!您构建了一个批处理作业,该作业从电子表格中摄取数据,处理数据,然后将其写入数据库。