Synchronous auto-commit of offsets {topic-2=OffsetAndMetadata{offset=96, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
위와 같은 오류가 발생해서 찾아보니 다음과 같다.
Kafka에서 CommitFailedException이 나오는 유형은 아래와 같다.
consumer 로직의 처리시간이 max.poll.interval.ms보다 클 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
session.timeout.ms시간동안 heartbeat가 오지 않았을 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
1. consumer 로직의 처리시간이 max.poll.interval.ms보다 클 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
ConsumerConfig 설정 방법 아래 방식대로 consume 로직 처리시간 변경가능.
@Bean
public ConsumerFactory<String, JsonSerializable> consumerFactory() {
...
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000); // 5초로 설정
...
}
위의 로직이 완료되고 commit될 때 CommitFailedException이 발생한다.
max.poll.interval.ms 는 기본 5분으로 알고 있다.
This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
2. session.timeout.ms시간동안 heartbean가 오지 않았을 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
heartbeat 메시지는 consumer에서 3초(기본값)에 한번씩 주기적으로 broker에 날린다. 메시지 내용을 확인하려면 logger에 아래 내용을 추가한다.
acks는 acknowledgments의 약자로 사전에서 찾아 보면 "승인", 확인 프로듀서가 메시지를 보내고 그 메시지를 카프카가 잘 받았는지 확인을 할 것인지 또는 확인을 하지 않을 것인지를 결정하는 옵션
acks 옵션
OPTION
손실율
속도
DESCRIPTION
acks = 0
상
상
프로듀서는 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않는다.
acks = 1
중
중
프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader가 메시지를 받았는지 기다립니다. follower들은 확인하지 않습니다. leader가 확인응답을 보내고, follower에게 복제가 되기 전에 leader가 fail되면, 해당 메시지는 손실될 수 있다.
acks = all(-1)
하
하
프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader와 follower(replicas)까지 받았는지 기다립니다. 최소 하나의 복제본까지 처리된 것을 확인하므로 메시지가 손실될 확률은 거의 없다.
acks=0
단순 메트릭 정보와 같이 메세지 손실이 어느정도 눈감아지는 상황인 경우 사용.
acks=1
producer 가 kafka 에 데이터 전송 -> leader broker 는 쓰기요청에 대해 producer 에게 respond 를 보낸다. 그리고 데이터를 topic 에 쓴다.
leader 에게 응답받지 못한 producer 는 다시 쓰기 요청을 재시도 하고, leader 의 replica 들도 제대로 메시지를 받았는지 확인할 수 없다.
replica 들이 제대로 leader 의 topic/partition 을 제대로 복제 못한 상태에서 leader 가 다운되면 메시지가 손실 될 수 있다.
acks = all
leader 뿐 아니라 replicas 들도 ack 를 보내야 한다.
그만큼 지연시간은 늘어나지만 데이터 손실이 되지 않는다. 하나의 데이터도 손실이 용납되지 않는 경우 해당 옵션 사용 해야 한다.
acks가 all 인 경우 min.insync.replicas 옵션을 고려해야 한다.
min.insync.replicas=2 는 적어도 2개의 브로커가 데이터를 받았다는 ack응답을 보내야 한다는 뜻.
replication factor=3, min.insync.replicas=2, acks=all 인 경우 -> 모든 브로커에게 ack를 받지 못해도 min.insync.replicas 가 2이기때문에 1개가 장애나도 프로세스는 유지된다. 2개가 문제인 경우 producer 는 NOT_ENOUGH_REPLICAS 라는 예외를 받는다.
retry
NOT_ENOUGH_REPLICAS 과 같은 일시적 장애가 생긴경우 다시 시도하는 횟수.
kafka 2.1 버젼 이상부터는 retries 가 2147483647 번으로 기본값 설정됨.
retry.backoff.ms 옵션의 기본값은 100ms 이다.
delivery.timeout.ms
위 옵션이 2분인 경우 producer 는 acks 를 받지 못한 메시지에 대해 2분동안 요청 retry 를 한다.
위 시간동안 ack 를 받지 못하면 fail 이다.
retries 가 일어나는 동안 메시지의 순서는 보장되지 않는다.
key 기반 ordering 을 원하는 경우 문제가 발생한다.
이 경우 product request 가 병렬적으로 실행되도록 조절하는 옵션인
max.in.flight.requests.per.connection 의 값을 조절하면 된다.
max.in.flight.requests.per.connection
Default: 5
순서를 보장하기 위해서는 1로 설정해야하지만, 처리량은 낮아질 수 있다.
acks 를 보내는 과정에서 네트워크 에러가 발생하는 경우 request 가 중복이 되는 경우가 발생할수 있다.
kafka 0.11 버젼부터는 Idempotent request 라는 설정이 가능하다.
produce request 에 id 가 부여되기 때문에 broker 단에서 중복되는 reqeust 를 알아채고 중복을 방지한다.
Batching
max.in.flight.requests.per.connection = 5 의 경우 동시에 5건의 메시지가 개별적으로 전달된다.
linger.ms -> produce 가 batch 를 보내기 전에 기다리는 시간 (ms)
값을 올릴수록 해당 초까지 기다렸다가 같이 보낸다.하지만 해당 시간 전에 batch.size 만큼 메시지가 차면 바로 배치로 보낸다.
batch.size
하나의 배치안 에 넣을 수 있는 최대 바이트 수.
배치 사이즈를 늘리면 요청을 보낼때 압축률, 처리량, 효율성에 이점을 볼 수 있다.
배치 사이즈보다 더 큰 사이즈의 메시지의 경우 배치로 처리되지 못한다.
하나의 배치는 파티션별로 할당되기 때문에 너무 높은 값으로 정하면 메모리 부족이 생길 수 있다.
memory
broker가 메시지를 처리하는 속도보다 producer 가 더 빠르게 메시지를 보내는 경우 해당 레코드는 잠시 producer 메모리에 buffer 된다.
buffer.memory
기본값은 32MB, send buffer 의 사이즈다.
이 버퍼는 시간이 지남에 따라 계속 차고, 브로커에 더 빠르게 메시지를 보낼 수 있게 되면 다시 내려간다.
버퍼가 가득 차면 send 메소드는 block 된다. 데이터를 못보내고 그냥 대기하게 된다.