본문 바로가기

Data/Kafka

[Kafka] 카프카 프로듀서 완전 정복 1편 — 구조, 전송, 시리얼라이저

이 글은 Apache Kafka Java Client 3.7.x 기준으로 작성되었습니다.

 


1. 프로듀서란 무엇인가

카프카는 프로듀서(Producer), 브로커(Broker), 컨슈머(Consumer) 세 축으로 구성됩니다. 이 중 프로듀서는 토픽에 메시지를 보내는 클라이언트로, 브로커와 직접 TCP 연결을 맺고 통신합니다.

 

단순히 send()를 한 번 호출하는 것처럼 보이지만, 내부적으로는 직렬화 → 파티션 결정 → 배치 버퍼링 → 네트워크 전송의 파이프라인이 작동합니다. 이 구조를 이해해야 설정값의 의미와 장애 상황에서의 동작을 제대로 파악할 수 있습니다.

프로듀서의 핵심 책임은 두 가지입니다.

  • 직렬화: Java 객체를 브로커가 저장할 수 있는 바이트 배열로 변환합니다.
  • 파티션 결정: 메시지를 어느 파티션으로 보낼지 결정합니다. (키 기반, 라운드로빈, 커스텀 전략)

 


2. 프로듀서의 구성 요소

프로듀서가 send()를 호출하면 내부적으로 다섯 단계를 거쳐 브로커에 도달합니다.

 

구성 요소 역할
ProducerRecord 보낼 메시지 단위 (토픽, 키, 값, 헤더, 파티션, 타임스탬프 포함 가능)
Serializer 키/값 객체를 바이트 배열로 변환
Partitioner 어느 파티션으로 보낼지 결정
RecordAccumulator 파티션별로 메시지를 배치 단위로 모아두는 내부 버퍼
Sender 백그라운드 I/O 스레드. linger.ms 또는 batch.size 조건이 충족되면 배치를 브로커로 전송

send()를 호출하는 애플리케이션 스레드와 실제 네트워크 전송을 담당하는 Sender 스레드는 분리되어 있습니다. 덕분에 애플리케이션은 브로커 응답을 기다리지 않고 계속 메시지를 RecordAccumulator에 쌓을 수 있습니다.

 


3. 프로듀서 생성

의존성 추가 (Maven)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>

 

기본 생성 코드

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,   StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

BOOTSTRAP_SERVERS_CONFIG는 초기 클러스터 연결에 사용하는 브로커 주소입니다. 첫 연결 후에는 클러스터 전체 메타데이터를 자동으로 가져오기 때문에 모든 브로커를 나열할 필요는 없습니다. 다만 가용성을 위해 2~3개 이상 지정하는 것이 권장됩니다.

 


4. ProducerRecord 구조

ProducerRecord는 프로듀서가 전송하는 메시지의 단위입니다. 필수 필드는 토픽과 값이며, 나머지는 선택입니다.

// 가장 기본적인 형태 (토픽 + 값)
ProducerRecord<String, String> record1 = new ProducerRecord<>("my-topic", "order-value");

// 키 포함 (같은 키는 항상 같은 파티션으로 전송됨)
ProducerRecord<String, String> record2 = new ProducerRecord<>("my-topic", "order-key", "order-value");

 

파티션 번호와 타임스탬프, 헤더까지 직접 지정하려면 아래 생성자를 사용합니다.

// (topic, partition, timestamp, key, value, headers)
Headers headers = new RecordHeaders();
headers.add("source", "order-service".getBytes(StandardCharsets.UTF_8));

ProducerRecord<String, String> record = new ProducerRecord<>(
    "my-topic",
    0,                          // 파티션 번호 (null이면 Partitioner가 결정)
    System.currentTimeMillis(), // 타임스탬프
    "order-key",
    "order-value",
    headers
);

주의: 파티션을 직접 지정하면 Partitioner가 무시됩니다. 특별한 이유가 없다면 키를 활용해 파티션을 간접 결정하는 방식이 권장됩니다.

 


5. 메시지 전송 방식

카프카 프로듀서의 전송 방식은 크게 세 가지이며, Java에서는 CompletableFuture로 래핑하는 패턴도 자주 사용됩니다.

 

5-1. Fire and Forget (결과 무시)

producer.send(record);

전송 후 결과를 확인하지 않습니다. 카프카는 내부적으로 재시도(retries)를 수행하지만, 재시도가 모두 소진된 뒤의 최종 실패 여부는 애플리케이션이 알 수 없습니다. 유실이 허용되는 로그성 데이터에만 적합합니다.

 


5-2. 동기 전송

try {
    RecordMetadata metadata = producer.send(record).get(); // 브로커 응답까지 블로킹
    System.out.println("파티션: " + metadata.partition());
    System.out.println("오프셋: "  + metadata.offset());
} catch (ExecutionException e) {
    // 브로커 전송 실패 (재시도 소진 후 도달)
    System.err.println("전송 실패: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

send()Future<RecordMetadata>를 반환하며, .get()으로 블로킹하면 결과를 동기적으로 받을 수 있습니다.

  • 장점: 구현이 단순하고 전송 성공/실패를 즉시 확인할 수 있습니다.
  • 단점: 브로커 응답을 기다리는 동안 호출 스레드가 블로킹됩니다. 처리량이 중요한 환경에서는 적합하지 않습니다.

 


5-3. 비동기 전송 (콜백)

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("전송 실패: " + exception.getMessage());
    } else {
        System.out.printf("전송 성공 — 파티션: %d, 오프셋: %d%n",
                metadata.partition(), metadata.offset());
    }
});

콜백은 카프카 내부 I/O 스레드에서 실행됩니다. 콜백 내부에서 DB 저장이나 외부 API 호출처럼 무거운 작업을 수행하면 I/O 스레드가 지연되어 전체 처리량이 저하됩니다. 무거운 후속 작업이 필요한 경우에는 다음에 소개할 CompletableFuture 패턴으로 스레드를 분리하는 것이 좋습니다.

 


5-4. CompletableFuture로 래핑

Java 표준 Future.get() 블로킹만 지원합니다. 콜백을 CompletableFuture로 감싸면 비동기 체이닝과 예외 처리를 더 유연하게 구성할 수 있습니다.

public CompletableFuture<RecordMetadata> sendAsync(
        KafkaProducer<String, String> producer,
        ProducerRecord<String, String> record) {

    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            future.completeExceptionally(exception); // 반드시 호출 — 누락 시 Future가 영원히 pending
        } else {
            future.complete(metadata);
        }
    });

    return future;
}
// 사용 예
sendAsync(producer, record)
    .thenAcceptAsync(meta -> log.info("오프셋: {}", meta.offset()), executor) // I/O 스레드 부하 분산
    .exceptionally(ex -> { log.error("전송 실패", ex); return null; });

주의: 콜백은 카프카 I/O 스레드에서 실행되므로, 후속 작업이 무겁다면 반드시 .thenAcceptAsync(task, executor) 형태로 별도 스레드풀을 지정해야 합니다.

 


6. 헤더

헤더는 메시지 본문(값)을 수정하지 않고도 메타데이터를 함께 전달하는 수단입니다. HTTP 헤더와 유사한 개념으로, 트레이싱 ID, 출처 서비스명, 이벤트 타입 등을 담는 데 활용합니다.

// 프로듀서: 헤더 추가
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
record.headers().add("trace-id",       "abc-123".getBytes(StandardCharsets.UTF_8));
record.headers().add("source-service", "order-service".getBytes(StandardCharsets.UTF_8));
producer.send(record);
// 컨슈머: 헤더 읽기 (ConsumerRecord는 poll()로 수신한 레코드)
for (Header header : consumerRecord.headers()) {
    System.out.println(header.key() + " = " + new String(header.value(), StandardCharsets.UTF_8));
}

헤더 값은 바이트 배열로 저장되므로, 읽을 때는 프로듀서가 인코딩한 방식과 동일한 Charset으로 디코딩해야 합니다.

 


7. 시리얼라이저

카프카는 모든 메시지를 바이트 배열로 저장합니다. 시리얼라이저는 Java 객체를 바이트 배열로 변환하는 역할을 담당합니다.

 

7-1. 기본 제공 시리얼라이저

클래스 대상 타입
StringSerializer String
IntegerSerializer Integer
LongSerializer Long
ByteArraySerializer byte[] (이미 직렬화된 데이터)

 

7-2. 커스텀 시리얼라이저

기본 타입 외의 객체를 전송하려면 Serializer<T> 인터페이스를 구현합니다. 필수 구현 메서드는 serialize() 하나이며, configure()close()는 필요한 경우에만 오버라이드합니다.

public class Order {
    private String orderId;
    private int amount;
    // getter, constructor 생략
}
public class OrderSerializer implements Serializer<Order> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Order data) {
        if (data == null) return null;
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Order 직렬화 실패", e);
        }
    }
    // configure(), close()는 default 구현이 있으므로 필요 시에만 오버라이드
}
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class.getName());

실무 팁: 커스텀 시리얼라이저는 스키마 변경에 취약합니다. 필드 추가·삭제 시 프로듀서와 컨슈머를 동시에 배포해야 하는 문제가 생깁니다. 다수의 컨슈머가 붙거나 스키마 변경이 잦다면 Apache Avro + Schema Registry 조합을 고려하는 것이 좋습니다.

 


8. 프로듀서 종료

프로듀서는 사용 후 반드시 종료해야 내부 버퍼와 네트워크 연결이 정리됩니다.

producer.flush(); // RecordAccumulator에 남은 메시지를 모두 브로커로 전송
producer.close(); // 네트워크 연결 해제 및 리소스 반환 (내부적으로 flush() 선행)

close()만 호출해도 내부적으로 flush()가 먼저 실행되지만, 명시적으로 분리해두면 "전송 완료 후 종료"라는 의도가 코드에 드러납니다.

 

실무에서는 try-with-resources를 활용해 종료를 누락하지 않도록 하는 것이 좋습니다.

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(record);
} // 블록 종료 시 close() 자동 호출

 


정리

주제 핵심
구성 요소 Serializer → Partitioner → RecordAccumulator → Sender(I/O 스레드)
전송 방식 Fire and Forget / 동기(.get()) / 비동기(콜백) — CompletableFuture 래핑은 콜백의 확장 패턴
헤더 메시지 본문 외 메타데이터 전달 수단. 컨슈머에서 consumerRecord.headers()로 읽음
시리얼라이저 기본 타입은 제공 클래스 사용, 커스텀 객체는 직접 구현 or Avro + Schema Registry 검토
종료 flush()close() 또는 try-with-resources 활용

다음 편에서는 프로듀서 동작에 직접 영향을 미치는 주요 설정(acks, retries, batch.size 등), 파티션 전략, 인터셉터, 쿼터/스로틀링을 다룹니다.