개발/Spring

Spring과 Kafka를 활용한 트랜잭셔널 아웃박스 패턴 구현

깨굴딱지 2024. 11. 21. 22:50

트랜잭셔널 아웃박스 패턴이란?

마이크로서비스 환경에서는 데이터베이스와 메시지 브로커를 함께 사용하는 경우가 많습니다. 예를 들어 콘서트 예매 시스템에서 결제가

완료되면 사용자에게 알림을 보내야 하는데, 이 과정에서 데이터베이스 트랜잭션과 메시지 발행이 원자성을 가져야 합니다.

트랜잭셔널 아웃박스 패턴은 이벤트를 바로 메시지 브로커로 보내지 않고, 먼저 데이터베이스의 아웃박스 테이블에 저장한 후 별도의

프로세스가 이를 읽어서 메시지 브로커로 전송하는 방식입니다.

 

1. 기존 구현의 문제점

기존 시스템은 결제 처리와 동시에 카프카로 메시지를 직접 전송했습니다

@Transactional
public PaymentResult processPayment(PaymentCommand command) {
    Payment payment = paymentRepository.save(...);
    kafkaTemplate.send("payment-notification", new PaymentEvent(...));
    return PaymentResult.success();
}

 

이 방식의 문제점:

  1. 데이터 정합성 문제: 결제는 성공했지만 메시지 전송이 실패할 수 있음
  2. 메시지 유실: 네트워크 장애나 카프카 장애 시 메시지가 영구적으로 유실될 수 있음
  3. 모니터링 어려움: 실패한 메시지를 추적하고 재시도하기 어려움

 

1.1 아웃박스 패턴을 통한 해결 방안

아웃박스 패턴을 적용하면 다음과 같은 흐름으로 처리됩니다:

  1. 결제 트랜잭션에서 결제 처리와 아웃박스 테이블 저장을 함께 수행
  2. 결제 트랜잭션 성공 후 별도의 프로세스가 아웃박스 테이블을 읽어서 메시지 전송
  3. 실패한 메시지는 상태 관리를 통해 재시도

 

2. 상세 구현 설명

2.1 아웃박스 테이블 설계

CREATE TABLE payment_outbox (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    aggregate_id BIGINT NOT NULL,
    payload TEXT NOT NULL,
    status VARCHAR(20) NOT NULL,
    retry_count INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed_at TIMESTAMP,
    INDEX idx_status_created (status, created_at)
);

 

2.2 아웃박스 엔티티 및 레포지토리

@Entity
@Table(name = "payment_outbox")
@Getter
public class PaymentOutbox {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String eventType;
    private Long aggregateId;
    private String payload;
    private String status;
    private int retryCount;
    private LocalDateTime createdAt;
    private LocalDateTime processedAt;
    
    public static PaymentOutbox from(PaymentMessageSendEvent event) {
        PaymentOutbox outbox = new PaymentOutbox();
        outbox.eventType = "PAYMENT_COMPLETED";
        outbox.aggregateId = event.getPaymentId();
        outbox.payload = JsonUtils.toJson(event);
        outbox.status = "PENDING";
        outbox.createdAt = LocalDateTime.now();
        return outbox;
    }
    
    public void markAsProcessed() {
        this.status = "PROCESSED";
        this.processedAt = LocalDateTime.now();
    }
    
    public void markAsFailed() {
        this.status = "FAILED";
        this.retryCount++;
        this.processedAt = LocalDateTime.now();
    }
}

@Repository
public interface PaymentOutboxRepository extends JpaRepository<PaymentOutbox, Long> {
    List<PaymentOutbox> findByStatusAndCreatedAtBefore(
        String status, LocalDateTime threshold);
}

 

테이블 설계 시 고려사항

  • event_type: 이벤트 종류 구분 (예: PAYMENT_COMPLETED)
  • aggregate_id: 관련 엔티티의 ID (결제 ID 등)
  • payload: JSON 형태의 실제 메시지 데이터
  • status: 메시지 상태 관리 (PENDING, PROCESSED, FAILED)
  • retry_count: 재시도 횟수 추적
  • idx_status_created: 처리해야 할 메시지 조회를 위한 인덱스

 

2.3 이벤트 처리 프로세스

Spring의 TransactionalEventListener를 활용하여 두 단계로 나누어 처리합니다:

  1. BEFORE_COMMIT 단계: 아웃박스 테이블에 이벤트 저장
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void recordMessage(PaymentMessageSendEvent event) {
    PaymentOutbox outbox = PaymentOutbox.from(event);
    outboxRepository.save(outbox);
}

 

 

2. AFTER_COMMIT 단계: 카프카로 메시지 발행

@Async(EVENT_ASYNC_TASK_EXECUTOR)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handlePaymentNotification(PaymentMessageSendEvent event) {
    try {
        String message = formatMessage(event);
        messageSender.sendMessage(message);
        updateOutboxStatus(event, "PROCESSED");
    } catch (Exception e) {
        updateOutboxStatus(event, "FAILED");
        log.error("메시지 전송 실패", e);
    }
}

 

 

3 실패한 메시지 재처리

 

실패한 메시지는 PaymentMessageRelay가 주기적으로 재처리를 시도합니다

@Scheduled(fixedDelay = 300000) // 5분마다 실행
@Transactional
public void processFailedMessages() {
    List<PaymentOutbox> failedEvents = outboxRepository
        .findByStatusAndCreatedAtBefore("FAILED", LocalDateTime.now().minusMinutes(5));
    
    for (PaymentOutbox outbox : failedEvents) {
        // 재처리 로직
    }
}

 

주요 특징:

  • 5분 주기로 실패한 메시지 검색
  • 트랜잭션 내에서 상태 업데이트
  • 상세한 로깅으로 모니터링 용이

 

3 Graceful Shutdown 처리

비동기 처리의 안정성을 위해 다음 설정이 중요합니다

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Bean(name = EVENT_ASYNC_TASK_EXECUTOR)
    public Executor messageTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(10);
        // ... 기타 설정
        return executor;
    }
}

 

이 설정으로:

  • 애플리케이션 종료 시 진행 중인 메시지 처리 완료 보장
  • 메시지 유실 방지
  • 리소스 안전한 정리

 

4. 운영상 이점

4.1 데이터 정합성

  • 결제와 메시지 기록이 단일 트랜잭션으로 처리
  • 실패한 메시지도 재처리를 통해 결국 전송됨 (Eventually Consistent)

4.2 운영 효율성

  • 메시지 상태 모니터링 용이
  • 장애 상황에서도 데이터 복구 가능
  • 명확한 감사 로그 제공

4.3 확장성

  • 새로운 이벤트 타입 추가가 용이
  • 메시지 포맷 변경이 자유로움
  • 컨슈머 추가가 간단

 

5. 모니터링 및 운영 전략

5.1 모니터링 지표

  • 실패한 메시지 수
  • 재시도 횟수 분포
  • 처리 지연 시간
  • 메시지 처리량

5.2 운영 체크리스트

  • 아웃박스 테이블 크기 관리
  • 처리 완료된 레코드 아카이빙
  • 재시도 정책 최적화
  • 로그 모니터링

 

마치며

트랜잭셔널 아웃박스 패턴은 구현은 다소 복잡할 수 있지만, 운영 안정성과 데이터 정합성 측면에서 큰 이점을 제공합니다.

실제 운영에서는 모니터링과 장애 대응 전략을 잘 수립하는 것이 중요하며, 이를 통해 안정적인 메시지 처리 시스템을 구축할 수 있습니다.

 

 

참고문헌..

https://medium.com/@greg.shiny82/%ED%8A%B8%EB%9E%9C%EC%9E%AD%EC%85%94%EB%84%90-%EC%95%84%EC%9B%83%EB%B0%95%EC%8A%A4-%ED%8C%A8%ED%84%B4%EC%9D%98-%EC%8B%A4%EC%A0%9C-%EA%B5%AC%ED%98%84-%EC%82%AC%EB%A1%80-29cm-0f822fc23edb