  • [SPRING] kafka Error registering AppInfo mbean - Spring Boot Kafka
    Spring/spring boot and others 2020. 11. 3. 22:19

    After adding the Kafka client library to a Spring Boot project, the following error shows up at startup.

    Error registering AppInfo mbean

    In my case the error occurred only on the Kafka consumer side.

    javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=clientid-0
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:606)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302)
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
        at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
        at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
        at kr.co.lunasoft.productdb.ProductDbBotApplication.main(ProductDbBotApplication.java:14)

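    This is a client id registration problem: the MBean name includes the consumer's client id, and that name is already registered.

    The application still runs fine if you leave it as is.

    Still, I would rather not see the error in the logs. To get rid of it, give each consumer that listens to Kafka its own explicit client id.

    The error log above appears when there are several Kafka topics and their consumers all use the same client id.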
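
    To make the cause concrete, here is a minimal sketch that reproduces the same exception with plain JMX and no Kafka at all. The class and method names (AppInfoCollisionDemo, register) are made up for this illustration; only the MBean name is copied from the log above. As far as I can tell, Kafka catches this exception and logs it as a warning, which would explain why the application keeps running.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class AppInfoCollisionDemo {

    // Standard MBean convention: the interface is named <ImplClass>MBean.
    public interface AppInfoMBean {
        String getVersion();
    }

    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "demo";
        }
    }

    /**
     * Hypothetical helper: registers an MBean whose name contains the client id.
     * Returns false when that name is already registered.
     */
    public static boolean register(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same exception type as in the stack trace above.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(register("clientid-0")); // prints true
        System.out.println(register("clientid-0")); // prints false: the name is taken
    }
}
```

    The second call fails only because the name is identical, so any scheme that gives each consumer a different client id avoids the collision.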

     

    The fix is to set clientIdPrefix, as in the code below.

    Kafka consumer - KafkaListener

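        // containerFactory must match the factory bean name exactly (it is case-sensitive).
        // clientIdPrefix gives this listener's consumers their own client id.
        @KafkaListener(topics = {"topic-test"}, containerFactory = "kafkaListenerContainerFactory", clientIdPrefix = "test-topic-client")
        public void consumer(ConsumerRecord<String, String> consumerRecord) {
            log.info("Kafka consumer");
        }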
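
    Why a prefix per listener is enough can be sketched as follows. This is a simplified model based on my understanding, not Spring's actual code: the client id ends up as the prefix plus a "-<index>" suffix, which matches the "clientid-0" seen in the log. ClientIdModel, clientIds, hasCollision and the second prefix "other-topic-client" are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ClientIdModel {

    /** Builds "<prefix>-<index>" for every listener prefix and consumer index. */
    public static List<String> clientIds(List<String> prefixes, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (String prefix : prefixes) {
            for (int i = 0; i < concurrency; i++) {
                ids.add(prefix + "-" + i);
            }
        }
        return ids;
    }

    /** Returns true when at least two consumers would share a client id. */
    public static boolean hasCollision(List<String> ids) {
        Set<String> seen = new HashSet<>();
        for (String id : ids) {
            if (!seen.add(id)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Two listeners that share one client id: both become "clientid-0".
        System.out.println(hasCollision(clientIds(Arrays.asList("clientid", "clientid"), 1))); // prints true
        // Two listeners with distinct prefixes: no shared id.
        System.out.println(hasCollision(
                clientIds(Arrays.asList("test-topic-client", "other-topic-client"), 1))); // prints false
    }
}
```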

    Kafka container factory - ConsumerFactory

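    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Configuration
    public class KafkaConsumerConfig {

        // Comma-separated broker list from the application properties, split into a List
        @Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
        List<String> bootstrapAddress;
        @Value("${spring.kafka.consumer.group-id}")
        String groupId;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        String autoOffsetReset;

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {

            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        // The bean name here is what @KafkaListener(containerFactory = ...) refers to
        @Bean("kafkaListenerContainerFactory")
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

    }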

    For reference, this is how I configure the Kafka listener container.

    Also, this code is for Spring Boot 2.3.x.
