When you add the Kafka client library to a Spring Boot application, you may run into the following error.
Error registering AppInfo mbean
In my case, this error showed up only on the Kafka consumer side.
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=clientid-0
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:606)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at kr.co.lunasoft.productdb.ProductDbBotApplication.main(ProductDbBotApplication.java:14)
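This is a client id registration problem. As the stack trace shows, each consumer registers a JMX MBean whose name contains its client id (kafka.consumer:type=app-info,id=clientid-0), and that name was already taken.
The application runs fine even if you leave it as is.
Still, I would rather not see the error in the logs. To get rid of it, explicitly give each listening consumer its own client id.
The error log above appears when there are several Kafka topics and their consumers all use the same client id.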
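The clash itself can be reproduced with plain JMX and no Kafka at all. The following is only a minimal sketch of the mechanism, not Kafka's actual code: the class `AppInfoClashDemo` and the method `findClashes` are hypothetical names, a JDK `Timer` MBean stands in for Kafka's app-info bean, and a private `MBeanServer` is used so the demo does not touch the platform server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

// Hypothetical demo class: shows why two consumers with the same client id collide in JMX.
class AppInfoClashDemo {

    // Registers one placeholder MBean per client id on a fresh MBeanServer
    // and returns the ids that were rejected because the name already existed.
    static List<String> findClashes(List<String> clientIds) {
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        List<String> clashes = new ArrayList<>();
        for (String clientId : clientIds) {
            try {
                // Same naming pattern as the ObjectName in the stack trace.
                ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
                server.registerMBean(new Timer(), name); // Timer is just a stand-in MBean
            } catch (InstanceAlreadyExistsException e) {
                clashes.add(clientId); // the second registration under the same name lands here
            } catch (JMException e) {
                throw new IllegalStateException(e);
            }
        }
        return clashes;
    }

    public static void main(String[] args) {
        // Two consumers sharing one client id: the second registration fails.
        System.out.println(findClashes(Arrays.asList("clientid-0", "clientid-0"))); // prints [clientid-0]
        // Distinct client ids: nothing clashes.
        System.out.println(findClashes(Arrays.asList("orders-client-0", "payments-client-0"))); // prints []
    }
}
```

Giving every listener a different client id means every MBean gets a different name, so the second registration no longer fails.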
As in the code below, setting clientIdPrefix on each listener solves it.
Kafka consumer - KafkaListener
// containerFactory must match the bean name exactly (bean names are case-sensitive)
@KafkaListener(topics = {"topic-test"}, containerFactory = "kafkaListenerContainerFactory", clientIdPrefix = "test-topic-client")
public void consumer(ConsumerRecord<String, Object> consumerRecord) {
    log.info("Kafka consumer");
}
Kafka container factory - ConsumerFactory
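import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class KafkaConsumerConfig {

    // Comma-separated broker list from the properties file, split into a List
    @Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
    List<String> bootstrapAddress;

    @Value("${spring.kafka.consumer.group-id}")
    String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    String autoOffsetReset;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Bean name referenced by containerFactory in @KafkaListener
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}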
For reference, this is how I configure the Kafka listener container.
The code targets Spring Boot 2.3.x.