Go使用Kafka动态获取Topic问题?

使用github.com/Shopify/sarama来链接Kafka消费消息,现在有一个问题:方法KfkInitConsumerGroupConsumeMessages都是在项目启动时执行的,并且只执行一次,
需求是每一个游戏作为一个Topic,当有新的Topic出现时,程序需要自动订阅,并且消费。

若没有新的主题出现,那么就一切正常,当有新的主题出现时,旧的主题、新的主题数据都不会被消费出来了,这是什么原因啊?

祝各位大佬工作顺利,事业一帆风顺、写代码没有Bug

代码如下:

KfkInitConsumerGroup:


type Consumer struct {
    brokers      []string
    groupID      string
    topics       []string
    stopSignal   chan struct{}
    client       sarama.Client
    config       *sarama.Config
    consumer     sarama.ConsumerGroup
    timeExec     time.Duration // 超时时间
    mutex        sync.Mutex    // 互斥锁
    ctx          context.Context
    cancel       context.CancelFunc
    messageLimit int // 每分钟获取的消息数量限制
}

// KfkInitConsumerGroup initializes the consumer group.
func KfkInitConsumerGroup(brokers []string, groupID string, messageLimit int, timeExec time.Duration) (*Consumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = true
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

    client, err := sarama.NewClient(brokers, config)
    if err != nil {
        log.Fatal(err)
    }

    consumer := &Consumer{
        brokers:      brokers,
        groupID:      groupID,
        config:       config,
        timeExec:     timeExec,
        messageLimit: messageLimit,
    }

    // 根据client创建consumerGroup
    fromConsumer, err := sarama.NewConsumerGroupFromClient(groupID, client)
    if err != nil {
        log.Fatalf("Error creating consumer group client: %v", err)
    }

    consumer.client = client
    consumer.consumer = fromConsumer
    consumer.stopSignal = make(chan struct{})
    return consumer, nil
}

ConsumeMessages代码:

// ConsumeMessages starts consuming messages.
func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {
    
    messageMap := make(map[string]map[string][][]byte)
    messageCount := 0             // 消息计数器
    maxMessages := c.messageLimit // 每分钟获取的消息数量限制

    c.ctx, c.cancel = context.WithCancel(context.Background())

    // 设置消费者处理函数
    handlerFunc := &consumerHandler{
        handler:      handler,
        messageMap:   messageMap,
        maxMessages:  maxMessages,
        timeExec:     c.timeExec,
        messageCount: &messageCount,
    }

    // 创建一个等待组,用于等待消费者和信号监听器完成
    wg := sync.WaitGroup{}

    // 在启动消费者之前初始化主题列表
    topics, err := c.client.Topics()
    if err != nil {
        log.Fatalf("Error getting topics: %v", err)
    }
    filteredTopics := filterTopics(topics)
    c.mutex.Lock()
    c.topics = filteredTopics
    c.mutex.Unlock()

    // 启动单独的goroutine定期获取主题列表
    go c.refreshTopics(handlerFunc)

    // 启动消费者协程
    wg.Add(1)
    go func() {
        defer wg.Done()

        c.mutex.Lock()
        topics := c.topics // 获取当前的主题列表
        c.mutex.Unlock()

        log.Printf("Start Topics: %v", topics)

        if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil {
            fmt.Printf("Error from consumer: %v\n", err)
        }
    }()
    

    // 等待消费者和信号监听器完成
    wg.Wait()
}

refreshTopics:

func (c *Consumer) refreshTopics(handlerFunc *consumerHandler) {

    var consumerShutdown sync.WaitGroup

    ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表

    for {
        select {
        case <-ticker.C:

            // 刷新元数据缓存
            if err := c.client.RefreshMetadata(); err != nil {
                log.Printf("Error refreshing metadata: %v", err)
                continue
            }

            topics, err := c.client.Topics()
            if err != nil {
                log.Printf("Error refreshing topics: %v", err)
                continue
            }

            filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets"
            log.Printf("All Topics: %v", c.topics)

            // 主题列表发生了变化
            if !util.EqualSlices(filteredTopics, c.topics) {

                c.mutex.Lock()
                c.topics = filteredTopics // 更新主题列表
                c.mutex.Unlock()

                c.cancel()

                consumerShutdown.Wait()
                go func() {

                    consumerShutdown.Add(1) // 增加计数器

                    log.Printf("New Topics: %v", c.topics)

                    // 创建一个新的消费者和consumerGroupSession
                    c.ctx, c.cancel = context.WithCancel(context.Background())
                    consumer, err := sarama.NewConsumerGroupFromClient(c.groupID, c.client)
                    if err != nil {
                        log.Fatalf("Error creating consumer group: %v", err)
                    }
                    c.consumer = consumer

                    if err := c.consumer.Consume(c.ctx, c.topics, handlerFunc); err != nil {
                        fmt.Printf("Error from consumer: %v\n", err)
                    }

                    consumerShutdown.Done() // 减少计数器
                }()
            }
        case <-c.stopSignal:
            ticker.Stop()
            return
        }
    }
}

// 过滤掉内部主题 "__consumer_offsets"
func filterTopics(topics []string) []string {
    filteredTopics := make([]string, 0)
    for _, topic := range topics {
        if topic != "__consumer_offsets" {
            filteredTopics = append(filteredTopics, topic)
        }
    }
    return filteredTopics
}

消费逻辑:


// consumerHandler implements sarama.ConsumerGroupHandler
type consumerHandler struct {
    messageCount *int
    maxMessages  int
    mutex        sync.Mutex    // 互斥锁
    timeExec     time.Duration // 超时时间
    ticker       *time.Ticker  // 定时器
    topicPattern *regexp.Regexp
    messageMap   map[string]map[string][][]byte
    handler      func(map[string]map[string][][]byte) error
}

// Setup is called when the consumer group session is established.
func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    fmt.Println("第一步!!!")
    return nil
}

// Cleanup is called when the consumer group session is terminated.
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {

    fmt.Println("关闭消费者")
    return nil
}

// ConsumeClaim is called when a new claim is received.
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {

        case message := <-claim.Messages():
            
            if message != nil {

                fmt.Println(message.Topic)
            }
        }
    }
}
阅读 2.6k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题