Chapter11 Spring Batch
by ag-lee
CHAPTER11 스프링 배치
스프링 배치
- 스프링 배치는 배치프로세스의 상태를 관리하기 위해 JobRepository를 저장할 저장소를 필요로 한다.
- JobRepository는 이런 메타데이터를 저장하기 위해 스프링 배치가 제공하는 인터페이스로 저장소로 어떤걸 쓸지는 개발자가 정해도 된다.
- Job 단위로 컴포넌트를 포함한 모든 정보와 메타데이터를 총괄한 JobRepository를 중심으로 작동한다.
- 각 Job은 하나 이상의 순차적인 스텝으로 구성된다.
- 스텝은 조건부로 다음 스텝을 진행하거나, 동시성 스텝을 지원할 수 있다.
- Job은 보통 실행 시점에 Job Parameter와 엮어 Job의 런타임 로직을 매개변수화한다.
- Job은 각 작업을 식별하기 위해 JobInstance를 생성한다.
- JobInstance는 Job과 JobParameter로 이루어져 있고 이게 실행되는 것을 JobExecution이라고 한다.
- JobInstance가 실행되다 에러가 나면 JobInstance는 처음부터 다시 시작하고 새로운 JobExecution이 만들어진다.
레시피 11-1 스프링 배치 기초 공사하기
- 스프링 배치 DB를 설정해 스프링 애플리케이션을 구성한다.
JobRepository
SimpleJobRepository
는 JobRepository를 구현한 유일한 클래스이다.- JobRepositoryFactoryBean을 이용해 생성한다.
- 배치 처리 상태를 데이터 저장소에 보관한다.
- JobRepository는 DB를 전제로 작동해 스키마가 미리 준비되어 있어야한다. (스키마는 DB별로 배치에서 제공)
@Bean
public JobRepositoryFactoryBean jobRepository() {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource());
jobRepositoryFactoryBean.setTransactionManager(transactionManager());
return jobRepositoryFactoryBean;
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository().getObject());
return jobLauncher;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor processor = new JobRegistryBeanPostProcessor();
processor.setJobRegistry(jobRegistry());
return processor;
}
@Bean
public JobRegistry jobRegistry() {
return new MapJobRegistry();
}
-
jobRegistry()
: 특정 Job에 대한 정보를 담고 있는 중앙 저장소이자, 시스템 내부의 전체 Job을 관장한다. -
SimpleJobLauncher
: 배치 job을 시동하는 매커니즘을 건네주는 역할을 한다.jobLauncher
로 배치 솔루션과 필요한 매개변수를 지정한다.
-
JobRegistryBeanPostProcessor
: 스프링 컨텍스트 파일을 스캐닝해 런치된 job을 발견하면 등록한 jobRegistry() 에 엮는 Bean 후처리기이다.
SimpleJobRepository
-
JobRepository
는 Repository를 구현한 객체로 Job과 스텝을 아울러 도메인 모델에 대한 조회/저장 작업을 처리한다. -
위에서 한 설정을 설정 클래스에
@EnableBatchProcessing
을 붙여 기본 값을 바로 구성하는 방법이 있다.
##레시피 11-2 데이터 읽기/쓰기
- Csv 파일에서 데이터를 읽어 DB에 입력하는 배치 프로그램을 만든다.
- 스프링 배치는 XML 스키마를 이용해 솔루션 모델을 정의한다.
job 구성하기
@Configuration
public class UserJob {
private static final String INSERT_REGISTRATION_QUERY =
"INSERT INTO into user_registration (first_name, last_name, company," +
"address, city, state, zip, county, phone_number, fax)" +
" values " +
"(:firstName, :lastName, :company, :address, :city, :state, :zip, :county,"
+":url, :phoneNumber, :fax)";
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Autowired
private DataSource dataSource;
@Value('file:${user.home}/batchs/registrations.csv')
private Resource input;
@Bean
public Job insertIntoDbFromCsvJob() {
return jobs.get("User Registration Import Job")
.start(step1())
.build();
}
@Bean
public Step step1() {
return steps.get("User Registration CSV To DB Step")
.<UserRegistration, UserRegistration>chunk(5)
.reader(csvFileReader())
.writer(jdbcItemWriter())
.build();
}
@Bean
public FlatFileItemReader<UserRegistration> csvFileReader() {
FlatFileItemReader<UserRegistration> itemReader = new UserRegistration<>();
itemReader.setLineMapper(lineMapper());
itemReader.setResource(input);
return itemReader;
}
@Bean
public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
JdbcBatchItemWriter<UserRegistration> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql(INSERT_REGISTRATION_QUERY);
itemWriter.setItemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>());
return itemWriter;
}
@Bean
public DefaultLineMapper<UserRegistration> lineMapper() {
DefaultLineMapper<UserRegistration> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer());
lineMapper.setFieldSetMapper(fieldSetMapper());
return lineMapper;
}
}
- Job은 여러 스텝으로 구성되며 각 스텝은 주어진 Job을 묵묵히 수행한다.
- 스텝은 잡을 수행하는 가장 작은 단위이다.
- 처리로직은
Tasket
으로 기술하는데, 직접 구현해도 되고 여러 처리 시나리오에 맞게 짜여진 것 중 하나를 골라써도 된다.- 청크 지향 처리를 할 경우에는
chunk()
라는 구성메소드를 사용한다. - 청크 지향 처리는 입력기가 입력을 읽고 부가적인 처리를 한 후 종합해 commit-interval 속성을 이용해 처리 주기를 설정해 트랜잭션을 커밋할 때 몇 개의 아이템을 보낼 지 정한다.
- 이미 가동중인 트랜잭션 관리자가 있으면 함께 커밋하고, 커밋 직접에 DB 메타데이터를 수정해 해당 잡을 완료한 사실을 알린다.
- 청크 지향 처리를 할 경우에는
입력
- Csv 파일 읽기
- 스프링 배치가 제공하는
FlatFileItemReader<T>
클래스는 파일의 필드와 값을 구분하는 작업을LineMapper<T>
에게 맡기고 LineMapper는 다시LineTokenizer
에게 작업을 맡겨 필드를 식별한다. - ItemReader는 파일을 읽어
UserRegistration
자바 빈으로 반환해준다.
- 스프링 배치가 제공하는
@Bean
public FlatFileItemReader<UserRegistration> csvFileReader() {
FlatFileItemReader<UserRegistration> = new FlatFileItemReader<>();
itemReader.setLineMapper(lineMapper());
itemReader.setResource(input);
return itemReader;
}
// 빈 등록 생략 코드 보고싶으면 p576
@Bean
public DelimetedLineTokenizer tokenizer() {
DelimetedLineTokenzier tokenizer = new DelimetedLineTokenzier();
tokenizer.setDelimiter(",");
tokenizer.setNames(
new String[] {"firstName", "lastName", "company", "address", "city", "state", "zip", "county", "url", "phoneNumber", "fax"});
return tokenizer;
}
출력
- 출력기는 입력기가 읽은 아이템 컬렉션을 한데 모아 처리하는 작업을 담당한다.
- 여기서는
JdbcBatchItemWriter
를 이용해 데이터를 입력받아 DB에 넣어준다.- 개발자가 Sql을 입력해주면 commit-interval 에 따라 일정 주기로 DB에서 데이터를 읽어 sql 프로퍼티에 설정한 SQL을 실행한 뒤 전체 트랜잭션을 커밋한다.
@Bean
public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
JdbcBatchItemWriter<UserRegistration> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql(INSERT_REGISTRATION_QUERY);
itemWriter.setItemSqlParameterSourceProvider<>();
return itemWriter;
}
ItemReader/ItemWriter를 간단하게 구성하기
- Spring4부터는 Builder를 제공하기 때문에 빈마다 일일이 구성할 필요 없이 사용이 가능하다.. (앞에서말해ㅠ)
@Bean
public FlatFileItemReader<UserRegistration> csvFileReader() throws Exception {
return new FlatFileItemReaderBuilder<UserRegistration>()
.name(ClassUtils.getShortName(FlatFileItemReader.class))
.resource(input)
.targetType(UserRegistration.class)
.delimited()
.names(new String[] { /* 배열 */ })
.build();
}
레시피 11-3 커스텀 ItemWriter/ItemReader 작성하기
- 커스텀하게 쓰면된다.
레시피 11-4 출력하기 전에 입력 데이터 처리하기
- 스프링 배치는 입력기가 읽은 데이터에 커스텀 로직을 적용할 수 있도록 지원한다.
chunk
엘리먼트의 processor 속성에ItemProcessor
형 빈을 설정하면 된다.
@Bean
public Step step1() {
return steps.get("User Registration CSV To DB Step")
.<UserRegistration, UserRegistration>chunk(5)
.reader(csvFileReader())
.processor(userRegistrationValidationItemProcessor())
.writer(jdbcItemWriter())
.build();
}
- DB에 출력하기 전 데이터의 유효성을 검증하는 예제로 올바르지 않은 레코드가 발견되면 ItemProcesso<I,O>는 null을 반환하고 작업을 중단한다.
- 입력은
process()
메소드에 입력되는 매개변수이고, 출력은 반환되는 값이다.
- 입력은
public class UserRegistrationValidationItemProcessor implements ItemProcessor<UserRegistration, UserRegistration> {
private String stripNonNumber(String input) {/* */}
private boolean isTelephoneValid(String input) {/* */}
private boolean isZipCodeValid(String input) {/* */}
private boolean isValidState(String input) {/* */}
@Override
public UserRegistration process(UserRegistration input) throw Exception {
String zipCode = stripNonNumbers(input.getZip());
String telephone = stripNonNUmbers(input.getTelephone());
String state = StringUtils.defaultString(input.getState());
if(isTelephoneValid(telephone) && isZipCodeValid(zipCode) &&
isValidState(state)) {
input.setZip(zipCode);
input.setTelephone(telephone);
return input;
}
return null;
}
}
-
처리를 마치면 스프링 배치 메타데이터 테이블에 여러가지 정보가 쌓인다.
- Job의 종료 상태, 커밋 횟수, 읽은 아이템 개수, 걸러진 아이템 개수를 확인할 수 있다.
filter_count
: 걸러진 아이템 개수를 의미한다.read_count
: 읽은 아이템 개수write_count
: DB에 쓰여진 아이템 개수
select * from BATCH_STEP_EXCUTION;
- Job의 종료 상태, 커밋 횟수, 읽은 아이템 개수, 걸러진 아이템 개수를 확인할 수 있다.
처리기를 서로 연결하기
- 하나 이상의 처리기를 사용할 때
CompositeItemProcessor<I,O>
는 한 필터의 출력을 그 다음 필터의 입력으로 전달하는 클래스이다.- 하나의 책임을 지닌 `ItemProcessor<I,O> 를 여러 개 작성하고 필요한만큼 연결해 쓴다.
@Bean
public CompositeItemProcessor<Customer, Customer> compositeBankCustomerProcessor() {
List<ItemProcessor<Customer, Customer>> delegates =
Arrays.asList(creditScoreValidationProcessor(), bbProcessor(), cProcessor());
CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>();
processor.setDelegates(delegates);
return processor;
}
레시피 11-5 트랜잭션
- 읽기/쓰기를 하면서 예외상황 발생에 대응하기 위해 __트랜젝션__을 관리한다.
- 스텝에 트랜젝션을 적용한 다음 다음 스텝에 재시도 로직을 건다.
트랜젝션
- 스프링이 제공하는
PlatformTransactionManager
을 연결하고 스프링 배치가 참조할 수 있도록 구성한다.- 기본적으로
JdbcItemWriter
를 구성할 때 스프링 배치가transactionManager
라는PlatformTransactionManager
를 컨텍스트에서 얻어 사용하기 때문에 명시할 필요는 없다. - 하지만, 분명하게 명시하고 싶은 경우 아래처럼 설정해준다.
- 기본적으로
@Bean
public Step step1() {
return steps.get("User Registration CSV To DB Step")
.<UserRegistration, UserRegistration>chunk(5)
.reader(csvFileReader())
.processor(userRegistrationValidationItemProcessor())
.writer(jdbcItemWriter())
.transactionManager(new DataSourceTransactionManager());
.build();
}
ItemReader<T>
가 읽은 아이템은 보통 한곳에 모아두고ItemWriter<T>
커밋이 실패하면 이렇게 모아둔 아이템들을 원상태 그대로 다시 전송한다.- 일반적으로는 이런 처리 방식이 효율적이지만 트랜젝션이 적용된 리소스에서 아이템을 읽을 때는 문제가 발생할 수 있다.
- 메시지 큐에서 읽은 아이템은 트랜젝션이 실패하면 롤백되어야 하기 때문이다.
- 이런 경우 아래와 같이 reader가 트랜젝션을 수행하는 큐임을 명시해준다.
@Bean
public Step step1() {
return steps.get("User Registration CSV To DB Step")
.<UserRegistration, UserRegistration>chunk(5)
.reader(csvFileReader()).readerIsTransactionalQueue()
.processor(userRegistrationValidationItemProcessor())
.writer(jdbcItemWriter())
.transactionManager(new DataSourceTransactionManager());
.build();
}
롤백
- 트랜젝션을 수동으로 롤백시켜야 하는 경우에는 자바 구성을 통해 수동으로 롤백시켜야 하는 경우가 있다.
faultToleant()
로 오류가 허용된 스텝을 얻은 후,noRollback
으로 오류가 발생하지 않아야 하는 오류를 지정한다.
@Bean
public Step step1() {
return steps.get("step1")
.<UserRegistration, UserRegistration>chunk(10)
.faultTolearnt()
.noRollback(YourException.class)
.reader(csvFileReader())
.processor(userRegistrationValidationItemProcessor())
.writer(jdbcItemWriter())
.transactionManager(new DataSourceTransactionManager());
.build();
}
레시피 11-6 재시도
- 다시 시도해서 성공할 가능성이 있는 부분을 재시도하는 로직을 작성한다.
스텝 구성하기
@Bean
public Step step1() {
return steps.get("step1")
.<UserRegistration, UserRegistration>chunk(10)
.faultTolearnt()
.retryLimit(3).retry(DeadlockLoserDataAccessException.class)
.reader(csvFileReader())
.processor(userRegistrationValidationItemProcessor())
.writer(jdbcItemWriter())
.transactionManager(new DataSourceTransactionManager());
.build();
}
- 위에서 나온
faultToleant()
로 오류가 나더라도 멈추지 않도록 설정한다. retryLimit()
: 재시도 횟수를 정해준다.retry()
: 재시도의 대상 예외를 지정해준다. 예외가 여러 개인 경우 여러번 호출한다.
재시도 템플릿
- 스프링 배치가 제공하는 재시도 및 복구 서비스를 코드에 활용하는 방법
- 재시도 로직이 구현된 커스텀
ItemWriter
를 작성 - 전체 서비스 인터페이스에 재시도 기능을 추가
- 재시도 로직이 구현된 커스텀
스프링 배치 RetryTemplate
- 비즈니스 로직과 재시도 로직을 분리해 재시도 없이 한번만 시도하는 것처럼 코드를 작성할 수 있다.
- 재시도 -> 실패 -> 복구의 반복적인 과정을 하나의 API 호출로 감싼다.
#####커스텀 ItemWriter에 RetryTemplate
를 추가하기
public class RetryableUserRegistrationServiceWriter implments ItemWriter<UserRegistration> {
// 생략
private final RetryTemplate retryTemplate;
// 생략
@Override
public void write(List<? extends UserRegistration> items) throws Exception {
for (final UserRegistration userRegistration : items) {
UserRegistration registeredUserRegistration = retryTemeplate.excute(
(RetryCallback<UserRegistration, Exception>) context ->
userRegistrationService.registerUser(userRegistration));
)
}
}
}
BackOffPolicy
- RetryTemlate의 재시도 시간 간격을 지정하는 기능
- 같은 호출을 할 때 스텝의 시간 간격이 잠기지 않도록 예방하는 수단
AOP 기반의 재시도
-
스프링 배치가 제공하는 AOP 어드바이저를 이용해
RetryTemplate
처럼 재시도를 해도 성공을 보장할 수 없는 메소드 호출을 감싸서 재시도를 처리할 수 있다. -
userRegistartionService
에 재시도 로직을 어드바이스로 추가해RetryTemplate
이 빠진 코드를 되돌릴 수 있다.- 메서드에
@Retryable
을 붙여 재시도 로직을 추가한다. - 배치 설정 클래스에
@EnableRetry
를 붙여서 기능을 켠다.
- 메서드에
@Retryable(backoff = @BackOff(delay=1000, maxDelay=10000, muliplier=2))
public UserRegistration registerUser(UserRegistration userRegistration) {
...
}
레시피 11-7 스텝 실행 제어하기
- 하나의 job은 여러개의 스텝을 거느릴 수 있다.
- 각 스텝은 서로 떨어진 상태로 다음 스텝을 결정하는데 영향을 미친다.
- step은 순차스텝, 동시성 스텝, 조건 스텝 등으로 조절할 수 있다.
순차스텝
- 스텝 간에 암시적인 선후 관계를 가지고 있는 스텝이다.
- 스텝마다 자신의 컨텍스트 내에서 실행되며 오직 부모 Job 실행 컨텍스트와 순서를 공유한다.
@Bean
public Job nightlyRegistrationsJob() {
return jobs.get("nightlyRegistrationJob")
.start(loadRegistrations())
.next(reportStastics())
.next(...)
.build();
}
동시성 스텝
- 스프링 배치는 처리 과정을 분기시켜 여러 갈래로 동시에 실행시키는 기능을 제공한다.
@Bean
public Job nightlyRegistrationsJob() {
return jobs.get("nightlyRegistrationJob")
.start(loadRegistrations())
.split(taskExecutor().
.add(
builder.flow(reportStastics()),
builder.flow(sendJmsNotifications())))
.build();
}
상태에 따른 조건부 스텝
- Job 또는 Step의 ExitStatus에 따라 다음 스텝을 결정하는 조건 흐름이다.
BatchStatus
: 배치는 enum형 프로퍼티로 COMPLETED, STARTING, STARTED, STOPPING, FAILED, ABANDONED, UNKNOWN 중 하나의 상태값을 가지고 있다.end()
나fail()
같은 메소드로 처리해줄 수도 있다.
@Bean
public Job nightlyRegistrationsJob() {
return jobs.get("nightlyRegistrationJob")
.start(loadRegistrations())
.on("COMPLETED").to(step2())
.on("FAILED").fail()
.on("*").to(failureStep())
.build();
}
결정에 따른 조건부 스텝
- 스프링 배치의
decision
엘리먼트와JobExecutionDecider
로 구현체를 사용해 복잡한 로직을 사용한다.
public class HoroscopeDecider implements JobExecutionDecider {
@Override
public FlowExcutionStatus decide(JobExecution jobExecution
SetpExcution stepExecution) {
if(isMercuryInRetrograde()) {
return new FlowExecutionStatus("MERCURY_IN_RETROGRADE");
}
return new FlowExecutionStatus.COMPLETED;
}
}
@Bean
public Job job() {
return jobs.get("nightlyRegistrationJob")
.start(loadRegistrations())
.next(horoscopeDecider())
.on("MERCURY_IN_RETROGRADE").to(step2())
.on("COMPLETED").to(step3())
.build();
}
레시피 11-8 잡 실행하기
- 배치솔루션 배포, 실행하는 방법
웹 애플리케이션에서 실행하기
- 웹 애플리케이션에서 Job을 실행하면 클라이언트 스레드는 배치잡이 끝날 때까지 기다릴 여유가 없으므로 방식을 바꿔야한다.
- 스프링
TaskExecutor
를 이용해 웹 레이어의 컨트롤러나 액션에서 잡을 띄울때 클라이언트 스레드와 상관없이 비동기로 실행한다.
@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
public class BatchConfiguration {
// 실행 스레드를 만들어 블로킹 없이 관리하는 빈
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
BatchConfigurer
를 직접 구현해서 TaskExecutor를 구성하고 JobLauncher에 추가gksek.
@Component
public class CustomBatchConfigurer extends DefaultBatchConfigurer {
private final TaskExecutor taskExecutor;
public CustomBatchConfigurer(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLuancher = new SimpleJobLauncher();
jobLuancher.setJobRepository(gerJobRepository());
jobLuancher.setTaskExecutor(this.taskExecutor);
jobLuancher.afterPropertiesSet();
return jobLuancher;
}
}
명령줄에서 실행하기
- 윈도우 이벤트 같은 스케쥴러 시스템 스케줄러를 이용해 배치 프로세스를 배포한다.
java CommandLineJobRunner jobs.xml hourlyReport date = 'date +%m/%d/$Y time=date + %H'
스케줄링하여 실행하기
- 기본 애플리케이션 컨텍스트에
@EnableScheduling
을 붙이고ThreadPoolTaskScheduler
빈을 추가해서 스케줄링 기능을 활성화한다.
@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
@EnableScheduling
@EnableAsync
public class BatchConfiguration {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setThreadGroupName("batch-scheduler");
taskScheduler.setPoolSzie(10);
return taskScheduler;
}
}
레시피 11-9 Job을 매개변수화하기
JobParameters
를 이용해 잡을 어떻게 실행하는 지 살펴보고 잡과 구성 클래스에서 JobParameters를 가져온다.