카프카 오류
Synchronous auto-commit of offsets {topic-2=OffsetAndMetadata{offset=96, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
위와 같은 오류가 발생해서 찾아보니 다음과 같다.
Kafka에서 CommitFailedException이 나오는 유형은 아래와 같다.
consumer 로직의 처리시간이 max.poll.interval.ms보다 클 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
session.timeout.ms시간동안 heartbeat가 오지 않았을 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
1. consumer 로직의 처리시간이 max.poll.interval.ms보다 클 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
ConsumerConfig 설정 방법
아래 방식대로 consume 로직 처리시간 변경가능.
@Bean
public ConsumerFactory<String, JsonSerializable> consumerFactory() {
...
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000); // 5초로 설정
...
}
위의 로직이 완료되고 commit될 때 CommitFailedException이 발생한다.
max.poll.interval.ms 는 기본 5분으로 알고 있다.
This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
2. session.timeout.ms시간동안 heartbean가 오지 않았을 경우 리밸런싱으로 인한 컨슈머 그룹에서 제외되었을 경우
heartbeat 메시지는 consumer에서 3초(기본값)에 한번씩 주기적으로 broker에 날린다.
메시지 내용을 확인하려면 logger에 아래 내용을 추가한다.
logging:
level:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: debug
subscriber의 로직에 breakpoint를 걸면 heartbeat 메시지가 날라가지 않으므로 10초 후에 리밸런싱이 되고 컨슈머 그룹에서 제외된다.
그때 로직이 수행되고 커밋하려면 CommitFailedException이 발생한다.
'DB > Kafka' 카테고리의 다른 글
[KAFKA] 카프카 producer configuration (0) | 2022.07.20 |
---|