闪亮的 Spring Boot 3.4: Spring Batch

工程 | Josh Long | 2024年11月24日 | ...

新发布的 Spring Batch 5.2 带来了大量新特性!Spring Batch 是一种处理大量但有限的顺序数据访问的有力方式。想想:从 SQL 数据库读取并写入 CSV,或者从 FTP 服务器读取并对 MongoDB 进行分析——这就是批处理。你知道这是什么。这项工作的一半(抱歉双关语!)是将各种数据源和多个数据汇集点集成起来。另一个方面,正如你可以想象到的,对于耗时很长且可能失败的工作负载,是维护与每个批处理作业运行相关的持久且详细的元数据。同样,我无法深入详细介绍此版本中的所有新颖特性!因此,让我们从高层面看看其中一些特性。

  • 我们从一个 Job Repository 实现增加到三个——数数看:三个!最近的 Spring Batch 只支持基于 JDBC 的 JobRepository。在不久前,它有两个 JobRepository 实现:一个支持 JDBC,另一个通过内存中的 Map 支持“持久化”。Map 选项对于测试或那些结果的持久性不如纯性能重要的工作负载非常有用。我们移除了 Map 实现,建议人们使用像 H2 这样的内存中 SQL 数据库配合 JDBC JobRepository。有些人需要纯粹的性能,H2 选项不够好。在这个版本中,我们引入了一个“无资源”的 JobRepository,它不保存任何状态,甚至不在内存中。我们还添加了一个基于 JDBC 的 JobRepository 的持久替代方案,即基于 MongoDB 的 JobRepository 实现。
  • 使用 JPA ItemReader 时,新增了为 Spring Data JPA 查询注册提示的支持。
  • 使用基于 JDBC 的 ItemReader 时,新增了对数据类(Kotlin data class 或 Java record 实例)的支持。
  • 支持适配更多函数类型,不仅仅是 Function<I,O>,以适配 ItemReaderItemWriterItemProcessor 类型。
  • 使用阻塞队列 Item Reader 和 Writer 实现并发步骤
  • 一个 CompositeItemReader<T>,可以顺序地从多个委托的 ItemReader<T> 中读取数据。
  • 作业注册的简化
  • RecursiveCollectionLineAggregator 中支持可配置的行分隔符

CompositeItemReader<T>

让我们来看看我最喜欢的两个新特性:CompositeItemReader<T> 和 SEDA 友好的 BlockingQueueItemWriter 以及 BlockingQueueItemReader 实现。

这是此 Spring Batch 应用中唯一的 Job 的定义

package com.example.bootiful_34.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Configuration
class BatchConfiguration {

	static final BlockingQueue<Customer> CUSTOMERS = new LinkedBlockingQueue<>();

	@Bean
	Job job(JobRepository repository, Step one, Step two) {
		return new JobBuilder("job", repository)//
			.incrementer(new RunIdIncrementer()) //
			.start(one)//
			.next(two)//
			.build();
	}

}

这是一个简单的作业,包含两个 Step 实例,一个流向另一个。快速提醒:在 Spring Batch 中,Step 是一个工作单元。它描述了四件事:

  • 多少数据构成一个“批次”工作?(在 Spring Batch 术语中这称为“chunk”)
  • 数据的来源(由一个 ItemReader<T> 实例表示)
  • 数据的写入目标(由一个 ItemWriter<T> 实例表示)
  • 一个处理来自源并将数据发送到目标的处理器。

每个 Step 使用 ItemReader<I> 读取一个 chunk 的数据量,将一个类似集合的东西(称为 Chunk)传递给 ItemProcessor<I,O> 进行任意操作,然后将 ItemProcessor<I,O> 的输出发送给 ItemWriter<O>IO 可以代表相同的泛型类型或不同的类型。然后,循环继续进行,直到 ItemReader 中的所有数据都被读取完毕。该步骤被视为完成,执行将转移到下一个步骤。

在这个示例应用中,我们将从 customer 表读取数据,读取 idnameoslanguage 记录。我们将从一个 .csv 文件读取类似的数据。我们将使用方便的新特性 CompositeItemReader<Customer> 来快速完成这项工作,从而避免进行单独的规范化步骤。

package com.example.bootiful_34.batch;

import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.List;

@Configuration
@ImportRuntimeHints(StepOneConfiguration.CustomersCsvRuntimeHintsRegistrar.class)
class StepOneConfiguration {

	private static final Resource CSV = new ClassPathResource("/customers.csv");

	@Bean
	FlatFileItemReader<Customer> customerCsvItemReader() {
		return new FlatFileItemReaderBuilder<Customer>()//
			.resource(CSV)
			.delimited()
			.names("id", "name", "language", "os")
			.name("customerCsvItemReader")
			.fieldSetMapper(fs -> new Customer(fs.readInt(0), fs.readString(1), fs.readString(2), fs.readString(3)))
			.build();
	}

	@Bean
	JdbcCursorItemReader<Customer> customerJdbcItemReader(DataSource dataSource) {
		return new JdbcCursorItemReaderBuilder<Customer>()//
			.name("customerJdbcItemReader")//
			.dataSource(dataSource)//
			.sql("select id, name, language, os from customer")//
			.rowMapper((rs, rowNum) -> new Customer(rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))//
			.build();
	}

	@Bean
	CompositeItemReader<Customer> customerCompositeItemReader(JdbcCursorItemReader<Customer> customerJdbcItemReader,
			FlatFileItemReader<Customer> customerCsvItemReader) {
		return new CompositeItemReader<>(List.of(customerJdbcItemReader, customerCsvItemReader));
	}

	@Bean
	BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter() {
		return new BlockingQueueItemWriter<>(BatchConfiguration.CUSTOMERS);
	}

	@Bean
	Step one(JobRepository repository, PlatformTransactionManager txm,
			CompositeItemReader<Customer> customerCompositeItemReader,
			BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter) {
		return new StepBuilder("one", repository)//
			.<Customer, Customer>chunk(10, txm)//
			.reader(customerCompositeItemReader)//
			.writer(customerBlockingQueueItemWriter)//
			.build();
	}

	static class CustomersCsvRuntimeHintsRegistrar implements RuntimeHintsRegistrar {

		@Override
		public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
			hints.resources().registerResource(CSV);
		}

	}

}

在这个示例中,我们有三个 ItemReader bean,但步骤只消费一个 CompositeItemReader<T> bean。它将依次读取来自 FlatFileItemReader<Customer>JdbcCursorItemReader<Customer> bean 的任何数据。

在这个示例中,我们没有配置 ItemProcessor<Customer,Customer>

分级事件驱动架构(SEDA)和批处理?是的!

对于 ItemWriter<Customer>,我们使用了框架中另一个新颖的补充:BlockingQueueItemWriter<Customer>!其思想很简单:writer 将数据写入一个 Java java.util.concurrent.BlockingQueueBlockingQueue 变量是一个定义在 BatchConfiguration 类中的 static final 变量,名为 CUSTOMERS。下一个步骤将配置一个 BlockingQueueItemReader<T>,它将从同一个 java.util.concurrent.BlockingQueue读取。超简单,对吗?是的!但这将节省大量时间。

传统上,Spring Batch 应用的上下文只与当前步骤相关联。随着数据流经一个作业,Spring Batch 的 Step 只给你三次处理数据的机会:从 ItemReader<I>ItemProcessor<I,O>ItemWriter<O>。想在数据写入后进行更多处理?那就得等到下一个步骤!你已经将数据写入磁盘或其他持久介质,然后你必须重新读取它。Spring Batch 会跟踪你在读取和写入方面进行到多远,那我们为何要如此小心?为何我们需要如此频繁地将所有内容持久化?

现在情况不再如此了,因为 Spring Batch 支持将给定 Step 的输出写入 BlockingQueue。值得注意的是,BlockingQueue 实例有一个额外的好处,即支持对写入数据量设置限制。这与分级事件驱动架构(SEDA)的风格配合得很好。SEDA 背后的思想是根据数据流经的不同阶段来定义工作。随着数据从一个阶段移动到另一个阶段,它流入(有界的)队列。这些有界队列提供反压。如果工作被拒绝或在容量超出时简单地写入磁盘,你无法压垮给定阶段的处理器。这称为反压,对于可伸缩性至关重要。

每个阶段仅从一个队列中获取工作。这提供了一种天然的负载均衡方式:启动更多给定阶段处理器的实例,工作就会均匀地分配给它们。你甚至可以结合 Spring Batch 的远程分区和分块范例,将工作分配到集群中,进一步扩展这种架构。

这种架构通常与消息系统相关联——队列通常被假定为消息总线中的队列(或主题);然而,该架构背后的原则在批处理系统中也同样适用。

让我们看看步骤二!

package com.example.bootiful_34.batch;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
class StepTwoConfiguration {

	@Bean
	Step two(JobRepository repository, PlatformTransactionManager transactionManager,
			BlockingQueueItemReader<Customer> blockingQueueItemReader, ItemWriter<Customer> customerItemWriter) {
		return new StepBuilder("two", repository)//
			.<Customer, Customer>chunk(10, transactionManager)//
			.reader(blockingQueueItemReader)//
			.writer(customerItemWriter)//
			.build();
	}

	@Bean
	BlockingQueueItemReader<Customer> blockingQueueItemReader() {
		return new BlockingQueueItemReader<>(BatchConfiguration.CUSTOMERS);
	}

	@Bean
	ItemWriter<Customer> customerItemWriter() {
		return chunk -> chunk.forEach(System.out::println);
	}

}

在这里,我们定义了另一个 Step,它从同一个 BlockingQueue 中读取数据,然后简单地将所有内容打印出来。

强大、简单、可扩展的批处理?夫复何求?顺便说一句,请记住,Spring Batch 的大部分工作——输入和输出——都从 Java 21 的虚拟线程中获益巨大,Spring Boot 现在已经支持虚拟线程三个版本了!如果你使用的是 Java 21+(你肯定至少用的是 Java 21,对吗?),别忘了设置 spring.threads.virtual.enabled=true

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

保持领先

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 通过一份简单订阅,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看所有