保持领先
VMware 提供培训和认证,助您加速进步。
了解更多新发布的 Spring Batch 5.2 带来了大量新特性!Spring Batch 是一种处理大量但有限的顺序数据访问的有力方式。想想:从 SQL 数据库读取并写入 CSV,或者从 FTP 服务器读取并对 MongoDB 进行分析——这就是批处理。你知道这是什么。这项工作的一半(抱歉双关语!)是将各种数据源和多个数据汇集点集成起来。另一个方面,正如你可以想象到的,对于耗时很长且可能失败的工作负载,是维护与每个批处理作业运行相关的持久且详细的元数据。同样,我无法深入详细介绍此版本中的所有新颖特性!因此,让我们从高层面看看其中一些特性。
JobRepository
。在不久前,它有两个 JobRepository
实现:一个支持 JDBC,另一个通过内存中的 Map
支持“持久化”。Map
选项对于测试或那些结果的持久性不如纯性能重要的工作负载非常有用。我们移除了 Map
实现,建议人们使用像 H2 这样的内存中 SQL 数据库配合 JDBC JobRepository
。有些人需要纯粹的性能,H2 选项不够好。在这个版本中,我们引入了一个“无资源”的 JobRepository
,它不保存任何状态,甚至不在内存中。我们还添加了一个基于 JDBC 的 JobRepository
的持久替代方案,即基于 MongoDB 的 JobRepository
实现。ItemReader
时,新增了为 Spring Data JPA 查询注册提示的支持。ItemReader
时,新增了对数据类(Kotlin data class
或 Java record
实例)的支持。Function<I,O>
,以适配 ItemReader
、ItemWriter
和 ItemProcessor
类型。CompositeItemReader<T>
,可以顺序地从多个委托的 ItemReader<T>
中读取数据。RecursiveCollectionLineAggregator
中支持可配置的行分隔符CompositeItemReader<T>
让我们来看看我最喜欢的两个新特性:CompositeItemReader<T>
和 SEDA 友好的 BlockingQueueItemWriter
以及 BlockingQueueItemReader
实现。
这是此 Spring Batch 应用中唯一的 Job
的定义
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Configuration
class BatchConfiguration {
static final BlockingQueue<Customer> CUSTOMERS = new LinkedBlockingQueue<>();
@Bean
Job job(JobRepository repository, Step one, Step two) {
return new JobBuilder("job", repository)//
.incrementer(new RunIdIncrementer()) //
.start(one)//
.next(two)//
.build();
}
}
这是一个简单的作业,包含两个 Step
实例,一个流向另一个。快速提醒:在 Spring Batch 中,Step
是一个工作单元。它描述了四件事:
ItemReader<T>
实例表示)ItemWriter<T>
实例表示)每个 Step
使用 ItemReader<I>
读取一个 chunk 的数据量,将一个类似集合的东西(称为 Chunk)传递给 ItemProcessor<I,O>
进行任意操作,然后将 ItemProcessor<I,O>
的输出发送给 ItemWriter<O>
。I
和 O
可以代表相同的泛型类型或不同的类型。然后,循环继续进行,直到 ItemReader
中的所有数据都被读取完毕。该步骤被视为完成,执行将转移到下一个步骤。
在这个示例应用中,我们将从 customer
表读取数据,读取 id
、name
、os
和 language
记录。我们也将从一个 .csv
文件读取类似的数据。我们将使用方便的新特性 CompositeItemReader<Customer>
来快速完成这项工作,从而避免进行单独的规范化步骤。
package com.example.bootiful_34.batch;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.List;
@Configuration
@ImportRuntimeHints(StepOneConfiguration.CustomersCsvRuntimeHintsRegistrar.class)
class StepOneConfiguration {
private static final Resource CSV = new ClassPathResource("/customers.csv");
@Bean
FlatFileItemReader<Customer> customerCsvItemReader() {
return new FlatFileItemReaderBuilder<Customer>()//
.resource(CSV)
.delimited()
.names("id", "name", "language", "os")
.name("customerCsvItemReader")
.fieldSetMapper(fs -> new Customer(fs.readInt(0), fs.readString(1), fs.readString(2), fs.readString(3)))
.build();
}
@Bean
JdbcCursorItemReader<Customer> customerJdbcItemReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Customer>()//
.name("customerJdbcItemReader")//
.dataSource(dataSource)//
.sql("select id, name, language, os from customer")//
.rowMapper((rs, rowNum) -> new Customer(rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))//
.build();
}
@Bean
CompositeItemReader<Customer> customerCompositeItemReader(JdbcCursorItemReader<Customer> customerJdbcItemReader,
FlatFileItemReader<Customer> customerCsvItemReader) {
return new CompositeItemReader<>(List.of(customerJdbcItemReader, customerCsvItemReader));
}
@Bean
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter() {
return new BlockingQueueItemWriter<>(BatchConfiguration.CUSTOMERS);
}
@Bean
Step one(JobRepository repository, PlatformTransactionManager txm,
CompositeItemReader<Customer> customerCompositeItemReader,
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter) {
return new StepBuilder("one", repository)//
.<Customer, Customer>chunk(10, txm)//
.reader(customerCompositeItemReader)//
.writer(customerBlockingQueueItemWriter)//
.build();
}
static class CustomersCsvRuntimeHintsRegistrar implements RuntimeHintsRegistrar {
@Override
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
hints.resources().registerResource(CSV);
}
}
}
在这个示例中,我们有三个 ItemReader
bean,但步骤只消费一个 CompositeItemReader<T>
bean。它将依次读取来自 FlatFileItemReader<Customer>
和 JdbcCursorItemReader<Customer>
bean 的任何数据。
在这个示例中,我们没有配置 ItemProcessor<Customer,Customer>
。
对于 ItemWriter<Customer>
,我们使用了框架中另一个新颖的补充:BlockingQueueItemWriter<Customer>
!其思想很简单:writer 将数据写入一个 Java java.util.concurrent.BlockingQueue
。BlockingQueue
变量是一个定义在 BatchConfiguration
类中的 static final
变量,名为 CUSTOMERS
。下一个步骤将配置一个 BlockingQueueItemReader<T>
,它将从同一个 java.util.concurrent.BlockingQueue
中读取。超简单,对吗?是的!但这将节省大量时间。
传统上,Spring Batch 应用的上下文只与当前步骤相关联。随着数据流经一个作业,Spring Batch 的 Step
只给你三次处理数据的机会:从 ItemReader<I>
、ItemProcessor<I,O>
和 ItemWriter<O>
。想在数据写入后进行更多处理?那就得等到下一个步骤!你已经将数据写入磁盘或其他持久介质,然后你必须重新读取它。Spring Batch 会跟踪你在读取和写入方面进行到多远,那我们为何要如此小心?为何我们需要如此频繁地将所有内容持久化?
现在情况不再如此了,因为 Spring Batch 支持将给定 Step
的输出写入 BlockingQueue
。值得注意的是,BlockingQueue
实例有一个额外的好处,即支持对写入数据量设置限制。这与分级事件驱动架构(SEDA)的风格配合得很好。SEDA 背后的思想是根据数据流经的不同阶段来定义工作。随着数据从一个阶段移动到另一个阶段,它流入(有界的)队列。这些有界队列提供反压。如果工作被拒绝或在容量超出时简单地写入磁盘,你无法压垮给定阶段的处理器。这称为反压,对于可伸缩性至关重要。
每个阶段仅从一个队列中获取工作。这提供了一种天然的负载均衡方式:启动更多给定阶段处理器的实例,工作就会均匀地分配给它们。你甚至可以结合 Spring Batch 的远程分区和分块范例,将工作分配到集群中,进一步扩展这种架构。
这种架构通常与消息系统相关联——队列通常被假定为消息总线中的队列(或主题);然而,该架构背后的原则在批处理系统中也同样适用。
让我们看看步骤二!
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
class StepTwoConfiguration {
@Bean
Step two(JobRepository repository, PlatformTransactionManager transactionManager,
BlockingQueueItemReader<Customer> blockingQueueItemReader, ItemWriter<Customer> customerItemWriter) {
return new StepBuilder("two", repository)//
.<Customer, Customer>chunk(10, transactionManager)//
.reader(blockingQueueItemReader)//
.writer(customerItemWriter)//
.build();
}
@Bean
BlockingQueueItemReader<Customer> blockingQueueItemReader() {
return new BlockingQueueItemReader<>(BatchConfiguration.CUSTOMERS);
}
@Bean
ItemWriter<Customer> customerItemWriter() {
return chunk -> chunk.forEach(System.out::println);
}
}
在这里,我们定义了另一个 Step
,它从同一个 BlockingQueue
中读取数据,然后简单地将所有内容打印出来。
强大、简单、可扩展的批处理?夫复何求?顺便说一句,请记住,Spring Batch 的大部分工作——输入和输出——都从 Java 21 的虚拟线程中获益巨大,Spring Boot 现在已经支持虚拟线程三个版本了!如果你使用的是 Java 21+(你肯定至少用的是 Java 21,对吗?),别忘了设置 spring.threads.virtual.enabled=true
。