今天线上报错Local: Queue full,导致接口无法使用,请求各位大佬指教
目前使用go连接kafka的库使用的是github.com/confluentinc/confluent-kafka-go/v2/kafka
问题:在我连续像生产者写入110万条数据时,报错 Local: Queue full
,具体代码如下:
package kfk
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"strings"
"time"
)
// 发送消息到指定主题
func SendMessage(broker, topic string, tableName string, message []byte) error {
config := &kafka.ConfigMap{
"bootstrap.servers": strings.Join([]string{"localhost:9092"}, ","),
"acks": "1",
"delivery.timeout.ms": 3000,
"security.protocol": "PLAINTEXT",
}
producer, err := kafka.NewProducer(config)
if err != nil {
}
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
Key: []byte(tableName),
Value: message,
Timestamp: time.Now(),
}, nil)
if err != nil {
fmt.Println(err)
return err
}
return nil
}
经过多次的测试,不管是不是进行消费,都是在110万条左右开始报错,并且不在之后的数据不能写入,但是重启服务就好了,分析原因:github.com/confluentinc/confluent-kafka-go 这个包使用了队列的概念,累计到110万时,超出了队列的缓存区,所以不能再继续写入,而重启服务则清空了这个缓冲区,让服务可用,但是再累计到110万时,服务还会不可用。
各位大佬,这是这个库的Bug么?是作者特意做的,还是我的配置不正确,可以通过什么配置可以解决呢?
有没有更好一点的kafka库呢? 其他库会有这种问题吗?
这个库其实是包装了一下 c 的实现,最终其实这个报错应该也是 c 的库里返回的,并不是这个库本身的意思 详情见:https://github.com/confluentinc/librdkafka/blob/aa50e52a12bece3e399f69b8477fd0c8aadbfff1/src/rdkafka.c#L430
我就不追 C 的源码了,大胆来猜测一下,估计这个报错就是因为本地队列满了导致的(kafka 客户端在发送消息的时候,并不是收到之后马上就发送出去的,而是攒起来,一批一批发)。这样的话应该会有两种思路,一种是库里面本身支持了某个配置可以修改本地队列最大数量,或者 buffer 最大数量类似的参数;还有就是本身不提供这样的参数调整,通过前面的限流完成。
顺着这个思路去找一下文档,发现有
https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md
估计就是 batch.num.messages 这个配置了,这个库没用过,所以不知道包装之后有没有这个配置,你可以找一下。