在springboot 中使用kafka 进行消费,启动失败报错

consumer
代码如下,其他还有一部分为mvc的代码

package com.settlement.kafka;


import org.springframework.kafka.annotation.KafkaListener;

public class Consumer {
    @KafkaListener(topics = {"${kafka.topic}"})
    public void consumer(String msg){
        System.out.println(msg);
    }
}
package com.settlement.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.bootstrap-server}")
    private String bootstrapServers;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(5000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String,Object> consumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//自行控制提交offset
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");//提交延迟毫秒数
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");//执行超时时间
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"dsafas");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//开始消费位置
        return propsMap;
    }

    @Bean
    public Consumer listener(){
        return new Consumer();
    }
}
Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2018-02-26 20:47:26.256 ERROR 47792 --- [           main] o.s.b.SpringApplication                  : Application startup failed

java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:463) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(MessagingMessageListenerAdapter.java:106) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.<init>(RecordMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListenerInstance(MethodKafkaListenerEndpoint.java:172) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:374) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:359) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:246) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:49) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:243) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at 
settlement.Application.main(Application.java:12) [classes/:?]

2018-02-26 20:47:26.261  INFO 47792 --- [           main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
2018-02-26 20:47:26.263  WARN 47792 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception thrown from LifecycleProcessor on context close

java.lang.IllegalStateException: LifecycleProcessor not initialized - call 'refresh' before invoking lifecycle methods via the context: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
    at org.springframework.context.support.AbstractApplicationContext.getLifecycleProcessor(AbstractApplicationContext.java:427) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:999) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:958) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:750) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
    at settlement.Application.main(Application.java:12) [classes/:?]

2018-02-26 20:47:26.264  INFO 47792 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown

Process finished with exit code 1
阅读 13.2k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题