consumer
代码如下,其他还有一部分为mvc的代码
package com.settlement.kafka;
import org.springframework.kafka.annotation.KafkaListener;
public class Consumer {
@KafkaListener(topics = {"${kafka.topic}"})
public void consumer(String msg){
System.out.println(msg);
}
}
package com.settlement.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-server}")
private String bootstrapServers;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(5000);
return factory;
}
@Bean
public ConsumerFactory<String,String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String,Object> consumerConfigs(){
Map<String,Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//自行控制提交offset
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");//提交延迟毫秒数
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");//执行超时时间
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"dsafas");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//开始消费位置
return propsMap;
}
@Bean
public Consumer listener(){
return new Consumer();
}
}
Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2018-02-26 20:47:26.256 ERROR 47792 --- [ main] o.s.b.SpringApplication : Application startup failed
java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:463) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(MessagingMessageListenerAdapter.java:106) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.<init>(RecordMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListenerInstance(MethodKafkaListenerEndpoint.java:172) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:374) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:359) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:246) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:49) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:243) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at
settlement.Application.main(Application.java:12) [classes/:?]
2018-02-26 20:47:26.261 INFO 47792 --- [ main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
2018-02-26 20:47:26.263 WARN 47792 --- [ main] ationConfigEmbeddedWebApplicationContext : Exception thrown from LifecycleProcessor on context close
java.lang.IllegalStateException: LifecycleProcessor not initialized - call 'refresh' before invoking lifecycle methods via the context: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7d4f9aae: startup date [Mon Feb 26 20:47:20 CST 2018]; root of context hierarchy
at org.springframework.context.support.AbstractApplicationContext.getLifecycleProcessor(AbstractApplicationContext.java:427) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:999) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:958) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:750) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at settlement.Application.main(Application.java:12) [classes/:?]
2018-02-26 20:47:26.264 INFO 47792 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
Process finished with exit code 1
java.lang.NoSuchMethodError on Kafka Consumer with spring-kafka 2.1.0 and SpringBoot 1.5.9