AsyncItemProcessor와 AsyncItemWriter의 도입
기존의 크롤링 파이프라인은
RSS 크롤링 -> LLM 호출을 통한 요약 및 키워드 생성 -> 임베딩 및 색인
3가지 스텝으로 구성되어 있습니다.
이때 LLM 호출 스텝과 임베딩 및 색인 스텝에서
taskExecutor를 활용해 병렬 처리를 하고 있었는데요.
문제는 바로 여기서 발생합니다.
Reader에서는 단순히 thread-safe하지 않은 Iterator를 사용하는데
멀티스레드로 Processor가 동작하다보니 같은 데이터에 대해 중복 처리가 발생한 것입니다.
이를 해결하기 위해 처음에는 Reader를 thread-safe하도록 synchronized 키워드를 넣을까 싶었는데,
이 경우 Reader에서 병목이 발생할 것으로 추측되었습니다.
https://docs.spring.io/spring-batch/reference/5.2/spring-batch-integration/sub-elements.html
Sub-elements :: Spring Batch Reference
Asynchronous Processors help you scale the processing of items. In the asynchronous processor use case, an AsyncItemProcessor serves as a dispatcher, executing the logic of the ItemProcessor for an item on a new thread. Once the item completes, the Future
docs.spring.io
스프링 배치 공식문서를 읽다보니
AsyncItemProcessor를 찾을 수 있었고, 프로세서를 비동기로 동작시키는 기능임을 파악했습니다.
그래서 Reader는 단일 스레드로 둔 채
Processor를 멀티스레드로 동작하도록 AsyncItemProcessor를 도입했고,
Post를 Future로 감싸 비동기로 동작하도록 하였습니다.
AsyncItemProcessor를 사용하고 응답이 온걸 확인하려면
AsyncItemWriter또한 사용해야 합니다.
내부적으로 Future.get 코드가 존재하여 응답이 모두 도착했을 때 Writer가 실행되도록 할 수 있습니다.
그래서 코드를 다음과 같이 구성했습니다.
@Bean
public Step extractSummaryStep() {
return new StepBuilder("extractSummaryStep", jobRepository)
.<Post, Future<Post>>chunk(5, transactionManager) // Rate Limiter(15/min)를 고려한 chunk size
.reader(postSummaryReader)
.processor(asyncSummaryProcessor())
.writer(asyncSummaryWriter())
// Resilience4j가 Retry를 담당
// Skip 로직만 유지: 실패한 아이템을 건너뛰고 다음 아이템 처리
.faultTolerant()
.skipLimit(10) // 실패 허용 개수 증가
.skip(Exception.class)
.build();
}
@Bean
public Step embedAndIndexStep() {
return new StepBuilder("embedAndIndexStep", jobRepository)
.<Post, Future<PostDocument>>chunk(20, transactionManager) // 5개씩 배치 처리
.reader(postEmbeddingReader)
.processor(asyncEmbeddingProcessor())
.writer(asyncEmbeddingWriter())
// Resilience4j가 Retry를 담당
// Skip 로직만 유지: 실패한 아이템을 건너뛰고 다음 아이템 처리
.faultTolerant()
.skipLimit(20) // 임베딩 실패 허용 개수
.skip(Exception.class)
.build();
}
@Bean
public ItemProcessor<Post, Future<Post>> asyncSummaryProcessor() {
AsyncItemProcessor<Post, Post> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(postSummaryProcessor);
asyncItemProcessor.setTaskExecutor(summaryTaskExecutor());
return asyncItemProcessor;
}
@Bean
public ItemWriter<Future<Post>> asyncSummaryWriter() {
AsyncItemWriter<Post> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(postSummaryWriter);
return asyncItemWriter;
}
@Bean
public ItemProcessor<Post, Future<PostDocument>> asyncEmbeddingProcessor() {
AsyncItemProcessor<Post, PostDocument> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(postEmbeddingProcessor);
asyncItemProcessor.setTaskExecutor(embeddingTaskExecutor());
return asyncItemProcessor;
}
@Bean
public ItemWriter<Future<PostDocument>> asyncEmbeddingWriter() {
AsyncItemWriter<PostDocument> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(postEmbeddingWriter);
return asyncItemWriter;
}
@Bean
public TaskExecutor summaryTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("summary-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
@Bean
public TaskExecutor embeddingTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("embedding-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
테스트 결과 중복 처리가 해결되면서도 성공적으로 병렬 처리가 동작함을 확인하였습니다.
Spring Batch 메타데이터 활용
기존에는 크롤링 이력을 관리하기 위해 CrawlingHistory라는 테이블을 별도로 두었습니다.
이는 Spring Batch에 대해 제대로 파악하지 못할 때 도입한 테이블입니다.
공식 문서를 살펴보던 중
Spring Batch에서 JobExecution과 StepExecution에 대한 데이터가 자동으로 관리됨을 파악했습니다.
이중 JobExecution은 기존의 CrawlingHistory와 칼럼들이 매우 유사하였기에
별도로 관리할 필요가 없음을 느끼고 테이블을 제거하고 관련 코드를 삭제하였습니다.
크롤링 상태를 관리하는 코드가 제거되며 120줄의 코드가 감소하는 효과를 볼 수 있었습니다.
'프로젝트 > Techfork' 카테고리의 다른 글
| [26/01/10] 오늘의 개발일지 - 온보딩 API 수정, 테스트 코드 작성, 엔티티 생성 성능 최적화 (0) | 2026.01.11 |
|---|---|
| [26/01/07] 오늘의 개발 일지 - Resilience4j 설정 개선 및 JdbcTemplate를 활용한 배치 처리 (0) | 2026.01.08 |
| [26/01/05] 오늘의 개발 일지 - Resilience4j 도입 (1) | 2026.01.06 |
| [26/01/03] 오늘의 개발 일지 - Spring Batch JobExecutionListener 도입 (0) | 2026.01.04 |
| [25/01/02] 오늘의 개발 일지 - RSS 크롤링 성능 및 안정성 개선 (0) | 2026.01.03 |