请教下golang goroutine泄露问题?

新手上路,请多包涵

golang新手, 最近在写个练手的小项目, 需求是收集指定日志目录下最新的日志文件, 通过正则过滤日志内容, 把想要的信息存到一个map里.

但是这个程序在实际运行中我发现, 由于在catLog包里读取文件内容时里面我必须使用for死循环实现实时读取文件, 导致在getNewFile包里监控到的系统事件信息无法发送给catLog包,这样如果有新的日志文件产生, 它无法切换文件. 于是我在getNewFile包调用catLog包时启用goroutine, 这样就可以收到通知了.

结果导致每次读取文件新内容时, 程序都会启动一个新的goroutine调用catLog包, 并且不会自动退出,因为catLog包里是for死循环, 一直在累积增加.

请大佬们帮忙看下下面的代码, 该怎么修改可以控制goroutine增长, 并且还能实时监控最新的日志文件.


package getNewFile
package getNewFile

import (
    "fmt"
    "log"
    "regexp"
    catlog "study/prometheus/exporter_watcherLog/catLog"
    "time"

    "github.com/fsnotify/fsnotify"
)

// 实时监控指定目录下文件并读取
func GetNewFile(pattren, logDir string) {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        fmt.Println(err)
        return
    }
    defer watcher.Close()

    done := make(chan bool)

    re := regexp.MustCompile(pattren) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会panic
    go func() {
        for {
            select {
            case event := <-watcher.Events:
                isMatch := re.MatchString(event.Name)
                if isMatch {
                    if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作
                    fmt.Println(event.Name)
                    go catlog.DealLog(event.Name)
                    }

                    if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作
                    fmt.Println(event.Name)
                    go catlog.DealLog(event.Name)
                    }
                }

            case err := <-watcher.Errors:
                log.Println(err)
            }
            time.Sleep(time.Second * 1)
        }
    }()

    err = watcher.Add(logDir) // 监控指定目录
    if err != nil {
        log.Fatal(err)
    }
    <-done
}
package catlog:
package catlog

import (
    "fmt"
    "regexp"
    "runtime"
    "time"

    "sync"

    "github.com/hpcloud/tail"
)

var AllAddrLoglist map[string]ClassLogInfo
var Sm sync.Map
var Notify = false

type ClassLogInfo struct {
    Host string
    Time string
    Date string
    Cfc  string
}

// 实时读取文件内容, 并筛选出host, date, time, cfc存入map中
func DealLog(file string) {
    config := tail.Config{
        ReOpen:    true,                                 // 重新打开
        Follow:    true,                                 // 是否跟随
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件哪个地方开始读
        MustExist: false,                                // 文件不存在报错
        Poll:      true,
        // Logger:    tail.DiscardingLogger, // 禁用日志记录
    }
    tails, err := tail.TailFile(file, config) // 打开文件, 并用上面的配置
    if err != nil {
        fmt.Printf("tail file failed, err: %v\n", err)
        return
    }
    var (
        line *tail.Line
        ok   bool
    )

    ch := make(chan struct{}, 1) // 创建缓冲区大小, 控制并发, 最多发送x个消息就阻塞

    // 正则匹配, 最后只筛选出host;date;time;cfc, 其余的跳过.
    pattern := `(^[0-9]+/[0-9]+/[0-9]+)\s+([0-9]+:[0-9]+:[0-9]+:[0-9]+)\s+[A-Z]+\s+\[.*\]\s+INFO\s+LSG.*LSG\s+svr\s+update:\s+cfc=([0-9]+),cpfc=[0-9]+,clc=[0-9]+,cplc=[0-9]+,load=[0-9]+,status=[0-9]+,addr=(\d+\.\d+\.\d+\.\d+-\d),inst=\d+\.\d+\.\d+\.\d+-\d`
    // re := regexp.MustCompile(pattern) // 两个唯一区别, 如错误直接panic
    re, err := regexp.Compile(pattern) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会返回一个错误
    if err != nil {
        fmt.Println(err)
    }

    // 读取每行数据,最后写入到AllAddrLoglist中.
    for {
        ch <- struct{}{} // 写入消息到缓冲区

        line, ok = <-tails.Lines
        if !ok {
            fmt.Printf("tail file close reopen, filename: %s\n", tails.Filename)
            time.Sleep(time.Second)
            continue
        }
        matchArr := re.FindStringSubmatch(line.Text)
        if matchArr != nil { // 新增判断,替代re.MatchString(line.Text),原有的影响性能
            // 信息存入结构体
            testI := ClassLogInfo{
                Host: matchArr[4],
                Date: matchArr[1],
                Time: matchArr[2],
                Cfc:  matchArr[3],
            }
            // 利用sync.map, 直接使用, 无需声明, 避免读写同一个map冲突
            Sm.Store(matchArr[4], testI)
            <-ch // 从缓冲区读取消息
        }
    }
}
阅读 1.3k
2 个回答
新手上路,请多包涵

实现思路就不对吧。文件每次变化都去创建新协程,而新协程又不会退出,协程数自然是越来越多了。这里为了达到提取日志特殊内容的目的,应该只watch文件创建事件(还要预先处理目录内原来就存在的文件,watch到新文件创建时大概要做去重判断),针对每个日志文件创建协程不断去读取(阻塞读),如果文件有新内容,自然会被读取到。

当然日志文件可能会被移除(因为要压缩等原因),所以可能还需要处理移除事件,去关闭协程(通过关闭文件句柄的方式)。

看上去是你想用tail包对每个新创建的日志文件进行持续追踪,因此你想对每个新日志文件生成一个协程进行监控。

但是你的代码并不是这样做的,你不仅对新文件生成协程,对文件被写入又生成协程。日志每次被写入就会生成新协程,然后多个协程反反复复写入同一条日志。

if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作
    go catlog.DealLog(event.Name)

另外,这个控制语句是无意义的。你调用catlog.DealLog时是调用一个“静态方法”,每次调用都会生成新的独立的ch,不存在控制并发的功能。而且你也不需要在此控制并发,因为你已经用sync.Map控制并发写入了,而跟踪文件又是互相独立的。

    for {
        ch <- struct{}{} // 写入消息到缓冲区
    ...
            <-ch // 从缓冲区读取消息

还有如上面回答所说,你没处理文件移除的事件,linux中已经打开的文件会一直在内存和硬盘inode中,程序本身无法通过文件本身知晓文件是否被删了,被删除的日志会被一直追踪。你得在tail的监听循环中通过路径检测文件是否还存在,或是储存每个tail对像,在主协程中停止跟踪。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题