본문 바로가기

Data/Kafka

[Kafka] 카프카 컨슈머: 설정과 오프셋 관리

이 글은 "카프카 핵심 가이드(Kafka: The Definitive Guide)"를 기반으로 작성한 시리즈 포스트입니다.
이번 편에서는 컨슈머 동작을 제어하는 주요 설정, 오프셋 커밋 전략, 리밸런스 리스너, 그리고 특정 오프셋으로 이동하는 seek API를 다룹니다.


1. 주요 컨슈머 설정

5편에서 다룬 필수 속성 외에, 컨슈머 동작에 큰 영향을 미치는 설정들입니다.

 

Fetch 관련 설정

fetch.min.bytes

브로커가 fetch 요청에 응답할 때 반환할 최소 데이터 크기입니다. 기본값은 1바이트로, 레코드가 하나라도 있으면 즉시 반환합니다. 값을 높이면 브로커가 데이터가 쌓일 때까지 기다린 뒤 반환하므로, 트래픽이 적은 환경에서 불필요한 요청 횟수를 줄일 수 있습니다.

 

fetch.max.wait.ms

fetch.min.bytes를 채우지 못했을 때 브로커가 최대 기다리는 시간입니다. 기본값은 500ms입니다. 두 설정은 함께 작동합니다. "데이터가 이만큼 쌓이면 바로 보내고, 못 채우면 최대 이 시간까지는 기다린다"는 의미입니다.

 

fetch.max.bytes

한 번의 fetch 요청으로 받을 수 있는 전체 최대 데이터 크기입니다. 기본값은 50MB입니다. 컨슈머의 메모리 사용량과 네트워크 대역폭을 제어할 때 튜닝합니다.

fetch.max.bytes는 브로커의 message.max.bytes보다 크게 설정해야 합니다. 단일 메시지 크기가 이 한도를 초과하면 컨슈머가 해당 메시지를 영원히 읽지 못하는 상황이 발생합니다.

 

타임아웃 관련 설정

session.timeout.ms / heartbeat.interval.ms

session.timeout.ms는 이 시간 안에 하트비트가 도착하지 않으면 브로커가 컨슈머를 죽은 것으로 간주하고 리밸런스를 트리거합니다. 기본값은 45초입니다.

 

heartbeat.interval.ms는 하트비트 전송 주기이며, 일반적으로 session.timeout.ms의 1/3 이하로 설정합니다. 기본값은 3초입니다.

 

두 값을 낮추면 장애를 빠르게 감지할 수 있지만, 네트워크 순간 지연으로 인한 불필요한 리밸런스가 늘어날 수 있습니다.

 

max.poll.interval.ms

poll() 호출 간격의 최대 허용 시간입니다. 기본값은 5분입니다. session.timeout.ms가 하트비트 스레드의 생존을 감시한다면, max.poll.interval.ms메인 스레드가 실제로 poll()을 계속 호출하고 있는지를 별도로 감시합니다. 이 시간을 초과하면 브로커는 해당 컨슈머를 그룹에서 제외하고 리밸런스를 트리거합니다.

 

하트비트 스레드가 살아있더라도, 메인 스레드가 무거운 처리에 막혀 poll()을 제때 호출하지 못하면 컨슈머는 강제 퇴출됩니다. 처리 로직이 오래 걸리는 경우 이 값을 늘리거나, max.poll.records를 줄여 한 번에 처리할 양을 조절하는 것을 고려해야 합니다.

 

기타 설정

max.poll.records

poll() 한 번 호출에 반환되는 최대 레코드 수입니다. 기본값은 500입니다. 처리 로직이 무거운 경우 이 값을 낮춰 max.poll.interval.ms 초과를 방지할 수 있습니다.

 

partition.assignment.strategy

파티션을 컨슈머에게 배분하는 Assignor 알고리즘을 지정합니다. 5편에서 다룬 Eager/Cooperative 리밸런스 프로토콜과는 별개로, "어떤 컨슈머가 어떤 파티션을 맡을지" 결정하는 알고리즘입니다.

특징
RangeAssignor 토픽별로 파티션 범위를 나눠 할당. 토픽 수가 많으면 불균등 발생 가능
RoundRobinAssignor 전체 파티션을 순환 배분. 균등하지만 리밸런스 시 이동이 많음
StickyAssignor 리밸런스 시 기존 할당 최대한 유지. Eager 프로토콜
CooperativeStickyAssignor Sticky + Cooperative 프로토콜. Kafka 3.0+에서 기본 목록에 포함

실무에서는 불필요한 파티션 이동을 줄이기 위해 CooperativeStickyAssignor를 권장합니다.

 

auto.offset.reset

커밋된 오프셋이 없거나 유효하지 않을 때의 동작을 결정합니다. 이미 커밋된 오프셋이 존재한다면 이 설정과 무관하게 커밋된 오프셋부터 읽습니다.

동작 권장 상황
latest (기본값) 가장 최근 오프셋부터 읽음 실시간 알림, 모니터링 등 과거 데이터가 불필요한 경우
earliest 파티션의 가장 처음 오프셋부터 읽음 데이터 파이프라인, 집계, 재처리 등 전체 데이터가 필요한 경우
none 오프셋 없으면 예외 발생 오프셋 누락을 허용하지 않는 운영 환경에서 조기 감지 목적

 


2. 커밋과 오프셋

카프카에서 오프셋 커밋은 "여기까지 처리했다"는 위치를 브로커의 __consumer_offsets 토픽에 기록하는 행위입니다. 컨슈머가 재시작되면 커밋된 오프셋부터 다시 읽기 시작합니다.

 

 

자동 커밋

enable.auto.commit=true(기본값)로 설정하면 auto.commit.interval.ms(기본값 5초) 주기마다 마지막으로 poll()이 반환한 오프셋을 자동으로 커밋합니다.

props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");

구현이 단순하지만 두 가지 위험이 있습니다. 첫째, 크래시 발생 시 마지막 커밋 이후에 처리한 메시지를 재시작 후 중복으로 처리할 수 있습니다. 둘째, 자동 커밋은 poll() 호출 시점에 트리거되므로, 이전 poll()이 반환한 레코드를 아직 처리 중인데 다음 poll()을 호출하면 처리가 완료되지 않은 오프셋이 커밋될 수 있습니다.

 

수동 커밋

enable.auto.commit=false로 설정하고 직접 커밋 시점을 제어합니다. 처리 완료 후 커밋하는 구조이므로 메시지 유실 위험은 낮아지지만, 커밋 전 크래시 시에는 재처리(중복)가 발생할 수 있습니다. 이것이 카프카의 기본 보장인 at-least-once 의미론입니다.

 

commitSync() — 동기 커밋

브로커의 응답을 기다리며, 재시도 가능한 오류는 내부에서 자동으로 재시도합니다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    try {
        consumer.commitSync(); // 처리 완료 후 동기 커밋
    } catch (CommitFailedException e) {
        // 리밸런스 등으로 더 이상 파티션 소유권이 없을 때 발생
        log.error("커밋 실패 — 재시도 불가", e);
    }
}

블로킹 방식이므로 커밋 응답을 기다리는 동안 다음 처리가 멈춥니다. 처리량보다 정확성이 중요한 경우에 적합합니다.

 

commitAsync() — 비동기 커밋

커밋 요청을 보내고 즉시 다음 작업으로 넘어갑니다. 선택적으로 콜백을 등록해 결과를 받을 수 있습니다. 논블로킹이므로 처리량이 높지만, 실패 시 재시도하지 않습니다. 카프카가 의도적으로 그렇게 설계한 이유가 있습니다. 예를 들어 오프셋 10을 커밋하는 요청이 실패해서 재시도하는 사이에, 오프셋 20을 커밋하는 요청이 먼저 성공해버리면 오프셋이 10으로 되돌아가는 역행이 발생합니다. 이후 재시작 시 이미 처리한 10~20 구간을 다시 처리하게 됩니다. 재시도 없이 다음 커밋이 자연스럽게 덮어쓰도록 두는 편이 더 안전합니다.

 

권장 패턴: 루프에서 Async, 종료 시 Sync

두 방식의 장점을 조합한 패턴입니다.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        consumer.commitAsync(); // 루프: 논블로킹으로 처리량 확보
    }
} catch (Exception e) {
    log.error("처리 중 오류", e);
} finally {
    try {
        consumer.commitSync(); // 종료 전: 마지막 오프셋 확실히 커밋
    } finally {
        consumer.close();
    }
}

루프 중 commitAsync() 실패는 이후 커밋이 자연스럽게 덮어쓰므로 큰 문제가 되지 않습니다. 반면 종료 시점에는 이후에 커밋이 없으므로 commitSync()로 확실히 기록해야 합니다.

 

특정 오프셋만 커밋하기

poll()이 반환한 전체 오프셋이 아니라, 처리가 완료된 파티션별 오프셋만 골라서 커밋할 수도 있습니다.

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {
    process(record);
    currentOffsets.put(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1) // 다음에 읽을 오프셋 = 현재 + 1
    );
}
consumer.commitAsync(currentOffsets, null);

커밋하는 오프셋은 "다음번에 읽을 오프셋", 즉 마지막으로 처리한 오프셋 + 1이어야 합니다.


3. 리밸런스 리스너

리밸런스가 발생할 때 파티션 반납 직전, 새 파티션 할당 직후에 특정 로직을 실행하고 싶다면 ConsumerRebalanceListener를 사용합니다.

 

 

구현 예시

consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 파티션 반납 직전 호출 — 여기서 오프셋을 커밋해야 안전하다
        log.info("파티션 반납: {}", partitions);
        consumer.commitSync(currentOffsets); // 폴링 루프에서 누적해온 오프셋 맵
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 새 파티션 할당 직후 호출 — 상태 초기화 또는 seek 적기
        log.info("파티션 할당: {}", partitions);
    }
});

언제 쓰는가?

onPartitionsRevoked()의 핵심 용도는 오프셋 커밋입니다. 수동 커밋 환경에서 리밸런스가 발생하면, 처리 중이던 오프셋이 커밋되지 못한 채 파티션이 다른 컨슈머에게 넘어갈 수 있습니다. 이 콜백에서 commitSync()를 호출하면 해당 상황을 방지할 수 있습니다.

 

onPartitionsAssigned()는 새 파티션을 받은 직후 초기화 작업에 사용합니다. 예를 들어 오프셋을 외부 DB에 별도로 저장하는 방식을 쓰고 있다면, 여기서 DB로부터 오프셋을 조회해 seek()으로 직접 이동할 수 있습니다.


4. 특정 오프셋의 레코드 읽기

기본적으로 컨슈머는 마지막으로 커밋된 오프셋부터 읽습니다. 하지만 특정 오프셋이나 특정 시점부터 읽어야 하는 경우에는 seek API를 활용합니다.

seek API 종류

// 처음 오프셋으로 이동
consumer.seekToBeginning(consumer.assignment());

// 마지막 오프셋으로 이동
consumer.seekToEnd(consumer.assignment());

// 특정 오프셋으로 이동
TopicPartition partition = new TopicPartition("orders", 0);
consumer.seek(partition, 42L);

// 특정 타임스탬프 이후 첫 오프셋 조회 후 이동
Map<TopicPartition, Long> timestampMap = new HashMap<>();
timestampMap.put(partition, Instant.now().minus(1, ChronoUnit.HOURS).toEpochMilli());

Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(timestampMap);
result.forEach((tp, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        consumer.seek(tp, offsetAndTimestamp.offset());
    }
});

 

seek의 주의사항

seek()은 파티션이 할당된 상태에서만 호출할 수 있습니다. assign()으로 직접 파티션을 지정한 경우에는 즉시 사용 가능하지만, subscribe()로 구독한 경우에는 파티션 할당이 완료된 이후, 즉 첫 poll() 이후 또는 onPartitionsAssigned() 내부에서 호출해야 합니다.

 

또한 seekToBeginning()seekToEnd()lazy하게 동작합니다. 호출 즉시 이동하는 것이 아니라, 다음 poll() 호출 시 실제 이동이 반영됩니다.

 


정리

설정/개념 핵심 내용
fetch.min.bytes / fetch.max.wait.ms 반환 최소 크기와 대기 시간으로 fetch 효율 조절
fetch.max.bytes 한 번의 fetch 요청으로 받을 전체 최대 크기 (기본 50MB)
session.timeout.ms 하트비트 미수신 시 컨슈머 죽은 것으로 판단하는 기준
max.poll.interval.ms poll() 호출 간격 최대 허용 시간 — 메인 스레드 생존 감시
max.poll.records poll() 1회당 반환할 최대 레코드 수
partition.assignment.strategy 파티션 배분 알고리즘 (CooperativeStickyAssignor 권장)
auto.offset.reset 커밋 오프셋 없을 때 동작 (latest / earliest / none)
자동 커밋 간편하지만 크래시 시 중복 또는 미처리 오프셋 커밋 가능
commitSync() 블로킹, 자동 재시도 — 정확성 우선 시
commitAsync() 논블로킹, 재시도 없음 — 처리량 우선 시
권장 패턴 루프: commitAsync() + 종료 시: commitSync()
커밋 오프셋 규칙 마지막으로 처리한 오프셋 + 1을 커밋
onPartitionsRevoked() 파티션 반납 전 — 오프셋 커밋 적기
onPartitionsAssigned() 파티션 할당 후 — 상태 초기화 / seek 적기
seek() 파티션 할당 완료 후 호출, seekToBeginning/End는 lazy 동작

다음 편 예고: 폴링 루프 종료(wakeup), 디시리얼라이저, 독립 실행 컨슈머(assign)를 다룹니다.