GO的Kafka的问题,Local: Queue full?

今天线上报错Local: Queue full,导致接口无法使用,请求各位大佬指教
目前使用go连接kafka的库使用的是github.com/confluentinc/confluent-kafka-go/v2/kafka

问题:在我连续像生产者写入110万条数据时,报错 Local: Queue full,具体代码如下:

package kfk

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "strings"
    "time"
)

// 发送消息到指定主题
func SendMessage(broker, topic string, tableName string, message []byte) error {
    config := &kafka.ConfigMap{
        "bootstrap.servers":   strings.Join([]string{"localhost:9092"}, ","),
        "acks":                "1",
        "delivery.timeout.ms": 3000,
        "security.protocol":   "PLAINTEXT",
    }
    producer, err := kafka.NewProducer(config)
    if err != nil {
        
    }

    err = producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic: &topic,
        },
        Key:       []byte(tableName),
        Value:     message,
        Timestamp: time.Now(),
    }, nil)
    if err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}

经过多次的测试,不管是不是进行消费,都是在110万条左右开始报错,并且不在之后的数据不能写入,但是重启服务就好了,分析原因:github.com/confluentinc/confluent-kafka-go 这个包使用了队列的概念,累计到110万时,超出了队列的缓存区,所以不能再继续写入,而重启服务则清空了这个缓冲区,让服务可用,但是再累计到110万时,服务还会不可用。

各位大佬,这是这个库的Bug么?是作者特意做的,还是我的配置不正确,可以通过什么配置可以解决呢?

有没有更好一点的kafka库呢? 其他库会有这种问题吗?

阅读 3.2k
1 个回答

这个库其实是包装了一下 c 的实现,最终其实这个报错应该也是 c 的库里返回的,并不是这个库本身的意思 详情见:https://github.com/confluentinc/librdkafka/blob/aa50e52a12bece3e399f69b8477fd0c8aadbfff1/src/rdkafka.c#L430

我就不追 C 的源码了,大胆来猜测一下,估计这个报错就是因为本地队列满了导致的(kafka 客户端在发送消息的时候,并不是收到之后马上就发送出去的,而是攒起来,一批一批发)。这样的话应该会有两种思路,一种是库里面本身支持了某个配置可以修改本地队列最大数量,或者 buffer 最大数量类似的参数;还有就是本身不提供这样的参数调整,通过前面的限流完成。

顺着这个思路去找一下文档,发现有
https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md

Compression
Producer message compression is enabled through the compression.codec configuration property.

Compression is performed on the batch of messages in the local queue, the larger the batch the higher likelyhood of a higher compression ratio. The local batch queue size is controlled through the batch.num.messages, batch.size, and linger.ms configuration properties as described in the High throughput chapter above.

估计就是 batch.num.messages 这个配置了,这个库没用过,所以不知道包装之后有没有这个配置,你可以找一下。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题