Flume Elasticsearch sink fails to start

Flume version 1.7, Elasticsearch version 5.2. The Elasticsearch sink fails on startup.

Configuration:

agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s1.channels = c1
agent.sources.s1.batchSize = 5000
agent.sources.s1.batchDurationMillis = 2000
agent.sources.s1.kafka.bootstrap.servers = xxx.xxx.xxx:9092
agent.sources.s1.kafka.topics = access_log
agent.sources.s1.kafka.consumer.group.id = access_log_es

agent.sinks.k1.type = elasticsearch
agent.sinks.k1.hostNames = xxx.xxx.xxx:9200
agent.sinks.k1.indexName = logstash
agent.sinks.k1.indexType = access_log
agent.sinks.k1.clusterName = elasticsearch-log
agent.sinks.k1.batchSize = 100
agent.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
agent.sinks.k1.channel = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 500

Error message:

2017-04-16 16:33:55,768 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@495acd28 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:141)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:358)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Has anyone run into this, and how did you solve it? Many thanks.

1 answer

Copy the jar files from the lib directory of Elasticsearch into the lib directory of Flume.
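
For context: the NoSuchMethodError in the trace says that the InetSocketTransportAddress class loaded at runtime has no constructor taking (String, int). That usually points to the Elasticsearch jars on Flume's classpath being a different version from the one the sink was compiled against, so it can help to check what is actually being loaded before or after copying jars. Below is a minimal sketch of such a check using plain Java reflection. The class name ConstructorCheck and its helper methods are made up for illustration and are not part of Flume or Elasticsearch.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flume or Elasticsearch.
class ConstructorCheck {

    /** Loads a class by name without running its static initializers. */
    static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, ConstructorCheck.class.getClassLoader());
    }

    /** True if the class is loadable and has a public constructor with these parameter types. */
    static boolean hasConstructor(String className, Class<?>... paramTypes) {
        try {
            load(className).getConstructor(paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    /** Reports the jar or directory the class was loaded from, if that can be determined. */
    static String locationOf(String className) {
        try {
            CodeSource src = load(className).getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap or unknown)" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Default to the class named in the stack trace; another class name can be passed as an argument.
        String name = args.length > 0
                ? args[0]
                : "org.elasticsearch.common.transport.InetSocketTransportAddress";
        System.out.println("loaded from: " + locationOf(name));
        System.out.println("has (String, int) constructor: "
                + hasConstructor(name, String.class, int.class));
    }
}
```

Assuming Flume is installed under a directory referred to here as FLUME_HOME, this could be compiled with javac and run with something like `java -cp "$FLUME_HOME/lib/*:." ConstructorCheck`. If the second line prints false, the Elasticsearch jar that Flume picks up does not provide the constructor the sink expects, and the jar versions in Flume's lib directory are worth a closer look.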
