go sarama client无法消费?

写了一段代码,能正常生产消息,但无法消费,还有这个默认clientID警告如何去掉啊。
代码如下

    package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "sync"

    "github.com/Shopify/sarama"
)

var (
    wg     sync.WaitGroup
    logger = log.New(os.Stderr, "[sarama]", log.LstdFlags)
)

func main() {

    //生产消息
    sarama.Logger = logger

    config := sarama.NewConfig()
    config.ClientID = "saramaId"
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    msg := &sarama.ProducerMessage{}
    msg.Topic = "topic.ops.falcon"
    msg.Partition = int32(-1)
    msg.Key = sarama.StringEncoder("key")
    msg.Value = sarama.ByteEncoder("hello,world")

    producer, err := sarama.NewSyncProducer(strings.Split("hx-kafka-broker-1.test.org:9092", ","), config)
    if err != nil {
        logger.Println("Failed to produce message: %s", err)
        os.Exit(500)
    }
    defer producer.Close()

    partition, offset, err := producer.SendMessage(msg)

    if err != nil {
        logger.Println("Failed to produce message: ", err)
    }
    logger.Printf("partition=%d, offset=%d\n", partition, offset)

    //消费
    //  sarama.Logger = logger
    consumer, err := sarama.NewConsumer(strings.Split("hx-kafka-broker-1.test.org:9092", ","), nil)
    if err != nil {
        logger.Println("Failed to start consumer:%s", err)
    }

    partitionList, err := consumer.Partitions("topic.ops.falcon")
    if err != nil {
        logger.Println("Failed to get the list of partitions: ", err)
    }

    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("topic.ops.falcon", int32(partition), sarama.OffsetNewest)
        if err != nil {
            logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
        }
        defer pc.AsyncClose()

        wg.Add(1)

        //      fmt.Println("abc")
        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                fmt.Println("123")
            }
        }(pc)
    }
    wg.Wait()
    logger.Println("Done consuming topic hello")
    consumer.Close()
}
阅读 7.4k
3 个回答
新手上路,请多包涵

你解决了么?

不能在一个函数中同时生产消息,然后又消费,否则,消费协议会出错。只能单独生产,或单独消费。

新手上路,请多包涵

把参数 sarama.OffsetNewest修改为sarama.OffsetOldest你再试下看能不能消费!

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题