반응형

acks 란

acks는 acknowledgments의 약자로 사전에서 찾아 보면 "승인", 확인
프로듀서가 메시지를 보내고 그 메시지를 카프카가 잘 받았는지 확인을 할 것인지 또는 확인을 하지 않을 것인지를 결정하는 옵션

acks 옵션

OPTION 손실율 속도 DESCRIPTION
acks =  0 프로듀서는 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않는다.
acks  = 1 프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader가 메시지를 받았는지 기다립니다. follower들은 확인하지 않습니다. leader가 확인응답을 보내고, follower에게 복제가 되기 전에 leader가 fail되면, 해당 메시지는 손실될 수 있다.
acks  = all(-1) 프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader와 follower(replicas)까지 받았는지 기다립니다. 최소 하나의 복제본까지 처리된 것을 확인하므로 메시지가 손실될 확률은 거의 없다.

acks=0 

단순 메트릭 정보와 같이 메세지 손실이 어느정도 눈감아지는 상황인 경우 사용.

 

acks=1

producer 가 kafka 에 데이터 전송 -> leader broker 는 쓰기요청에 대해 producer 에게 respond 를 보낸다. 그리고 데이터를 topic 에 쓴다.

leader 에게 응답받지 못한 producer 는 다시 쓰기 요청을 재시도 하고, leader 의 replica 들도 제대로 메시지를 받았는지 확인할 수 없다.

replica 들이 제대로 leader 의 topic/partition 을 제대로 복제 못한 상태에서 leader 가 다운되면 메시지가 손실 될 수 있다.

 

acks = all

leader 뿐 아니라 replicas 들도 ack 를 보내야 한다.

그만큼 지연시간은 늘어나지만 데이터 손실이 되지 않는다. 하나의 데이터도 손실이 용납되지 않는 경우 해당 옵션 사용 해야 한다.

 

acks가 all 인 경우 min.insync.replicas 옵션을 고려해야 한다.

min.insync.replicas=2 는 적어도 2개의 브로커가 데이터를 받았다는 ack응답을 보내야 한다는 뜻.

replication factor=3, min.insync.replicas=2, acks=all 인 경우 -> 모든 브로커에게 ack를 받지 못해도 min.insync.replicas 가 2이기때문에 1개가 장애나도 프로세스는 유지된다. 2개가 문제인 경우 producer 는 NOT_ENOUGH_REPLICAS 라는 예외를 받는다.

 

retry

NOT_ENOUGH_REPLICAS 과 같은 일시적 장애가 생긴경우 다시 시도하는 횟수.

kafka 2.1 버젼 이상부터는 retries 가 2147483647 번으로 기본값 설정됨.

retry.backoff.ms 옵션의 기본값은 100ms 이다.

delivery.timeout.ms

위 옵션이 2분인 경우 producer 는 acks 를 받지 못한 메시지에 대해 2분동안 요청 retry 를 한다.

위 시간동안 ack 를 받지 못하면 fail 이다.

 

retries 가 일어나는 동안 메시지의 순서는 보장되지 않는다.

key 기반 ordering 을 원하는 경우 문제가 발생한다.

이 경우 product request 가 병렬적으로 실행되도록 조절하는 옵션인 

max.in.flight.requests.per.connection 의 값을 조절하면 된다.

 

max.in.flight.requests.per.connection 

Default: 5

순서를 보장하기 위해서는 1로 설정해야하지만, 처리량은 낮아질 수 있다.

 

acks 를 보내는 과정에서 네트워크 에러가 발생하는 경우 request 가 중복이 되는 경우가 발생할수 있다.

kafka 0.11 버젼부터는 Idempotent request 라는 설정이 가능하다.

produce request 에 id 가 부여되기 때문에 broker 단에서 중복되는 reqeust 를 알아채고 중복을 방지한다.


Batching

max.in.flight.requests.per.connection = 5 의 경우 동시에 5건의 메시지가 개별적으로 전달된다. 

linger.ms -> produce 가 batch 를 보내기 전에 기다리는 시간 (ms)

값을 올릴수록 해당 초까지 기다렸다가 같이 보낸다.하지만 해당 시간 전에 batch.size 만큼 메시지가 차면 바로 배치로 보낸다.

 

batch.size 

하나의 배치안 에 넣을 수 있는 최대 바이트 수.

배치 사이즈를 늘리면 요청을 보낼때 압축률, 처리량, 효율성에 이점을 볼 수 있다. 

배치 사이즈보다 더 큰 사이즈의 메시지의 경우 배치로 처리되지 못한다.

하나의 배치는 파티션별로 할당되기 때문에 너무 높은 값으로 정하면 메모리 부족이 생길 수 있다.

 

memory

 

broker가 메시지를 처리하는 속도보다 producer 가 더 빠르게 메시지를 보내는 경우 해당 레코드는 잠시 producer 메모리에 buffer 된다.

buffer.memory 

기본값은 32MB, send buffer 의 사이즈다.

이 버퍼는 시간이 지남에 따라 계속 차고, 브로커에 더 빠르게 메시지를 보낼 수 있게 되면 다시 내려간다.

버퍼가 가득 차면 send 메소드는 block 된다. 데이터를 못보내고 그냥 대기하게 된다.

 

max.block.ms

send() 메소드가 예외를 던지기까지 block 되는 시간이다.

예외를 던지는 경우 

1. producer 버퍼 메모리가 꽉 찬 경우.

2. broker 가 전혀 새로운 데이터를 받지 못한 경우.

3. max.block.ms 시간이 다 지난 경우.

 


https://www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-producer-acks/

 

Kafka 운영자가 말하는 Producer ACKS | Popit

이번에는 메시지를 보내는 프로듀서에 대해 설명하도록 하겠습니다. 프로듀서가 메시지를 보낼때, 몇가지의 옵션들을 선택하여 보낼 수 있습니다. 프로듀서의 여러가지 옵션들중에서, 저는 가

www.popit.kr

https://4betterme.tistory.com/168

 

[Kafka] Producer 관련 주요 옵션

📌 kafka 및 Confluent 를 공부하며 정리하는 글 Producer Configurations  Idempotent Producer acks=0 (no acks) producer는 메세지만 보낼 뿐, 해당 메세지가 broker단에 제대로 전달되었는지 확인하지 않는..

4betterme.tistory.com

 

반응형

'DB > Kafka' 카테고리의 다른 글

[KAFKA] CommitFailedException  (1) 2022.12.28
반응형

local 에서 docker 를 이용햇 kafka 서버를 구축하자

 

docker search kafka 를 통해 검색해 보면 

wurstmeister/kafka  가 스타가 제일 많다.

https://hub.docker.com/r/wurstmeister/kafka -- 여기에서 kafka docker 관련 옵션 설명 확인가능하다.

 

kafka in docker 구조

도커 카프카 네트워킹에 대한 설명 레퍼런스이다.

github.com/wurstmeister/kafka-docker/wiki/Connectivity 

docker 로 kafka 를 올리면 다음과 같은 구조가 된다.

multiple 카프카 브로커로 설정도 가능하다.

이런식으로 구성해주면 

다음과 같이 9092 단일포트로 브로커가 구성된다.

 

 

단일 카프카 브로커일때는 

# KAFKA_ADVERTISED_HOST_NAME 옵션을 127.0.0.1 로 하겠지만 multiple 로 할때는 외부에서 접근 가능한 ip 를 넣어야 한다.

 

docker-compose-yml 작성

version: '3.3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

KAFKA_ADVERTISED_HOST_NAME는 본인의 docker host ip로 수정

multi broker를 사용하지 않고 로컬용이므로, 127.0.0.1로 사용

 

kafka 확인

카프카 툴로 접속 확인

또는 docker 로 접속 후 kafka-topics.sh 를 실행하여 확인가능하다.

# docker container exec -it kafka bash
# kafka-topics.sh --list --bootstrap-server localhost:9092 

kafka 도커에 접속 후 kafka 토픽 리스트를 조회할 수 있다.

 

카프카 공식 문서

kafka.apache.org/quickstart

반응형
반응형

스프링 부트에서 카프카 클라이언트 라이브러리를 추가하면, 이런오류가 생긴다.

Error registering AppInfo mbean

해당오류는 카프카 컨슈머 측에서만 발생한다.

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=clientid-0

    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)

    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241)

    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:606)

    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302)

    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)

    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)

    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)

    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)

    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)

    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)

    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)

    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)

    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)

    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)

    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)

    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)

    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)

    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)

    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)

    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)

    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)

    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)

    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)

    at kr.co.lunasoft.productdb.ProductDbBotApplication.main(ProductDbBotApplication.java:14)

이런 경우는 clientId 등록의 문제다.

이대로 어플리케이션을 올려도 상관은 없다.

하지만 오류가 뜨는걸 별로 보고싶지는 않기 떄문에 안뜨게 해주려면 카프카 리슨을 하고 있는 각각의 컨슈머에 client id 를 명시해줘야 한다.

카프카 토픽이 여러개 있을텐데 모두 같은 clientid 를 사용할때 다음과 같은 오류로그가 발생한다.

 

아래 코드와 같이 clientIdPrefix 로 해결

카프카컨슈머 - KafkaListener

    @KafkaListener(topics = {"topic-test"}, containerFactory = "KafkaListenerContainerFactory", clientIdPrefix = "test-topic-client")
    public void consumer(ConsumerRecord<String, Object> consumerRecord) {
        log.info("카프카 컨슈머")
    }

카프카컨테이너 팩토리 - ConsumerFactory

@Slf4j
@Configuration
public class KafkaConsumerConfig {

    @Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
    List<String> bootstrapAddress;
    @Value("${spring.kafka.consumer.group-id}")
    String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    String autoOffsetReset;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

참고로 카프카 컨테이너 를 위와같이 구성해서 쓴다.

또한 스프링부트 2.3.# 버젼 코드이다.

반응형

+ Recent posts