Kafka SocketTimeoutException problem

My Kafka producers and consumer suddenly started throwing SocketTimeoutException, so both writing and consuming data fail.

Configuration
Installed Kafka version: kafka_2.10-0.10.2.0
Client version:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.1</version>
</dependency>

The Kafka cluster has 3 nodes: 175, 176, and 177.

Configuration file (broker 176):


broker.id=176

listeners=PLAINTEXT://10.17.24.176:9092


advertised.listeners=PLAINTEXT://10.17.24.176:9092


num.network.threads=3


num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600



log.dirs=/data/logs/kafka


num.partitions=6

num.recovery.threads.per.data.dir=1


log.retention.hours=72

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=10.17.24.175:2181,10.17.24.176:2181,10.17.24.177:2181

zookeeper.connection.timeout.ms=6000

default.replication.factor=2
auto.create.topics.enable=true
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Topic information:

Topic:sms    PartitionCount:6    ReplicationFactor:2    Configs:
    Topic: sms    Partition: 0    Leader: 176    Replicas: 176,177    Isr: 176
    Topic: sms    Partition: 1    Leader: 177    Replicas: 177,175    Isr: 177
    Topic: sms    Partition: 2    Leader: 175    Replicas: 175,176    Isr: 175
    Topic: sms    Partition: 3    Leader: 176    Replicas: 176,175    Isr: 176
    Topic: sms    Partition: 4    Leader: 177    Replicas: 177,176    Isr: 177
    Topic: sms    Partition: 5    Leader: 175    Replicas: 175,177    Isr: 175
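
One thing worth noting in the output above: every partition lists two replicas, but only the leader appears in Isr, so no follower is currently in sync. Below is a minimal sketch for flagging such partitions automatically. It assumes the describe output uses exactly the line format shown above; the class and method names (IsrCheck, underReplicated) are made up for illustration and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IsrCheck {
    // Matches partition lines in the format shown above, for example:
    // "Topic: sms  Partition: 0  Leader: 176  Replicas: 176,177  Isr: 176"
    // The summary line ("PartitionCount:6 ...") does not match and is skipped.
    private static final Pattern LINE = Pattern.compile(
        "Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    /** Returns the partition numbers whose ISR is smaller than their replica list. */
    public static List<Integer> underReplicated(String describeOutput) {
        List<Integer> result = new ArrayList<>();
        for (String line : describeOutput.split("\\R")) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // not a partition line
            }
            int replicas = m.group(3).split(",").length;
            String isr = m.group(4);
            int inSync = isr.isEmpty() ? 0 : isr.split(",").length;
            if (inSync < replicas) {
                result.add(Integer.parseInt(m.group(1)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two lines copied from the topic description above.
        String sample =
            "Topic:sms    PartitionCount:6    ReplicationFactor:2    Configs:\n"
          + "    Topic: sms    Partition: 0    Leader: 176    Replicas: 176,177    Isr: 176\n"
          + "    Topic: sms    Partition: 1    Leader: 177    Replicas: 177,175    Isr: 177\n";
        System.out.println("Under-replicated partitions: " + underReplicated(sample));
    }
}
```

A shrunken ISR on every partition suggests the followers cannot fetch from the leaders, which may be worth investigating alongside the client timeouts.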

Usage

2 producers, 1 consumer

Consumer error log:

sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602964627]-[WARN] Fetching topic metadata with correlation id 2 for topics [Set(sms)] from broker [id:176,host:10.17.24.176,port:9092] failed
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Producer log:

2017-06-12 10:46:52[qtp958382397-99:591474422]-[WARN] Failed to send producer request with correlation id 234660 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)

Kafka broker server.log:
[2017-06-12 10:45:51,425] INFO Rolled new log segment for 'sms-3' in 2 ms. (kafka.log.Log)

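The service had been running since last Monday and was fine for a full week. Today it suddenly broke: neither producing to nor consuming from the topic works. I looked at the producer side and read the relevant Scala code, but found nothing useful beyond the fact that the connection times out. On the cluster side, the broker rolled a new log segment about ten seconds before the problem started. Does that affect producing and consuming? I don't understand it and would appreciate any pointers. I did not set any timeouts explicitly; the consumer default is 30 s and the producer default is 10 s, which should be plenty. I can also ping every Kafka node.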
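
A successful ping only shows that the hosts answer ICMP; it says nothing about the broker port. A slightly stronger check is to open a TCP connection to each broker's listener. The sketch below is only an illustration: the class name BrokerPortCheck is made up, and port 9092 for brokers 175 and 177 is an assumption (only the listener of broker 176 is shown in the configuration).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unreachable host, and connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker addresses from the question; port 9092 is assumed for all three.
        String[] brokers = {"10.17.24.175", "10.17.24.176", "10.17.24.177"};
        for (String host : brokers) {
            System.out.println(host + ":9092 reachable=" + canConnect(host, 9092, 3000));
        }
    }
}
```

Even when this check passes, it only proves the port accepts connections. The stack traces above time out while reading the response, so a broker that accepts the connection but never answers the request would still pass.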

1 answer
Accepted answer

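The Kafka ACL authorization was probably misconfigured: I had never set up a JAAS file. After I removed the authorization settings, the problem did not come back.

For anyone who does need authorization configured, this blog post is a useful reference:
http://www.cnblogs.com/huxi2b...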
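
For context: when authorizer.class.name is set and allow.everyone.if.no.acl.found is false (also the default), the broker denies access to any resource that has no matching ACL, except for super users. On a PLAINTEXT listener clients are not authenticated and are treated as User:ANONYMOUS. Whether that is what caused the timeouts here is not established; the sketch below only flags this combination in a broker properties file. The class and method names (AclConfigCheck, deniesClientsWithoutAcls) are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class AclConfigCheck {
    /**
     * Returns true when an authorizer is enabled and resources without ACLs
     * are denied, i.e. clients need explicit ACLs (or super-user status).
     */
    public static boolean deniesClientsWithoutAcls(Properties broker) {
        String authorizer = broker.getProperty("authorizer.class.name", "").trim();
        boolean allowIfNoAcl = Boolean.parseBoolean(
            broker.getProperty("allow.everyone.if.no.acl.found", "false").trim());
        return !authorizer.isEmpty() && !allowIfNoAcl;
    }

    public static void main(String[] args) throws IOException {
        // The relevant lines from the broker configuration in the question.
        String config =
            "listeners=PLAINTEXT://10.17.24.176:9092\n"
          + "allow.everyone.if.no.acl.found=false\n"
          + "authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer\n";
        Properties broker = new Properties();
        broker.load(new StringReader(config));
        if (deniesClientsWithoutAcls(broker)) {
            System.out.println("Authorizer is on and no-ACL access is denied: "
                + "either grant ACLs, or remove both settings as described above.");
        }
    }
}
```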
