CHAPTER11 스프링 배치

스프링 배치

  • 스프링 배치는 배치프로세스의 상태를 관리하기 위해 JobRepository를 저장할 저장소를 필요로 한다.
  • JobRepository는 이런 메타데이터를 저장하기 위해 스프링 배치가 제공하는 인터페이스로 저장소로 어떤걸 쓸지는 개발자가 정해도 된다.
  • Job 단위로 컴포넌트를 포함한 모든 정보와 메타데이터를 총괄한 JobRepository를 중심으로 작동한다.
  • 각 Job은 하나 이상의 순차적인 스텝으로 구성된다.
  • 스텝은 조건부로 다음 스텝을 진행하거나, 동시성 스텝을 지원할 수 있다.
  • Job은 보통 실행 시점에 Job Parameter와 엮어 Job의 런타임 로직을 매개변수화한다.
    • Job은 각 작업을 식별하기 위해 JobInstance를 생성한다.
    • JobInstance는 Job과 JobParameter로 이루어져 있고 이게 실행되는 것을 JobExecution이라고 한다.
    • JobInstance가 실행되다 에러가 나면 JobInstance는 처음부터 다시 시작하고 새로운 JobExecution이 만들어진다.

레시피 11-1 스프링 배치 기초 공사하기

  • 스프링 배치 DB를 설정해 스프링 애플리케이션을 구성한다.

JobRepository

  • SimpleJobRepository 는 JobRepository를 구현한 유일한 클래스이다.
    • JobRepositoryFactoryBean을 이용해 생성한다.
    • 배치 처리 상태를 데이터 저장소에 보관한다.
  • JobRepository는 DB를 전제로 작동해 스키마가 미리 준비되어 있어야한다. (스키마는 DB별로 배치에서 제공)
@Bean
public JobRepositoryFactoryBean jobRepository() {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource());
    jobRepositoryFactoryBean.setTransactionManager(transactionManager());
    return jobRepositoryFactoryBean;
}

@Bean
public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository().getObject());
    return jobLauncher;
}

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    JobRegistryBeanPostProcessor processor = new JobRegistryBeanPostProcessor();
    processor.setJobRegistry(jobRegistry());
    return processor;
}

@Bean
public JobRegistry jobRegistry() {
    return new MapJobRegistry();
}
  • jobRegistry() : 특정 Job에 대한 정보를 담고 있는 중앙 저장소이자, 시스템 내부의 전체 Job을 관장한다.

  • SimpleJobLauncher : 배치 job을 시동하는 매커니즘을 건네주는 역할을 한다.

    • jobLauncher 로 배치 솔루션과 필요한 매개변수를 지정한다.
  • JobRegistryBeanPostProcessor : 스프링 컨텍스트 파일을 스캐닝해 런치된 job을 발견하면 등록한 jobRegistry() 에 엮는 Bean 후처리기이다.

SimpleJobRepository

  • JobRepository는 Repository를 구현한 객체로 Job과 스텝을 아울러 도메인 모델에 대한 조회/저장 작업을 처리한다.

  • 위에서 한 설정을 설정 클래스에 @EnableBatchProcessing 을 붙여 기본 값을 바로 구성하는 방법이 있다.

##레시피 11-2 데이터 읽기/쓰기

  • Csv 파일에서 데이터를 읽어 DB에 입력하는 배치 프로그램을 만든다.
  • 스프링 배치는 XML 스키마를 이용해 솔루션 모델을 정의한다.

job 구성하기

@Configuration
public class UserJob {
    private static final String INSERT_REGISTRATION_QUERY =
        "INSERT INTO into user_registration (first_name, last_name, company," +
        "address, city, state, zip, county, phone_number, fax)" +
        " values " + 
        "(:firstName, :lastName, :company, :address, :city, :state, :zip, :county,"
        +":url, :phoneNumber, :fax)";
    
    @Autowired
    private JobBuilderFactory jobs;
    
    @Autowired
    private StepBuilderFactory steps;
    
    @Autowired
    private DataSource dataSource;
    
    @Value('file:${user.home}/batchs/registrations.csv')
    private Resource input;
    
    @Bean
    public Job insertIntoDbFromCsvJob() {
        return jobs.get("User Registration Import Job")
            		.start(step1())
            		.build();
    }
    
    @Bean
    public Step step1() {
		return steps.get("User Registration CSV To DB Step")
            	.<UserRegistration, UserRegistration>chunk(5)
            	.reader(csvFileReader())
            	.writer(jdbcItemWriter())
            	.build();
    }
    
    @Bean
    public FlatFileItemReader<UserRegistration> csvFileReader() {
        FlatFileItemReader<UserRegistration> itemReader = new UserRegistration<>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setResource(input);
        return itemReader;
    }
    
    @Bean
    public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
        JdbcBatchItemWriter<UserRegistration> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setSql(INSERT_REGISTRATION_QUERY);
        itemWriter.setItemSqlParameterSourceProvider(
        	new BeanPropertyItemSqlParameterSourceProvider<>());
        return itemWriter;
    }
    
    @Bean
    public DefaultLineMapper<UserRegistration> lineMapper() {
        DefaultLineMapper<UserRegistration> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer());
        lineMapper.setFieldSetMapper(fieldSetMapper());
        return lineMapper;
    }
}
  • Job은 여러 스텝으로 구성되며 각 스텝은 주어진 Job을 묵묵히 수행한다.
    • 스텝은 잡을 수행하는 가장 작은 단위이다.
  • 처리로직은 Tasket 으로 기술하는데, 직접 구현해도 되고 여러 처리 시나리오에 맞게 짜여진 것 중 하나를 골라써도 된다.
    • 청크 지향 처리를 할 경우에는 chunk()라는 구성메소드를 사용한다.
    • 청크 지향 처리는 입력기가 입력을 읽고 부가적인 처리를 한 후 종합해 commit-interval 속성을 이용해 처리 주기를 설정해 트랜잭션을 커밋할 때 몇 개의 아이템을 보낼 지 정한다.
    • 이미 가동중인 트랜잭션 관리자가 있으면 함께 커밋하고, 커밋 직접에 DB 메타데이터를 수정해 해당 잡을 완료한 사실을 알린다.

입력

  • Csv 파일 읽기
    • 스프링 배치가 제공하는 FlatFileItemReader<T> 클래스는 파일의 필드와 값을 구분하는 작업을 LineMapper<T>에게 맡기고 LineMapper는 다시 LineTokenizer 에게 작업을 맡겨 필드를 식별한다.
    • ItemReader는 파일을 읽어 UserRegistration 자바 빈으로 반환해준다.
@Bean
public FlatFileItemReader<UserRegistration> csvFileReader() {
    FlatFileItemReader<UserRegistration> = new FlatFileItemReader<>();
    itemReader.setLineMapper(lineMapper());
    itemReader.setResource(input);
    return itemReader;
}

// 빈 등록 생략 코드 보고싶으면 p576

@Bean
public DelimetedLineTokenizer tokenizer() {
    DelimetedLineTokenzier tokenizer = new DelimetedLineTokenzier();
    tokenizer.setDelimiter(",");
    tokenizer.setNames(
    	new String[] {"firstName", "lastName", "company", "address", "city", "state", "zip", "county", "url", "phoneNumber", "fax"});
    return tokenizer;
}

출력

  • 출력기는 입력기가 읽은 아이템 컬렉션을 한데 모아 처리하는 작업을 담당한다.
  • 여기서는 JdbcBatchItemWriter 를 이용해 데이터를 입력받아 DB에 넣어준다.
    • 개발자가 Sql을 입력해주면 commit-interval 에 따라 일정 주기로 DB에서 데이터를 읽어 sql 프로퍼티에 설정한 SQL을 실행한 뒤 전체 트랜잭션을 커밋한다.
@Bean
public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
    JdbcBatchItemWriter<UserRegistration> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setDataSource(dataSource);
    itemWriter.setSql(INSERT_REGISTRATION_QUERY);
    itemWriter.setItemSqlParameterSourceProvider<>();
    return itemWriter;
}

ItemReader/ItemWriter를 간단하게 구성하기

  • Spring4부터는 Builder를 제공하기 때문에 빈마다 일일이 구성할 필요 없이 사용이 가능하다.. (앞에서말해ㅠ)
@Bean
public FlatFileItemReader<UserRegistration> csvFileReader() throws Exception {
    return new FlatFileItemReaderBuilder<UserRegistration>()
        			.name(ClassUtils.getShortName(FlatFileItemReader.class))
        			.resource(input)
       				.targetType(UserRegistration.class)
        			.delimited()
        			.names(new String[] { /* 배열 */ })
        			.build();
}

레시피 11-3 커스텀 ItemWriter/ItemReader 작성하기

  • 커스텀하게 쓰면된다.

레시피 11-4 출력하기 전에 입력 데이터 처리하기

  • 스프링 배치는 입력기가 읽은 데이터에 커스텀 로직을 적용할 수 있도록 지원한다.
  • chunk 엘리먼트의 processor 속성에 ItemProcessor 형 빈을 설정하면 된다.
@Bean
public Step step1() {
    return steps.get("User Registration CSV To DB Step")
        		.<UserRegistration, UserRegistration>chunk(5)
        		.reader(csvFileReader())
        		.processor(userRegistrationValidationItemProcessor())
        		.writer(jdbcItemWriter())
        		.build();
}
  • DB에 출력하기 전 데이터의 유효성을 검증하는 예제로 올바르지 않은 레코드가 발견되면 ItemProcesso<I,O>는 null을 반환하고 작업을 중단한다.
    • 입력은 process() 메소드에 입력되는 매개변수이고, 출력은 반환되는 값이다.
public class UserRegistrationValidationItemProcessor implements ItemProcessor<UserRegistration, UserRegistration> {
    private String stripNonNumber(String input) {/* */}
    private boolean isTelephoneValid(String input) {/* */}
    private boolean isZipCodeValid(String input) {/* */}
    private boolean isValidState(String input) {/* */}
    
    @Override
    public UserRegistration process(UserRegistration input) throw Exception {
        String zipCode = stripNonNumbers(input.getZip());
        String telephone = stripNonNUmbers(input.getTelephone());
        String state = StringUtils.defaultString(input.getState());
        
        if(isTelephoneValid(telephone) && isZipCodeValid(zipCode) && 
           														isValidState(state)) {
            input.setZip(zipCode);
            input.setTelephone(telephone);
            return input;
        }
        
        return null;
    }
}
  • 처리를 마치면 스프링 배치 메타데이터 테이블에 여러가지 정보가 쌓인다.

    • Job의 종료 상태, 커밋 횟수, 읽은 아이템 개수, 걸러진 아이템 개수를 확인할 수 있다.
      • filter_count: 걸러진 아이템 개수를 의미한다.
      • read_count: 읽은 아이템 개수
      • write_count: DB에 쓰여진 아이템 개수
      select * from BATCH_STEP_EXCUTION;
    

처리기를 서로 연결하기

  • 하나 이상의 처리기를 사용할 때 CompositeItemProcessor<I,O> 는 한 필터의 출력을 그 다음 필터의 입력으로 전달하는 클래스이다.
    • 하나의 책임을 지닌 `ItemProcessor<I,O> 를 여러 개 작성하고 필요한만큼 연결해 쓴다.
@Bean
public CompositeItemProcessor<Customer, Customer> compositeBankCustomerProcessor() {
	List<ItemProcessor<Customer, Customer>> delegates = 
        Arrays.asList(creditScoreValidationProcessor(), bbProcessor(), cProcessor());
    CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>();
    processor.setDelegates(delegates);
    return processor;
}

레시피 11-5 트랜잭션

  • 읽기/쓰기를 하면서 예외상황 발생에 대응하기 위해 __트랜젝션__을 관리한다.
  • 스텝에 트랜젝션을 적용한 다음 다음 스텝에 재시도 로직을 건다.

트랜젝션

  • 스프링이 제공하는 PlatformTransactionManager 을 연결하고 스프링 배치가 참조할 수 있도록 구성한다.
    • 기본적으로 JdbcItemWriter 를 구성할 때 스프링 배치가 transactionManager 라는 PlatformTransactionManager 를 컨텍스트에서 얻어 사용하기 때문에 명시할 필요는 없다.
    • 하지만, 분명하게 명시하고 싶은 경우 아래처럼 설정해준다.
@Bean
public Step step1() {
    return steps.get("User Registration CSV To DB Step")
        		.<UserRegistration, UserRegistration>chunk(5)
        		.reader(csvFileReader())
        		.processor(userRegistrationValidationItemProcessor())
        		.writer(jdbcItemWriter())
        		.transactionManager(new DataSourceTransactionManager());
        		.build();
}
  • ItemReader<T> 가 읽은 아이템은 보통 한곳에 모아두고 ItemWriter<T> 커밋이 실패하면 이렇게 모아둔 아이템들을 원상태 그대로 다시 전송한다.
    • 일반적으로는 이런 처리 방식이 효율적이지만 트랜젝션이 적용된 리소스에서 아이템을 읽을 때는 문제가 발생할 수 있다.
    • 메시지 큐에서 읽은 아이템은 트랜젝션이 실패하면 롤백되어야 하기 때문이다.
    • 이런 경우 아래와 같이 reader가 트랜젝션을 수행하는 큐임을 명시해준다.
@Bean
public Step step1() {
    return steps.get("User Registration CSV To DB Step")
        		.<UserRegistration, UserRegistration>chunk(5)
        		.reader(csvFileReader()).readerIsTransactionalQueue()
        		.processor(userRegistrationValidationItemProcessor())
        		.writer(jdbcItemWriter())
        		.transactionManager(new DataSourceTransactionManager());
        		.build();
}

롤백

  • 트랜젝션을 수동으로 롤백시켜야 하는 경우에는 자바 구성을 통해 수동으로 롤백시켜야 하는 경우가 있다.
    • faultToleant() 로 오류가 허용된 스텝을 얻은 후, noRollback 으로 오류가 발생하지 않아야 하는 오류를 지정한다.
@Bean
public Step step1() {
    return steps.get("step1")
        		.<UserRegistration, UserRegistration>chunk(10)
        			.faultTolearnt()
        				.noRollback(YourException.class)
        		.reader(csvFileReader())
        		.processor(userRegistrationValidationItemProcessor())
        		.writer(jdbcItemWriter())
        		.transactionManager(new DataSourceTransactionManager());
        		.build();
}

레시피 11-6 재시도

  • 다시 시도해서 성공할 가능성이 있는 부분을 재시도하는 로직을 작성한다.

스텝 구성하기

@Bean
public Step step1() {
    return steps.get("step1")
        		.<UserRegistration, UserRegistration>chunk(10)
        			.faultTolearnt()
        				.retryLimit(3).retry(DeadlockLoserDataAccessException.class)
        		.reader(csvFileReader())
        		.processor(userRegistrationValidationItemProcessor())
        		.writer(jdbcItemWriter())
        		.transactionManager(new DataSourceTransactionManager());
        		.build();
}
  • 위에서 나온 faultToleant() 로 오류가 나더라도 멈추지 않도록 설정한다.
  • retryLimit() : 재시도 횟수를 정해준다.
  • retry() : 재시도의 대상 예외를 지정해준다. 예외가 여러 개인 경우 여러번 호출한다.

재시도 템플릿

  • 스프링 배치가 제공하는 재시도 및 복구 서비스를 코드에 활용하는 방법
    • 재시도 로직이 구현된 커스텀 ItemWriter를 작성
    • 전체 서비스 인터페이스에 재시도 기능을 추가
스프링 배치 RetryTemplate
  • 비즈니스 로직과 재시도 로직을 분리해 재시도 없이 한번만 시도하는 것처럼 코드를 작성할 수 있다.
  • 재시도 -> 실패 -> 복구의 반복적인 과정을 하나의 API 호출로 감싼다.

#####커스텀 ItemWriter에 RetryTemplate 를 추가하기

public class RetryableUserRegistrationServiceWriter implments ItemWriter<UserRegistration> {
    // 생략
    
    private final RetryTemplate retryTemplate;
    
    // 생략
    
    @Override
    public void write(List<? extends UserRegistration> items) throws Exception {
        for (final UserRegistration userRegistration : items) {
            UserRegistration registeredUserRegistration = retryTemeplate.excute(
            	(RetryCallback<UserRegistration, Exception>) context -> 
                	userRegistrationService.registerUser(userRegistration));
            )
        }
        
    }
}
BackOffPolicy
  • RetryTemlate의 재시도 시간 간격을 지정하는 기능
  • 같은 호출을 할 때 스텝의 시간 간격이 잠기지 않도록 예방하는 수단

AOP 기반의 재시도

  • 스프링 배치가 제공하는 AOP 어드바이저를 이용해 RetryTemplate 처럼 재시도를 해도 성공을 보장할 수 없는 메소드 호출을 감싸서 재시도를 처리할 수 있다.

  • userRegistartionService 에 재시도 로직을 어드바이스로 추가해 RetryTemplate 이 빠진 코드를 되돌릴 수 있다.

    1. 메서드에 @Retryable 을 붙여 재시도 로직을 추가한다.
    2. 배치 설정 클래스에 @EnableRetry 를 붙여서 기능을 켠다.
@Retryable(backoff = @BackOff(delay=1000, maxDelay=10000, muliplier=2))
public UserRegistration registerUser(UserRegistration userRegistration) {
    ...
}

레시피 11-7 스텝 실행 제어하기

  • 하나의 job은 여러개의 스텝을 거느릴 수 있다.
  • 각 스텝은 서로 떨어진 상태로 다음 스텝을 결정하는데 영향을 미친다.
  • step은 순차스텝, 동시성 스텝, 조건 스텝 등으로 조절할 수 있다.

순차스텝

  • 스텝 간에 암시적인 선후 관계를 가지고 있는 스텝이다.
  • 스텝마다 자신의 컨텍스트 내에서 실행되며 오직 부모 Job 실행 컨텍스트와 순서를 공유한다.
@Bean
public Job nightlyRegistrationsJob() {
    return jobs.get("nightlyRegistrationJob")
        		.start(loadRegistrations())
        		.next(reportStastics())
        		.next(...)
        		.build();
}

동시성 스텝

  • 스프링 배치는 처리 과정을 분기시켜 여러 갈래로 동시에 실행시키는 기능을 제공한다.
@Bean
public Job nightlyRegistrationsJob() {
    return jobs.get("nightlyRegistrationJob")
        		.start(loadRegistrations())
        		.split(taskExecutor().
                      	.add(
                            builder.flow(reportStastics()),
                            builder.flow(sendJmsNotifications())))
        		.build();
}

상태에 따른 조건부 스텝

  • Job 또는 Step의 ExitStatus에 따라 다음 스텝을 결정하는 조건 흐름이다.
  • BatchStatus : 배치는 enum형 프로퍼티로 COMPLETED, STARTING, STARTED, STOPPING, FAILED, ABANDONED, UNKNOWN 중 하나의 상태값을 가지고 있다.
  • end()fail() 같은 메소드로 처리해줄 수도 있다.
@Bean
public Job nightlyRegistrationsJob() {
    return jobs.get("nightlyRegistrationJob")
        		.start(loadRegistrations())
        			.on("COMPLETED").to(step2())
        			.on("FAILED").fail()
        			.on("*").to(failureStep())
        		.build();
}

결정에 따른 조건부 스텝

  • 스프링 배치의 decision 엘리먼트와 JobExecutionDecider로 구현체를 사용해 복잡한 로직을 사용한다.
public class HoroscopeDecider implements JobExecutionDecider {
    
    @Override
    public FlowExcutionStatus decide(JobExecution jobExecution
                                     SetpExcution stepExecution) {
        if(isMercuryInRetrograde()) {
            return new FlowExecutionStatus("MERCURY_IN_RETROGRADE");
        }
        
        return new FlowExecutionStatus.COMPLETED;
    }
} 
@Bean
public Job job() {
    return jobs.get("nightlyRegistrationJob")
        		.start(loadRegistrations())
        		.next(horoscopeDecider())
        			.on("MERCURY_IN_RETROGRADE").to(step2())
        			.on("COMPLETED").to(step3())
        		.build();
}

레시피 11-8 잡 실행하기

  • 배치솔루션 배포, 실행하는 방법

웹 애플리케이션에서 실행하기

  • 웹 애플리케이션에서 Job을 실행하면 클라이언트 스레드는 배치잡이 끝날 때까지 기다릴 여유가 없으므로 방식을 바꿔야한다.
  • 스프링 TaskExecutor 를 이용해 웹 레이어의 컨트롤러나 액션에서 잡을 띄울때 클라이언트 스레드와 상관없이 비동기로 실행한다.
@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
public class BatchConfiguration {
    
    // 실행 스레드를 만들어 블로킹 없이 관리하는 빈
    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }
}
  • BatchConfigurer 를 직접 구현해서 TaskExecutor를 구성하고 JobLauncher에 추가gksek.
@Component
public class CustomBatchConfigurer extends DefaultBatchConfigurer {
    private final TaskExecutor taskExecutor;
    
    public CustomBatchConfigurer(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
    
    @Override
    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLuancher = new SimpleJobLauncher();
        jobLuancher.setJobRepository(gerJobRepository());
        jobLuancher.setTaskExecutor(this.taskExecutor);
        jobLuancher.afterPropertiesSet();
        return jobLuancher;
    }
}

명령줄에서 실행하기

  • 윈도우 이벤트 같은 스케쥴러 시스템 스케줄러를 이용해 배치 프로세스를 배포한다.
java CommandLineJobRunner jobs.xml hourlyReport date = 'date +%m/%d/$Y time=date + %H'

스케줄링하여 실행하기

  • 기본 애플리케이션 컨텍스트에 @EnableScheduling 을 붙이고 ThreadPoolTaskScheduler 빈을 추가해서 스케줄링 기능을 활성화한다.
@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
@EnableScheduling
@EnableAsync
public class BatchConfiguration {
    
    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setThreadGroupName("batch-scheduler");
        taskScheduler.setPoolSzie(10);
        return taskScheduler;
    }
}

레시피 11-9 Job을 매개변수화하기

  • JobParameters 를 이용해 잡을 어떻게 실행하는 지 살펴보고 잡과 구성 클래스에서 JobParameters를 가져온다.