背景:不同的应用需要把不同的数据写入不同的表,所以规定了结构:
Topic
是应用ID、key
是表名、value
是具体数据
主要需要实现的功能是:
1.到达指定条数时,执行批量插入数据库,未达到指定条数时,视为超时,不论多少条都执行插入
2.监听退出信号,有退出信号时,把map中的数据返回插入到数据库
遇到的问题:
项目启动会初始化执行KfkInitConsumerGroup、ConsumeMessages
,因为这两个只会执行一次,所以订阅的Topic就不会更新了,当有新的Topic写入时,这个新的Topic不能被及时的订阅,不能正常消费了,求各位大佬指点怎么修改代码,能让Topic自动更新呢?让新的Topic也会被消费呢?下边是我的代码实现
目前使用的是 github.com/Shopify/sarama
库,具体实现代码:
package kfk
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"regexp"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
)
type Consumer struct {
brokers []string
groupID string
topics []string
stopSignal chan struct{}
client sarama.Client
config *sarama.Config
consumer sarama.ConsumerGroup
timeExec time.Duration // 超时时间
mutex sync.Mutex // 互斥锁
messageLimit int // 每分钟获取的消息数量限制
}
// KfkInitConsumerGroup initializes the consumer group.
func KfkInitConsumerGroup(brokers []string, groupID string, messageLimit int, timeExec time.Duration) (*Consumer, error) {
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
client, err := sarama.NewClient(brokers, config)
if err != nil {
log.Fatal(err)
}
consumer := &Consumer{
brokers: brokers,
groupID: groupID,
config: config,
timeExec: timeExec,
messageLimit: messageLimit,
}
// 根据client创建consumerGroup
fromConsumer, err := sarama.NewConsumerGroupFromClient(groupID, client)
if err != nil {
log.Fatalf("Error creating consumer group client: %v", err)
}
consumer.client = client
consumer.consumer = fromConsumer
consumer.stopSignal = make(chan struct{})
return consumer, nil
}
// ConsumeMessages starts consuming messages.
func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {
// 设置捕捉中断信号的通道
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
messageMap := make(map[string]map[string][][]byte)
messageCount := 0 // 消息计数器
maxMessages := c.messageLimit // 每分钟获取的消息数量限制
//设置消费者处理函数
handlerFunc := &consumerHandler{
handler: handler,
messageMap: messageMap,
maxMessages: maxMessages,
timeExec: c.timeExec,
messageCount: &messageCount,
}
// 创建一个等待组,用于等待消费者和信号监听器完成
wg := sync.WaitGroup{}
// 在启动消费者之前初始化主题列表
topics, err := c.client.Topics()
if err != nil {
log.Fatalf("Error getting topics: %v", err)
}
filteredTopics := filterTopics(topics)
c.mutex.Lock()
c.topics = filteredTopics
c.mutex.Unlock()
// 启动单独的goroutine定期获取主题列表
go c.refreshTopics()
// 启动消费者协程
wg.Add(1)
go func() {
defer wg.Done()
c.mutex.Lock()
topics := c.topics // 获取当前的主题列表
c.mutex.Unlock()
log.Printf("Start Topics: %v", topics)
if err := c.consumer.Consume(context.Background(), topics, handlerFunc); err != nil {
fmt.Printf("Error from consumer: %v\n", err)
}
}()
// 启动信号监听器协程
wg.Add(1)
go func() {
defer wg.Done()
// 创建一个信号通道
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 阻塞等待信号
<-sigChan
// 发送停止信号给消费者协程
close(c.stopSignal)
err := handlerFunc.handler(handlerFunc.messageMap)
if err != nil {
}
// 退出程序
os.Exit(0)
}()
// 等待消费者和信号监听器完成
wg.Wait()
}
func (c *Consumer) refreshTopics() {
ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表
for {
select {
case <-ticker.C:
topics, err := c.client.Topics()
if err != nil {
log.Printf("Error refreshing topics: %v", err)
continue
}
filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets"
log.Printf("All Topics: %v", filteredTopics)
c.mutex.Lock()
c.topics = filteredTopics // 更新主题列表
c.mutex.Unlock()
case <-c.stopSignal:
ticker.Stop()
return
}
}
}
// 过滤掉内部主题 "__consumer_offsets"
func filterTopics(topics []string) []string {
filteredTopics := make([]string, 0)
for _, topic := range topics {
if topic != "__consumer_offsets" {
filteredTopics = append(filteredTopics, topic)
}
}
return filteredTopics
}
// consumerHandler implements sarama.ConsumerGroupHandler
type consumerHandler struct {
messageCount *int
maxMessages int
mutex sync.Mutex // 互斥锁
timeExec time.Duration // 超时时间
ticker *time.Ticker // 定时器
topicPattern *regexp.Regexp
messageMap map[string]map[string][][]byte
handler func(map[string]map[string][][]byte) error
}
// Setup is called when the consumer group session is established.
func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup is called when the consumer group session is terminated.
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim is called when a new claim is received.
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
h.ticker = time.NewTicker(h.timeExec)
defer h.ticker.Stop()
for {
select {
case <-h.ticker.C:
h.mutex.Lock()
if len(h.messageMap) > 0 {
// 每30秒触发一次定时器
go func(messageMap map[string]map[string][][]byte) {
err := h.handler(messageMap)
if err != nil {
// 错误处理,根据实际情况进行操作
}
}(h.messageMap)
*h.messageCount = 0 // 重置计数器
h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap
}
h.mutex.Unlock()
case message := <-claim.Messages():
h.mutex.Lock() // 加锁
topicMap := h.messageMap[message.Topic]
if topicMap == nil {
topicMap = make(map[string][][]byte)
h.messageMap[message.Topic] = topicMap
}
keySlice := topicMap[string(message.Key)]
keySlice = append(keySlice, message.Value)
topicMap[string(message.Key)] = keySlice
h.mutex.Unlock() // 解锁
session.MarkMessage(message, "") // 标记消息为已消费
h.mutex.Lock()
// 计算数量
*h.messageCount++
if *h.messageCount >= h.maxMessages {
go func(messageMap map[string]map[string][][]byte) {
err := h.handler(messageMap)
if err != nil {
// 错误处理,根据实际情况进行操作
}
}(h.messageMap)
*h.messageCount = 0 // 重置计数器
h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap
}
h.mutex.Unlock()
}
}
}
------------------------------分割线------------------------------
按照一楼大哥的方式改了改,大概代码如下:
ConsumeMessages
// ConsumeMessages starts consuming messages.
func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {
// ···
c.ctx, c.cancel = context.WithCancel(context.Background())
//设置消费者处理函数
handlerFunc := &consumerHandler{
handler: handler,
messageMap: messageMap,
maxMessages: maxMessages,
timeExec: c.timeExec,
messageCount: &messageCount,
}
// 创建一个等待组,用于等待消费者和信号监听器完成
wg := sync.WaitGroup{}
// 在启动消费者之前初始化主题列表
topics, err := c.client.Topics()
if err != nil {
log.Fatalf("Error getting topics: %v", err)
}
filteredTopics := filterTopics(topics)
c.mutex.Lock()
c.topics = filteredTopics
c.mutex.Unlock()
// 启动单独的goroutine定期获取主题列表
go c.refreshTopics(handlerFunc)
// 启动消费者协程
wg.Add(1)
go func() {
defer wg.Done()
c.mutex.Lock()
topics := c.topics // 获取当前的主题列表
c.mutex.Unlock()
log.Printf("Start Topics: %v", topics)
if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil {
fmt.Printf("Error from consumer: %v\n", err)
}
}()
// ···
// 等待消费者和信号监听器完成
wg.Wait()
}
refreshTopics
func (c *Consumer) refreshTopics(handlerFunc *consumerHandler) {
var consumerShutdown sync.WaitGroup
ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表
for {
select {
case <-ticker.C:
// 刷新元数据缓存
if err := c.client.RefreshMetadata(); err != nil {
log.Printf("Error refreshing metadata: %v", err)
continue
}
topics, err := c.client.Topics()
if err != nil {
log.Printf("Error refreshing topics: %v", err)
continue
}
filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets"
log.Printf("All Topics: %v", c.topics)
// 主题列表发生了变化
if !util.EqualSlices(filteredTopics, c.topics) {
c.mutex.Lock()
c.topics = filteredTopics // 更新主题列表
c.mutex.Unlock()
consumerShutdown.Wait()
// 关闭旧的消费者
c.cancel()
// 创建一个新的消费者
c.ctx, c.cancel = context.WithCancel(context.Background())
go func() {
consumerShutdown.Add(1) // 增加计数器
log.Printf("New Topics: %v", c.topics)
if err := c.consumer.Consume(c.ctx, c.topics, handlerFunc); err != nil {
fmt.Printf("Error from consumer: %v\n", err)
}
consumerShutdown.Done() // 减少计数器
}()
}
case <-c.stopSignal:
ticker.Stop()
return
}
}
}
其他
// 过滤掉内部主题 "__consumer_offsets"
func filterTopics(topics []string) []string {
filteredTopics := make([]string, 0)
for _, topic := range topics {
if topic != "__consumer_offsets" {
filteredTopics = append(filteredTopics, topic)
}
}
return filteredTopics
}
// consumerHandler implements sarama.ConsumerGroupHandler
type consumerHandler struct {
messageCount *int
maxMessages int
mutex sync.Mutex // 互斥锁
timeExec time.Duration // 超时时间
ticker *time.Ticker // 定时器
topicPattern *regexp.Regexp
messageMap map[string]map[string][][]byte
handler func(map[string]map[string][][]byte) error
}
// Setup is called when the consumer group session is established.
func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup is called when the consumer group session is terminated.
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
fmt.Println("关闭消费者")
return nil
}
// ConsumeClaim is called when a new claim is received.
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
h.ticker = time.NewTicker(h.timeExec)
defer h.ticker.Stop()
for {
select {
case <-h.ticker.C:
h.mutex.Lock()
if len(h.messageMap) > 0 {
fmt.Println(fmt.Sprintf("%s 超时,执行插入 %d", util.ZTime(""), *h.messageCount))
// 每30秒触发一次定时器
go func(messageMap map[string]map[string][][]byte) {
err := h.handler(messageMap)
if err != nil {
// 错误处理,根据实际情况进行操作
}
}(h.messageMap)
*h.messageCount = 0 // 重置计数器
h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap
}
h.mutex.Unlock()
case message := <-claim.Messages():
h.mutex.Lock() // 加锁
if message != nil {
topicMap := h.messageMap[message.Topic]
if topicMap == nil {
topicMap = make(map[string][][]byte)
h.messageMap[message.Topic] = topicMap
}
keySlice := topicMap[string(message.Key)]
keySlice = append(keySlice, message.Value)
topicMap[string(message.Key)] = keySlice
h.mutex.Unlock() // 解锁
session.MarkMessage(message, "") // 标记消息为已消费
h.mutex.Lock()
// 计算数量
*h.messageCount++
if *h.messageCount >= h.maxMessages {
fmt.Println(fmt.Sprintf("%s 到达阈值 %d,执行插入", util.ZTime(""), *h.messageCount))
go func(messageMap map[string]map[string][][]byte) {
err := h.handler(messageMap)
if err != nil {
// 错误处理,根据实际情况进行操作
}
}(h.messageMap)
*h.messageCount = 0 // 重置计数器
h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap
}
h.mutex.Unlock()
}
}
}
}
修改之后可以正常关闭旧的消费者,启动新的消费者也没有报错,但是ConsumeClaim
中的message := <-claim.Messages()
获取到的是nil,也就是message = nil
,随后程序结束,请各位大佬帮帮忙解答一下,十分感谢