반응형

스프링 부트에서 카프카 클라이언트 라이브러리를 추가하면, 이런오류가 생긴다.

Error registering AppInfo mbean

해당오류는 카프카 컨슈머 측에서만 발생한다.

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=clientid-0

    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)

    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)

    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267)

    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241)

    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:606)

    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302)

    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)

    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)

    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)

    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)

    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)

    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)

    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)

    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)

    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)

    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)

    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)

    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)

    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)

    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)

    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)

    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)

    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)

    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)

    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)

    at kr.co.lunasoft.productdb.ProductDbBotApplication.main(ProductDbBotApplication.java:14)

이런 경우는 clientId 등록의 문제다.

이대로 어플리케이션을 올려도 상관은 없다.

하지만 오류가 뜨는걸 별로 보고싶지는 않기 떄문에 안뜨게 해주려면 카프카 리슨을 하고 있는 각각의 컨슈머에 client id 를 명시해줘야 한다.

카프카 토픽이 여러개 있을텐데 모두 같은 clientid 를 사용할때 다음과 같은 오류로그가 발생한다.

 

아래 코드와 같이 clientIdPrefix 로 해결

카프카컨슈머 - KafkaListener

    @KafkaListener(topics = {"topic-test"}, containerFactory = "KafkaListenerContainerFactory", clientIdPrefix = "test-topic-client")
    public void consumer(ConsumerRecord<String, Object> consumerRecord) {
        log.info("카프카 컨슈머")
    }

카프카컨테이너 팩토리 - ConsumerFactory

@Slf4j
@Configuration
public class KafkaConsumerConfig {

    @Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
    List<String> bootstrapAddress;
    @Value("${spring.kafka.consumer.group-id}")
    String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    String autoOffsetReset;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

참고로 카프카 컨테이너 를 위와같이 구성해서 쓴다.

또한 스프링부트 2.3.# 버젼 코드이다.

반응형

+ Recent posts