使用github.com/Shopify/sarama
来链接Kafka消费消息,现在有一个问题:方法KfkInitConsumerGroup
、ConsumeMessages
都是在项目启动时执行的,并且只执行一次,
需求是每一个游戏作为一个Topic,当有新的Topic出现时,程序需要自动订阅,并且消费。
若没有新的主题出现,那么就一切正常,当有新的主题出现时,旧的主题、新的主题数据都不会被消费出来了,这是什么原因啊?
祝各位大佬工作顺利,事业一帆风顺、写代码没有Bug
代码如下:
KfkInitConsumerGroup:
type Consumer struct {
brokers []string
groupID string
topics []string
stopSignal chan struct{}
client sarama.Client
config *sarama.Config
consumer sarama.ConsumerGroup
timeExec time.Duration // 超时时间
mutex sync.Mutex // 互斥锁
ctx context.Context
cancel context.CancelFunc
messageLimit int // 每分钟获取的消息数量限制
}
// KfkInitConsumerGroup initializes the consumer group.
func KfkInitConsumerGroup(brokers []string, groupID string, messageLimit int, timeExec time.Duration) (*Consumer, error) {
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
client, err := sarama.NewClient(brokers, config)
if err != nil {
log.Fatal(err)
}
consumer := &Consumer{
brokers: brokers,
groupID: groupID,
config: config,
timeExec: timeExec,
messageLimit: messageLimit,
}
// 根据client创建consumerGroup
fromConsumer, err := sarama.NewConsumerGroupFromClient(groupID, client)
if err != nil {
log.Fatalf("Error creating consumer group client: %v", err)
}
consumer.client = client
consumer.consumer = fromConsumer
consumer.stopSignal = make(chan struct{})
return consumer, nil
}
ConsumeMessages代码:
// ConsumeMessages starts consuming messages.
func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {
messageMap := make(map[string]map[string][][]byte)
messageCount := 0 // 消息计数器
maxMessages := c.messageLimit // 每分钟获取的消息数量限制
c.ctx, c.cancel = context.WithCancel(context.Background())
// 设置消费者处理函数
handlerFunc := &consumerHandler{
handler: handler,
messageMap: messageMap,
maxMessages: maxMessages,
timeExec: c.timeExec,
messageCount: &messageCount,
}
// 创建一个等待组,用于等待消费者和信号监听器完成
wg := sync.WaitGroup{}
// 在启动消费者之前初始化主题列表
topics, err := c.client.Topics()
if err != nil {
log.Fatalf("Error getting topics: %v", err)
}
filteredTopics := filterTopics(topics)
c.mutex.Lock()
c.topics = filteredTopics
c.mutex.Unlock()
// 启动单独的goroutine定期获取主题列表
go c.refreshTopics(handlerFunc)
// 启动消费者协程
wg.Add(1)
go func() {
defer wg.Done()
c.mutex.Lock()
topics := c.topics // 获取当前的主题列表
c.mutex.Unlock()
log.Printf("Start Topics: %v", topics)
if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil {
fmt.Printf("Error from consumer: %v\n", err)
}
}()
// 等待消费者和信号监听器完成
wg.Wait()
}
refreshTopics:
func (c *Consumer) refreshTopics(handlerFunc *consumerHandler) {
var consumerShutdown sync.WaitGroup
ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表
for {
select {
case <-ticker.C:
// 刷新元数据缓存
if err := c.client.RefreshMetadata(); err != nil {
log.Printf("Error refreshing metadata: %v", err)
continue
}
topics, err := c.client.Topics()
if err != nil {
log.Printf("Error refreshing topics: %v", err)
continue
}
filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets"
log.Printf("All Topics: %v", c.topics)
// 主题列表发生了变化
if !util.EqualSlices(filteredTopics, c.topics) {
c.mutex.Lock()
c.topics = filteredTopics // 更新主题列表
c.mutex.Unlock()
c.cancel()
consumerShutdown.Wait()
go func() {
consumerShutdown.Add(1) // 增加计数器
log.Printf("New Topics: %v", c.topics)
// 创建一个新的消费者和consumerGroupSession
c.ctx, c.cancel = context.WithCancel(context.Background())
consumer, err := sarama.NewConsumerGroupFromClient(c.groupID, c.client)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
c.consumer = consumer
if err := c.consumer.Consume(c.ctx, c.topics, handlerFunc); err != nil {
fmt.Printf("Error from consumer: %v\n", err)
}
consumerShutdown.Done() // 减少计数器
}()
}
case <-c.stopSignal:
ticker.Stop()
return
}
}
}
// 过滤掉内部主题 "__consumer_offsets"
func filterTopics(topics []string) []string {
filteredTopics := make([]string, 0)
for _, topic := range topics {
if topic != "__consumer_offsets" {
filteredTopics = append(filteredTopics, topic)
}
}
return filteredTopics
}
消费逻辑:
// consumerHandler implements sarama.ConsumerGroupHandler
type consumerHandler struct {
messageCount *int
maxMessages int
mutex sync.Mutex // 互斥锁
timeExec time.Duration // 超时时间
ticker *time.Ticker // 定时器
topicPattern *regexp.Regexp
messageMap map[string]map[string][][]byte
handler func(map[string]map[string][][]byte) error
}
// Setup is called when the consumer group session is established.
func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
fmt.Println("第一步!!!")
return nil
}
// Cleanup is called when the consumer group session is terminated.
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
fmt.Println("关闭消费者")
return nil
}
// ConsumeClaim is called when a new claim is received.
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
if message != nil {
fmt.Println(message.Topic)
}
}
}
}