Golang program problem (limiting the number of goroutines)

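Problem description

I am trying to write a port scanner that uses sync.WaitGroup to limit the number of goroutines, but when the goroutine count gets too high, the scan finds fewer ports than expected.

For example:

With 1000 goroutines it finds 80, 443, 8000, 3389
With 2000 goroutines it only finds 80 and 443

Environment and what I have already tried

I asked this on StackOverflow, but I did not fully understand the English answer...

This is the original answer: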

You do not wait for the two "non-worker" goroutines in main, so as soon as wg.Wait() there returns, the process shuts down, tearing down any outstanding goroutines.

Since one of them is processing the results, this appears to you as if not all the tasks were processed (and this is true).

Another answer simply posted optimized code: https://play.golang.org/p/_ZD.... It works on Windows, but on Linux it fails in the same way.

Relevant code

package main

import (
    "fmt"
    "net"
    "sync"
    "time"
)

type Job struct {
    host string
    port int
}

type Result struct {
    job    Job
    status bool
}

var jobs = make(chan Job)
var results = make(chan Result)

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        _, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", job.host, job.port), time.Millisecond*1500)
        if err != nil {
            results <- Result{job, false}
        } else {
            results <- Result{job, true}
        }
    }
    wg.Done()
}

const host = "127.0.0.1"

func main() {
    wg := sync.WaitGroup{}

    go func() {
        for i := 1; i < 65535; i++ {
            jobs <- Job{host, i}
        }
        close(jobs)
    }()

    go func() {
        for result := range results {
            if result.status {
                fmt.Println(result.job, "open")
            }
        }
    }()

    for i := 1; i < 4500; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
}

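What result did you expect? What did you actually see?

I want to get the status of every open port on both Windows and Linux.

On Windows the scan is slower, so all open ports are found: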

{127.0.0.1 135} open
{127.0.0.1 443} open
{127.0.0.1 445} open
{127.0.0.1 808} open
{127.0.0.1 902} open
{127.0.0.1 912} open

On Linux the result falls short of what I expect:

ubuntu@pc:~/Desktop$ go run port.go 
{127.0.0.1 80} open
{127.0.0.1 631} open
{127.0.0.1 3306} open
    5 answers

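    This is not a problem with your Go code; it is most likely the default Linux configuration.
    net.DialTimeout ultimately opens a Linux socket, and on Linux every socket is a file descriptor. By default Linux allows a process to have 1024 files open at once, which you can check with the following command: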

    [root@aia-db /home/daik/test/src/scanport]# ulimit -a
    core file size          (blocks, -c) 0
    data seg size           (kbytes, -d) unlimited
    scheduling priority             (-e) 0
    file size               (blocks, -f) unlimited
    pending signals                 (-i) 3757
    max locked memory       (kbytes, -l) 64
    max memory size         (kbytes, -m) unlimited
    open files                      (-n) 1024
    pipe size            (512 bytes, -p) 8
    POSIX message queues     (bytes, -q) 819200
    real-time priority              (-r) 0
    stack size              (kbytes, -s) 8192
    cpu time               (seconds, -t) unlimited
    max user processes              (-u) 3757
    virtual memory          (kbytes, -v) unlimited
    file locks                      (-x) unlimited

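    Look at the "open files" line: the default is 1024. When you start far more than 1024 goroutines (4500 in your code), the process can easily try to hold more than 1024 open files at the same time, and socket creation then fails.
    I tested this (note: in my local environment your code never printed port 50051, so I picked that port for the test):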

    [root@scanport]# go run main.go
    {127.0.0.1 25} open
    {127.0.0.1 139} open
    {127.0.0.1 111} open
    {127.0.0.1 445} open
    {127.0.0.1 22} open
    {127.0.0.1 631} open
    {127.0.0.1 9999} open
    {127.0.0.1 9998} open
    {127.0.0.1 9997} open
    I am port 50051, error: dial tcp 127.0.0.1:50051: socket: too many open files
    success is  9
    fail is  65526
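The `too many open files` message in the output above is the EMFILE error from the kernel. As a rough sketch (assuming Linux and the standard `syscall` package; the helper names `isTooManyOpenFiles` and `simulatedEMFILE` are made up for this illustration), a scanner could read the current limit and recognize that error instead of silently counting the port as closed:

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"syscall"
)

// isTooManyOpenFiles reports whether err was caused by the process running
// out of file descriptors (EMFILE), however deeply the error is wrapped.
func isTooManyOpenFiles(err error) bool {
	return errors.Is(err, syscall.EMFILE)
}

// simulatedEMFILE builds an error shaped like the one in the log above
// (hypothetical helper), so the check can be shown without actually
// exhausting descriptors.
func simulatedEMFILE() error {
	return &net.OpError{
		Op:  "dial",
		Net: "tcp",
		Err: os.NewSyscallError("socket", syscall.EMFILE),
	}
}

func main() {
	// Same information as the "open files" line of `ulimit -a`.
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		fmt.Println("getrlimit failed:", err)
		return
	}
	fmt.Printf("open files: soft=%d hard=%d\n", lim.Cur, lim.Max)

	fmt.Println(isTooManyOpenFiles(simulatedEMFILE())) // true
	fmt.Println(isTooManyOpenFiles(nil))               // false
}
```

With a check like this, a worker can log or re-queue the port when the dial failed for lack of descriptors rather than reporting it as not open.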

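    How to fix it:
    1. Reduce the number of goroutines. More goroutines does not automatically mean more throughput; size the pool to the actual situation.
    2. Raise the Linux limit, as follows: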

    [root@aia-db /home/daik/test/src/scanport]# ulimit -SHn 10000
    [root@aia-db /home/daik/test/src/scanport]# ulimit -n
    10000
    [root@aia-db /home/daik/test/src/scanport]# ulimit -a
    core file size          (blocks, -c) 0
    data seg size           (kbytes, -d) unlimited
    scheduling priority             (-e) 0
    file size               (blocks, -f) unlimited
    pending signals                 (-i) 3757
    max locked memory       (kbytes, -l) 64
    max memory size         (kbytes, -m) unlimited
    open files                      (-n) 10000
    pipe size            (512 bytes, -p) 8
    POSIX message queues     (bytes, -q) 819200
    real-time priority              (-r) 0
    stack size              (kbytes, -s) 8192
    cpu time               (seconds, -t) unlimited
    max user processes              (-u) 3757
    virtual memory          (kbytes, -v) unlimited
    file locks                      (-x) unlimited

    Then test your code again; it should work.
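The limit can also be adjusted from inside the program. The sketch below assumes Linux and only raises the soft limit up to the existing hard limit (unlike `ulimit -SHn`, it does not touch the hard limit, which normally needs root); `raiseOpenFileLimit` is a hypothetical helper name. As far as I know, Go 1.19 and later already do this automatically at startup for programs that import `os`, so this mainly matters for older toolchains.

```go
package main

import (
	"fmt"
	"syscall"
)

// raiseOpenFileLimit lifts the soft RLIMIT_NOFILE of the current process
// up to its hard limit and returns the resulting soft limit.
func raiseOpenFileLimit() (uint64, error) {
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		return 0, err
	}
	if lim.Cur < lim.Max {
		lim.Cur = lim.Max
		if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
			return 0, err
		}
	}
	return uint64(lim.Cur), nil
}

func main() {
	n, err := raiseOpenFileLimit()
	if err != nil {
		fmt.Println("could not raise limit:", err)
		return
	}
	fmt.Println("soft open-file limit is now", n)
}
```

Even with a higher limit, the worker count should stay below it with some margin, since stdin, stdout, stderr and the runtime's own descriptors count too.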

    My test code:

    package main
    
    import (
        "fmt"
        "net"
        "time"
        "sync"
    )
    
    type Job struct {
        host string
        port int
    }
    
    type Result struct {
        job    Job
        status bool
    }
    
    var success uint32
    var fail uint32
    var successMux sync.Mutex
    var failMux sync.Mutex
    
    var jobs = make(chan Job)
    var results = make(chan Result)
    
    func worker(wg *sync.WaitGroup) {
        for job := range jobs {
            _, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", job.host, job.port), time.Millisecond*5500)
            if err != nil {
                failMux.Lock()
                fail++
                failMux.Unlock()
                if job.port == 50051 {
                    fmt.Println("I am port 50051, error:", err.Error())
                }
                results <- Result{job, false}
            } else {
                successMux.Lock()
                success++
                successMux.Unlock()
                results <- Result{job, true}
            }
        }
        wg.Done()
    }
    
    const host = "127.0.0.1"
    
    func main() {
        wg := sync.WaitGroup{}
    
        go func() {
            for i := 1; i <= 65535; i++ {
                jobs <- Job{host, i}
            }
            close(jobs)
        }()
    
        go func() {
            for result := range results {
                if result.status {
                    fmt.Println(result.job, "open")
                }
            }
        }()
    
        for i := 0; i < 4500; i++ {
            wg.Add(1)
            go worker(&wg)
        }
        wg.Wait()
    
        fmt.Println("success is ", success)
        fmt.Println("fail is ", fail)
    }
    

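    Also, the code you mentioned at https://play.golang.org/p/_ZD...
    is essentially no different from yours, so it probably hits the same Linux limit, which is why you see different results on Windows and Linux.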
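For the first suggestion (fewer goroutines), here is a minimal sketch of a bounded scanner. It is not the playground code and not a definitive implementation; `scan` and `scanLocalListener` are names invented for this example. Besides capping the worker count, it closes each successful connection immediately so the descriptor is returned, and it only finishes after every result has been collected.

```go
package main

import (
	"fmt"
	"net"
	"sort"
	"strconv"
	"sync"
	"time"
)

// scan dials every port in ports on host using at most `workers` goroutines
// and returns the open ports in ascending order.
func scan(host string, ports []int, workers int, timeout time.Duration) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for port := range jobs {
				addr := net.JoinHostPort(host, strconv.Itoa(port))
				conn, err := net.DialTimeout("tcp", addr, timeout)
				if err != nil {
					continue // closed, filtered, or failed for another reason
				}
				conn.Close() // release the file descriptor right away
				results <- port
			}
		}()
	}

	// Producer: feed the ports, then signal that no more jobs are coming.
	go func() {
		for _, p := range ports {
			jobs <- p
		}
		close(jobs)
	}()

	// Close results only after every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	var open []int
	for p := range results {
		open = append(open, p)
	}
	sort.Ints(open)
	return open
}

// scanLocalListener opens a listener on a free local port and scans exactly
// that port, so the example does not depend on what the machine is running.
func scanLocalListener() (want int, got []int, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, nil, err
	}
	defer ln.Close()
	want = ln.Addr().(*net.TCPAddr).Port
	got = scan("127.0.0.1", []int{want}, 4, time.Second)
	return want, got, nil
}

func main() {
	want, got, err := scanLocalListener()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	fmt.Println("listening on", want, "scan found", got)
}
```

For a full scan one would pass ports 1 through 65535 and a worker count comfortably below the `open files` limit, for example a few hundred.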


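      Could it be that Linux is silently dropping some of the TCP requests because there are too many connections? You could consider retrying once when the error is not a timeout.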
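One way to read the retry suggestion, as a sketch only: the rule below is my own assumption, not something the answer spells out. It treats a timeout or a refused connection as a final answer and retries once for anything else, such as running out of descriptors. `retryable`, `dialWithRetry` and `errTooManyFiles` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"syscall"
)

// errTooManyFiles is a sample error used only for the demonstration below.
var errTooManyFiles = os.NewSyscallError("socket", syscall.EMFILE)

// retryable reports whether a failed dial is worth one more attempt.
// Timeouts and refused connections are taken as final (filtered or closed
// port); any other error is assumed to be transient.
func retryable(err error) bool {
	if err == nil || errors.Is(err, syscall.ECONNREFUSED) {
		return false
	}
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		return false
	}
	return true
}

// dialWithRetry calls dial and repeats it once if the error is retryable.
// It returns the number of attempts made and the last error.
func dialWithRetry(dial func() error) (int, error) {
	err := dial()
	if !retryable(err) {
		return 1, err
	}
	return 2, dial()
}

func main() {
	attempts, err := dialWithRetry(func() error { return errTooManyFiles })
	fmt.Println(attempts, err)
}
```

In a real worker the `dial` function would wrap `net.DialTimeout` and close the connection on success. A short pause before the second attempt would give other workers time to release descriptors.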


        It looks like there is no error. Which ports were not scanned?


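          The posted code is fine. Are you sure you ran exactly the code you posted?

          I ran it on Linux with the net.Dial logic removed, sending straight to results, and all the output was correct.

          Both channels in the code are unbuffered, so it is not the case that the main goroutine exits before a large number of results have been printed.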
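One caveat to the unbuffered-channel argument: a send completes as soon as the consumer has received the value, which is before the consumer has finished handling it, so the very last result can still be cut off when main returns. A small sketch of waiting for the consumer explicitly, with an invented `collect` function and integers standing in for scan results:

```go
package main

import (
	"fmt"
	"sync"
)

// collect fans values from `workers` producers into one consumer and
// returns only after the consumer has handled every value.
func collect(workers, perWorker int) int {
	results := make(chan int)
	done := make(chan struct{})
	handled := 0

	// Consumer: stands in for the goroutine that prints open ports.
	go func() {
		defer close(done)
		for range results {
			handled++
		}
	}()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				results <- i
			}
		}()
	}

	wg.Wait()      // all producers have sent everything
	close(results) // lets the consumer's range loop end
	<-done         // wait until the consumer has handled the last value
	return handled
}

func main() {
	fmt.Println(collect(8, 1000)) // prints 8000
}
```

The same three steps (wait for the workers, close the results channel, wait for the consumer) apply to the scanner's `wg.Wait()` at the end of main.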

            package main
            
            import (
                "fmt"
                "net"
                "sync"
                "time"
            )
            
            type job struct {
                host string
                port int
            }
            
            type result struct {
                jobData job
                status  bool
            }
            
            var (
                jobs    = make(chan job)
                results = make(chan result)
            )
            
            const HOST = "127.0.0.1"
            
            func worker(wg *sync.WaitGroup) {
            flag:
                for {
                    select {
                    case job := <-jobs:
                        _, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", job.host, job.port), time.Millisecond*1500)
                        d := result{
                            jobData: job,
                            status:  false,
                        }
                        if err == nil {
                            d.status = true
                        }
                        results <- d
            
                    case <-time.After(time.Second * 10): // idle timeout; a fresh NewTicker per iteration would never be stopped
                        wg.Done()
                        break flag
                    }
                }
            }
            
            func generateTasks() {
                for i := 1; i < 65535; i++ {
                    jobs <- job{
                        host: HOST,
                        port: i,
                    }
                }
            }
            
            func checkResult() {
            flag:
                for {
                    select {
                    case res := <-results:
                        if res.status {
                            fmt.Println(res.jobData, "open")
                        }
                    case <-time.After(time.Second * 10): // idle timeout; a fresh NewTicker per iteration would never be stopped
                        fmt.Println("time out ...")
                        break flag
                    }
                }
            }
            
            func main() {
                defer func() {
                    close(jobs)
                    close(results)
                }()
            
                wg := sync.WaitGroup{}
            
                // produce the tasks
                go generateTasks()
            
                // check the results
                go checkResult()
            
                // start the workers
                for i := 0; i < 10; i++ {
                    wg.Add(1)
                    go worker(&wg)
                }
            
                fmt.Println("--------")
                wg.Wait()
                fmt.Println("++++++++")
            }
            