Kafka producers and consumers suddenly started throwing SocketTimeoutException, causing both data writes and consumption to fail.
Configuration
Installed Kafka version: kafka_2.10-0.10.2.0
Client version:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.1</version>
</dependency>
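Note that the brokers run 0.10.2.0 while the client dependency above is the old 0.8.2.1 Scala client. If upgrading the client is an option, an aligned dependency would look roughly like this (a sketch only; it assumes moving to the newer Java client API):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>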
The Kafka cluster has 3 nodes: 175, 176 and 177.
Broker configuration file:
broker.id=176
listeners=PLAINTEXT://10.17.24.176:9092
advertised.listeners=PLAINTEXT://10.17.24.176:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/logs/kafka
num.partitions=6
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.17.24.175:2181,10.17.24.176:2181,10.17.24.177:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
auto.create.topics.enable=true
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Topic details:
Topic:sms PartitionCount:6 ReplicationFactor:2 Configs:
Topic: sms Partition: 0 Leader: 176 Replicas: 176,177 Isr: 176
Topic: sms Partition: 1 Leader: 177 Replicas: 177,175 Isr: 177
Topic: sms Partition: 2 Leader: 175 Replicas: 175,176 Isr: 175
Topic: sms Partition: 3 Leader: 176 Replicas: 176,175 Isr: 176
Topic: sms Partition: 4 Leader: 177 Replicas: 177,176 Isr: 177
Topic: sms Partition: 5 Leader: 175 Replicas: 175,177 Isr: 175
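One thing visible above is that the Isr column lists only the leader for every partition, i.e. none of the follower replicas are in sync. Assuming the standard tooling shipped with the broker, the under-replicated partitions can be listed with:
bin/kafka-topics.sh --describe --zookeeper 10.17.24.175:2181 --under-replicated-partitions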
Usage
2 producers, 1 consumer.
Consumer error log:
sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602964627]-[WARN] Fetching topic metadata with correlation id 2 for topics [Set(sms)] from broker [id:176,host:10.17.24.176,port:9092] failed
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Producer log:
2017-06-12 10:46:52[qtp958382397-99:591474422]-[WARN] Failed to send producer request with correlation id 234660 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
Kafka broker server.log:
[2017-06-12 10:45:51,425] INFO Rolled new log segment for 'sms-3' in 2 ms. (kafka.log.Log)
The service had been running since last Monday and worked fine for a whole week; today it suddenly broke and neither producing to nor consuming from the topic works anymore. I looked at the producer and read through the related Scala code, but could not find anything useful beyond the fact that the connection times out. Looking at the Kafka cluster, a "Rolled new log segment" happened about ten seconds before the problem started. Could that affect producing and consuming? I cannot make sense of it and would appreciate any pointers. I did not configure the timeouts explicitly; the consumer default timeout is 30 s and the producer default is 10 s, which should both be plenty, and all Kafka nodes respond to ping.
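For reference, the timeouts mentioned above can also be set explicitly on the old 0.8.x clients. A minimal sketch (the class name is just for illustration; broker/ZooKeeper addresses and the group id are taken from the setup above, and the values shown are the defaults written out):
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.producer.ProducerConfig;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        // Old high-level consumer: socket.timeout.ms defaults to 30000 ms (30 s).
        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", "10.17.24.175:2181,10.17.24.176:2181,10.17.24.177:2181");
        consumerProps.put("group.id", "sms-consumer-group1");
        consumerProps.put("socket.timeout.ms", "30000");
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

        // Old producer (the SyncProducer/DefaultEventHandler path seen in the stack trace):
        // request.timeout.ms defaults to 10000 ms (10 s).
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", "10.17.24.175:9092,10.17.24.176:9092,10.17.24.177:9092");
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        producerProps.put("request.timeout.ms", "10000");
        ProducerConfig producerConfig = new ProducerConfig(producerProps);
    }
}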
Update: the Kafka ACL authorization was probably misconfigured. I had not set up a JAAS file; after removing the authorization settings, the problem never came back.
If you need to set this up, you can refer to this blog post:
http://www.cnblogs.com/huxi2b...
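For anyone who wants to keep the authorizer instead of removing it: with SimpleAclAuthorizer and allow.everyone.if.no.acl.found=false, clients connecting over an unauthenticated PLAINTEXT listener are seen as User:ANONYMOUS and are denied unless ACLs grant them access. A sketch of granting produce/consume rights for the topic and consumer group above (verify the principal against your own security setup):
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=10.17.24.175:2181 --add --allow-principal User:ANONYMOUS --producer --topic sms
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=10.17.24.175:2181 --add --allow-principal User:ANONYMOUS --consumer --topic sms --group sms-consumer-group1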