Flume interceptor: host header not taking effect

I'm new to Flume, so please bear with me.

I'm using flume-ng to ship log data, pushing events from the agents to the collectors in a load-balanced fashion. The data flow is shown below:
[figure: data flow from the agents to the two load-balanced collectors]

To tell apart the data sent by different agents, I set up a host interceptor in each collector's config file. One agent's config file is as follows:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = flume_test

# Define the sink group for load balancing
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Define sink 1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.0.3.82
a1.sinks.k1.port = 5150

# Define sink 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 10.0.3.83
a1.sinks.k2.port = 5150


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
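
As a side note, to confirm that the static topic header really is attached on the agent side, I believe a temporary logger sink can be wired in for debugging. This is only a sketch; the sink name k3 is hypothetical and not part of my actual setup:

# Hypothetical debugging sink: the "logger" sink prints each event's
# headers and body to Flume's log, so the static header can be checked.
a1.sinks = k1 k2 k3
a1.sinks.k3.type = logger
a1.sinks.k3.channel = c1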

collector1's config file:

collector1.sources = r1
collector1.channels = c1 c2
collector1.sinks = k1 k2

# Describe the source
collector1.sources.r1.type = avro
collector1.sources.r1.port = 5150
collector1.sources.r1.bind = 0.0.0.0
collector1.sources.r1.channels = c1 c2
collector1.sources.r1.interceptors = i2
collector1.sources.r1.interceptors.i2.type = host
collector1.sources.r1.interceptors.i2.hostHeader = agentHost

# Describe the channels: c1 is a file channel, c2 buffers events in memory
collector1.channels.c1.type = file
collector1.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector1.channels.c1.dataDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/data

collector1.channels.c2.type = memory
collector1.channels.c2.capacity = 1000
collector1.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector1.sinks.k1.type = hdfs
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hdfs.path = /quantone/flume/%{agentHost}
collector1.sinks.k1.hdfs.fileType = DataStream
collector1.sinks.k1.hdfs.writeFormat = TEXT
collector1.sinks.k1.hdfs.rollInterval = 300
collector1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%{agentHost}
collector1.sinks.k1.hdfs.round = true
collector1.sinks.k1.hdfs.roundValue = 5
collector1.sinks.k1.hdfs.roundUnit = minute
collector1.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k2.channel = c2
collector1.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector1.sinks.k2.requiredAcks = 1
collector1.sinks.k2.batchSize = 20
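
For reference, as far as I can tell from the Flume 1.6 user guide, the host interceptor also accepts useIP and preserveExisting options. I have left both at their defaults; written out explicitly they would look like this:

# Host interceptor defaults, spelled out for clarity:
# useIP = true records the IP address rather than the hostname;
# preserveExisting = false overwrites any pre-existing agentHost header.
collector1.sources.r1.interceptors.i2.useIP = true
collector1.sources.r1.interceptors.i2.preserveExisting = false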

collector2's config file:

collector2.sources = r1
collector2.channels = c1 c2
collector2.sinks = k1 k2

# Describe the source
collector2.sources.r1.type = avro
collector2.sources.r1.port = 5150
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.channels = c1 c2
collector2.sources.r1.interceptors = i2
collector2.sources.r1.interceptors.i2.type = host
collector2.sources.r1.interceptors.i2.hostHeader = agentHost

# Describe the channels: c1 is a file channel, c2 buffers events in memory
collector2.channels.c1.type = file
collector2.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector2.channels.c1.dataDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/data

collector2.channels.c2.type = memory
collector2.channels.c2.capacity = 1000
collector2.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.channel = c1
collector2.sinks.k1.hdfs.path = /quantone/flume/%{agentHost}
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.writeFormat = TEXT
collector2.sinks.k1.hdfs.rollInterval = 300
collector2.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%{agentHost}
collector2.sinks.k1.hdfs.round = true
collector2.sinks.k1.hdfs.roundValue = 5
collector2.sinks.k1.hdfs.roundUnit = minute
collector2.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector2.sinks.k2.channel = c2
collector2.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector2.sinks.k2.requiredAcks = 1
collector2.sinks.k2.batchSize = 20

In the collector1 and collector2 config files I use %{agentHost} in the HDFS path and file prefix, expecting it to fill in each agent's IP address, but in the paths and file names that are actually created this field comes out empty.
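
My understanding of the expected behavior: the HDFS sink substitutes %{agentHost} with the value of the event header named agentHost, so an event carrying, say, agentHost=10.0.3.80 (a hypothetical agent IP) should end up somewhere like:

# Hypothetical example of the substitution I expected:
#   event headers: { agentHost=10.0.3.80, timestamp=... }
#   resolved path: /quantone/flume/10.0.3.80
#   resolved file prefix: <local date>-10.0.3.80
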
Could anyone point me in the right direction? Thanks!
