본문 바로가기

Data/Kafka

[Kafka] 카프카 컨슈머: 종료, 역직렬화, 독립 실행

이 글은 "카프카 핵심 가이드(Kafka: The Definitive Guide)"를 기반으로 작성한 시리즈 포스트입니다.
이번 편에서는 컨슈머를 안전하게 종료하는 방법, 역직렬화, 그리고 컨슈머 그룹 없이 특정 파티션을 직접 읽는 독립 실행 컨슈머를 다룹니다.


1. 폴링 루프 벗어나기

컨슈머는 while (true) 루프 안에서 poll()을 반복합니다. 이 루프를 안전하게 종료하려면 어떻게 해야 할까요?

 

문제: poll()은 블로킹 호출이다

poll(Duration.ofMillis(1000))은 레코드가 없으면 최대 1초간 블로킹됩니다. 이 상태에서 SIGTERM 같은 종료 신호를 받아도 poll()이 반환될 때까지 종료 처리를 시작할 수 없습니다. 루프 조건 변수(while (running))를 바꿔도 현재 진행 중인 poll() 자체를 깨울 수는 없습니다.

 

해법: consumer.wakeup()

KafkaConsumer.wakeup()은 블로킹 중인 poll()을 즉시 중단시키고 WakeupException을 발생시킵니다. KafkaConsumer에서 스레드 안전을 보장하는 유일한 메서드이므로, 다른 스레드(ShutdownHook 등)에서 안전하게 호출할 수 있습니다.

 

구현 패턴

// 메인 스레드에서 실행되는 컨슈머
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// JVM 종료 시 ShutdownHook이 wakeup() 호출
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
}));

try {
    consumer.subscribe(Collections.singletonList("orders"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        consumer.commitAsync();
    }
} catch (WakeupException e) {
    // wakeup()에 의해 정상적으로 루프 탈출 — 예외 처리 불필요
} finally {
    try {
        consumer.commitSync(); // 마지막 오프셋 확실히 커밋
    } finally {
        consumer.close();      // 브로커에 그룹 이탈 통보 + 리소스 정리
    }
}

WakeupException은 오류가 아니다

WakeupException은 오류 상황이 아니라 의도된 종료 신호입니다. catch 블록에서 별도 처리 없이 흘려보내고 finally에서 정리 작업만 수행하면 됩니다.

 

close()는 단순 리소스 해제가 아니다

consumer.close()는 브로커에 즉시 이탈을 통보해 session.timeout.ms를 기다리지 않고 바로 리밸런스를 트리거합니다. 다른 컨슈머가 신속하게 파티션을 이어받을 수 있어, 호출하지 않는 것과 비교하면 그룹 전체의 처리 중단 시간이 크게 줄어듭니다.

 


2. 디시리얼라이저

카프카는 메시지를 브로커에 byte[]로 저장합니다. 컨슈머는 이 바이트 배열을 다시 Java 객체로 변환해야 하는데, 이 역할을 디시리얼라이저(Deserializer)가 담당합니다.

 

 

기본 제공 디시리얼라이저

카프카 클라이언트 라이브러리는 기본 타입에 대한 디시리얼라이저를 기본으로 제공합니다.

디시리얼라이저 대응 시리얼라이저 용도
StringDeserializer StringSerializer 문자열
IntegerDeserializer IntegerSerializer 정수
ByteArrayDeserializer ByteArraySerializer 바이트 배열 (역직렬화 직접 처리 시)

 

타입 일치 원칙

프로듀서의 Serializer와 컨슈머의 Deserializer는 반드시 타입이 일치해야 합니다. StringSerializer로 직렬화한 데이터를 IntegerDeserializer로 읽으려 하면 역직렬화 오류가 발생합니다. 카프카 브로커는 바이트 배열을 그대로 저장할 뿐이므로, 타입 불일치를 브로커 레벨에서 감지할 수 없습니다.

 

커스텀 디시리얼라이저

public class OrderDeserializer implements Deserializer<Order> {

    // ObjectMapper는 생성 비용이 높으므로 인스턴스를 재사용해야 합니다
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Order deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            return objectMapper.readValue(data, Order.class);
        } catch (Exception e) {
            throw new SerializationException("Order 역직렬화 실패", e);
        }
    }
}

등록 방법은 다른 Deserializer와 동일합니다.

props.put("value.deserializer", OrderDeserializer.class.getName());

 

커스텀 디시리얼라이저의 단점

커스텀 디시리얼라이저는 프로듀서와 컨슈머 간 강한 결합을 만듭니다. Order 클래스의 필드가 바뀌면 프로듀서와 컨슈머를 동시에 수정해야 하며, 서로 다른 팀이나 서비스가 같은 토픽을 사용한다면 버전 관리가 복잡해집니다.

 

실무 권장: Avro + Schema Registry

이런 이유로 실무에서는 Apache AvroConfluent Schema Registry를 함께 사용하는 것을 권장합니다. 스키마를 레지스트리에 등록하고 버전을 관리하므로, 스키마 변경이 발생해도 하위 호환성을 유지하며 프로듀서와 컨슈머를 독립적으로 배포할 수 있습니다.

// Confluent Schema Registry 사용 시
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://schema-registry:8081");

참고 — Serdes: Kafka Streams에서는 Serializer와 Deserializer를 Serdes라는 단일 객체로 묶어 관리합니다. Serdes.String(), Serdes.Integer() 처럼 사용하며, 스트림 처리 토폴로지 전반에 걸쳐 직렬화 설정을 일관되게 적용할 수 있습니다. Kafka Streams를 다룰 때 자세히 살펴볼 개념입니다.

 


3. 독립 실행 컨슈머

지금까지는 subscribe()를 통해 컨슈머 그룹에 참여하는 방식을 다뤘습니다. 하지만 경우에 따라 특정 파티션을 직접 지정해 읽어야 할 때가 있습니다. 이때 assign()을 사용하는 독립 실행 컨슈머(Standalone Consumer)가 적합합니다.

언제 쓰는가?

  • 파티션 수만큼 정확히 컨슈머를 배치하고 직접 관리하고 싶을 때
  • 리밸런스 없이 특정 파티션의 데이터를 안정적으로 읽어야 할 때
  • 컨슈머 그룹의 오프셋 관리 대신 외부 저장소에서 오프셋을 직접 관리할 때

구현

// 읽을 파티션 목록 직접 지정
List<TopicPartition> partitions = List.of(
    new TopicPartition("orders", 0),
    new TopicPartition("orders", 1)
);

consumer.assign(partitions);

// 이후 폴링 루프는 동일
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // __consumer_offsets에 커밋하려면 group.id가 설정되어 있어야 합니다
    consumer.commitSync();
}

 

파티션 번호를 하드코딩하는 대신, 토픽의 파티션 목록을 동적으로 조회해 전체 파티션을 할당할 수도 있습니다.

// 토픽의 전체 파티션을 동적으로 조회해 assign
List<PartitionInfo> partitionInfos = consumer.partitionsFor("orders");
List<TopicPartition> allPartitions = partitionInfos.stream()
    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
    .collect(Collectors.toList());

consumer.assign(allPartitions);

 

subscribe()와 assign()의 차이

  subscribe() assign()
파티션 할당 카프카가 자동 배분 직접 지정
리밸런스 발생 발생하지 않음
group.id 필수 그룹 관리 목적으로는 불필요 (오프셋 커밋 시에는 필요)
장애 대응 자동 (다른 컨슈머가 인계) 직접 구현 필요
오프셋 커밋 __consumer_offsets 토픽 직접 관리 가능

 

주의사항

assign()subscribe()동시에 사용할 수 없습니다. 하나를 호출한 뒤 다른 것을 호출하면 IllegalStateException이 발생합니다.

 

또한 독립 실행 컨슈머는 리밸런스가 없으므로, 컨슈머가 죽어도 카프카가 자동으로 다른 컨슈머에 파티션을 넘겨주지 않습니다. 가용성이 필요하다면 장애 감지와 재할당 로직을 직접 구현해야 합니다.

 


정리

개념 핵심 내용
wakeup() 블로킹 중인 poll()을 즉시 깨우는 유일한 스레드 안전 메서드
WakeupException 오류가 아닌 정상 종료 신호 — catch 후 흘려보내면 됨
close() 브로커에 즉시 이탈 통보 → 빠른 리밸런스 트리거
Deserializer 바이트 배열을 Java 객체로 복원. 프로듀서 Serializer와 타입 일치 필수
커스텀 Deserializer 프로듀서-컨슈머 간 강한 결합 발생 → 실무에서는 Avro + Schema Registry 권장
assign() 특정 파티션을 직접 지정해 읽는 독립 실행 컨슈머
assign() vs subscribe() 리밸런스 없음 / group.id 그룹 관리 목적으로는 불필요 / 장애 대응 직접 구현

 

이것으로 카프카 컨슈머 시리즈를 마칩니다.