大東

大東 查看完整档案

武汉编辑  |  填写毕业院校  |  填写所在公司/组织 github.com/bdxing 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

大東 评论了文章 · 2019-02-25

golang协程池设计

Why Pool

go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量

for {
        // 监听tcp
        rw, e := l.Accept()
        if e != nil {
            .......
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        // 启动协程处理上下文
        go c.serve(ctx)
}

虽然创建一个groutine占用的内存极小(大约2KB左右,线程通常2M左右),但是在实际生产环境无限制的开启协程显然是不科学的,比如上图的逻辑,如果来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终导致内存溢出乃至严重的崩溃,所以本文将通过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。

一个简单的协程池

过年前做过一波小需求,是将主播管理系统中信息不完整的主播找出来然后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每一个主播的数据都要访问一次直播平台所以就用应对每一个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能造成groutine性能瓶颈的地步,但是心里总是不舒服,于是放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实非常简单,用一个channel当做任务队列,初始化groutine池时确定好并发量,然后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型如下图
图片描述

实现

type SimplePool struct {
    wg   sync.WaitGroup
    work chan func() //任务队列
}

func NewSimplePoll(workers int) *SimplePool {
    p := &SimplePool{
        wg:   sync.WaitGroup{},
        work: make(chan func()),
    }
    p.wg.Add(workers)
    //根据指定的并发量去读取管道并执行
    for i := 0; i < workers; i++ {
        go func() {
            defer func() {
                // 捕获异常 防止waitGroup阻塞
                if err := recover(); err != nil {
                    fmt.Println(err)
                    p.wg.Done()
                }
            }()
            // 从workChannel中取出任务执行
            for fn := range p.work {
                fn()
            }
            p.wg.Done()
        }()
    }
    return p
}
// 添加任务
func (p *SimplePool) Add(fn func()) {
    p.work <- fn
}

// 执行
func (p *SimplePool) Run() {
    close(p.work)
    p.wg.Wait()
}

测试

测试设定为在并发数量为20的协程池中并发抓取一百个人的信息, 因为代码包含较多业务逻辑所以sleep 1秒模拟爬虫过程,理论上执行时间为5秒

func TestSimplePool(t *testing.T) {
    p := NewSimplePoll(20)
    for i := 0; i < 100; i++ {
        p.Add(parseTask(i))
    }
    p.Run()
}

func parseTask(i int) func() {
    return func() {
        // 模拟抓取数据的过程
        time.Sleep(time.Second * 1)
        fmt.Println("finish parse ", i)
    }
}

图片描述

这样一来最简单的一个groutine池就完成了

go-playground/pool

上面的groutine池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,所以又去看了一下go-playground/pool的源码实现,先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。

// 需要加入pool 中执行的任务
type WorkFunc func(wu WorkUnit) (interface{}, error)

// 工作单元
type workUnit struct {
    value      interface{}    // 任务结果 
    err        error          // 任务的报错
    done       chan struct{}  // 通知任务完成
    fn         WorkFunc    
    cancelled  atomic.Value   // 任务是否被取消
    cancelling atomic.Value   // 是否正在取消任务
    writing    atomic.Value   // 任务是否正在执行
}

接下来看Pool的结构

type limitedPool struct {
    workers uint            // 并发量 
    work    chan *workUnit  // 任务channel
    cancel  chan struct{}   // 用于通知结束的channel
    closed  bool            // 是否关闭
    m       sync.RWMutex    // 读写锁,主要用来保证 closed值的并发安全
}

初始化groutine池, 以及启动设定好数量的groutine

// 初始化pool,设定并发量
func NewLimited(workers uint) Pool {
    if workers == 0 {
        panic("invalid workers '0'")
    }
    p := &limitedPool{
        workers: workers,
    }
    p.initialize()
    return p
}

func (p *limitedPool) initialize() {
    p.work = make(chan *workUnit, p.workers*2)
    p.cancel = make(chan struct{})
    p.closed = false
    for i := 0; i < int(p.workers); i++ {
        // 初始化并发单元
        p.newWorker(p.work, p.cancel)
    }
}

// passing work and cancel channels to newWorker() to avoid any potential race condition
// betweeen p.work read & write
func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) {
    go func(p *limitedPool) {

        var wu *workUnit

        defer func(p *limitedPool) {
            // 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动
            if err := recover(); err != nil {

                trace := make([]byte, 1<<16)
                n := runtime.Stack(trace, true)

                s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

                iwu := wu
                iwu.err = &ErrRecovery{s: s}
                close(iwu.done)

                // need to fire up new worker to replace this one as this one is exiting
                p.newWorker(p.work, p.cancel)
            }
        }(p)

        var value interface{}
        var err error

        for {
            select {
            // workChannel中读取任务
            case wu = <-work:

                // 防止channel 被关闭后读取到零值
                if wu == nil {
                    continue
                }

                // 先判断任务是否被取消
                if wu.cancelled.Load() == nil {
                    // 执行任务
                    value, err = wu.fn(wu)
                    wu.writing.Store(struct{}{})
                    
                    // 任务执行完在写入结果时需要再次检查工作单元是否被取消,防止产生竞争条件
                    if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
                        wu.value, wu.err = value, err
                        close(wu.done)
                    }
                }
            // pool是否被停止
            case <-cancel:
                return
            }
        }

    }(p)
}

往POOL中添加任务,并检查pool是否关闭

func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {
    w := &workUnit{
        done: make(chan struct{}),
        fn:   fn,
    }

    go func() {
        p.m.RLock()
        if p.closed {
            w.err = &ErrPoolClosed{s: errClosed}
            if w.cancelled.Load() == nil {
                close(w.done)
            }
            p.m.RUnlock()
            return
        }
        // 将工作单元写入workChannel, pool启动后将由上面newWorker函数中读取执行
        p.work <- w
        p.m.RUnlock()
    }()

    return w
}

在go-playground/pool包中, limitedPool的批量并发执行还需要借助batch.go来完成

// batch contains all information for a batch run of WorkUnits
type batch struct {
    pool    Pool          // 上面的limitedPool实现了Pool interface
    m       sync.Mutex    // 互斥锁,用来判断closed
    units   []WorkUnit    // 工作单元的slice, 这个主要用在不设并发限制的场景,这里忽略
    results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取
    done    chan struct{} // 通知batch是否完成
    closed  bool
    wg      *sync.WaitGroup
}
//  go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将之前创建好的Pool传入newBatch
func newBatch(p Pool) Batch {
    return &batch{
        pool:    p,
        units:   make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.
        results: make(chan WorkUnit),
        done:    make(chan struct{}),
        wg:      new(sync.WaitGroup),
    }
}

// 往批量任务中添加workFunc任务
func (b *batch) Queue(fn WorkFunc) {

    b.m.Lock()
    if b.closed {
        b.m.Unlock()
        return
    }
    //往上述的limitPool中添加workFunc
    wu := b.pool.Queue(fn)

    b.units = append(b.units, wu) // keeping a reference for cancellation purposes
    b.wg.Add(1)
    b.m.Unlock()
    
    // 执行完后将workUnit写入结果集channel
    go func(b *batch, wu WorkUnit) {
        wu.Wait()
        b.results <- wu
        b.wg.Done()
    }(b, wu)
}

// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
func (b *batch) QueueComplete() {
    b.m.Lock()
    b.closed = true
    close(b.done)
    b.m.Unlock()
}

// 获取批量任务结果集
func (b *batch) Results() <-chan WorkUnit {
    go func(b *batch) {
        <-b.done
        b.m.Lock()
        b.wg.Wait()
        b.m.Unlock()
        close(b.results)
    }(b)
    return b.results
}

测试

func SendMail(int int) pool.WorkFunc {
    fn := func(wu pool.WorkUnit) (interface{}, error) {
        // sleep 1s 模拟发邮件过程
        time.Sleep(time.Second * 1)
        // 模拟异常任务需要取消
        if int == 17 {
            wu.Cancel()
        }
        if wu.IsCancelled() {
            return false, nil
        }
        fmt.Println("send to", int)
        return true, nil
    }
    return fn
}

func TestBatchWork(t *testing.T) {
    // 初始化groutine数量为20的pool
    p := pool.NewLimited(20)
    defer p.Close()
    batch := p.Batch()
    // 设置一个批量任务的过期超时时间
    t := time.After(10 * time.Second)
    go func() {
        for i := 0; i < 100; i++ {
            batch.Queue(SendMail(i))
        }
        batch.QueueComplete()
    }()
    // 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行
    r := batch.Results()
LOOP:
    for {
        select {
        case <-t:
        // 登台超时通知
            fmt.Println("recived timeout")
            break LOOP
     
        case email, ok := <-r:
        // 读取结果集
            if ok {
                if err := email.Error(); err != nil {
                    fmt.Println("err", err.Error())
                }
                fmt.Println(email.Value())
            } else {
                fmt.Println("finish")
                break LOOP
            }
        }
    }
}

    

图片描述
图片描述
接近理论值5s, 通知模拟被取消的work也正常取消

go-playground/pool在比起之前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。但是,但是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。所以我又去看了一下ants的设计和实现。

ants

ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各种池化技术(这个日后再开新坑去读源码), ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

先看一下ants的pool 结构体 (pool.go)

type Pool struct {
    // 协程池的容量 (groutine数量的上限)
    capacity int32
    // 正在执行中的groutine
    running int32
    // 过期清理间隔时间
    expiryDuration time.Duration
    // 当前可用空闲的groutine
    workers []*Worker
    // 表示pool是否关闭
    release int32
    // lock for synchronous operation.
    lock sync.Mutex
    // 用于控制pool等待获取可用的groutine
    cond *sync.Cond
    // 确保pool只被关闭一次
    once sync.Once
    // worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度
    workerCache sync.Pool
    // pool引发panic时的执行函数
    PanicHandler func(interface{})
}

接下来看pool的工作单元 worker (worker.go)

type Worker struct {
    // worker 所属的poo;
    pool *Pool
    // 任务队列
    task chan func()
    // 回收时间,即该worker的最后一次结束运行的时间
    recycleTime time.Time
}

执行worker的代码 (worker.go)

func (w *Worker) run() {
    // pool中正在执行的worker数+1
    w.pool.incRunning()
    go func() {
        defer func() {
            if p := recover(); p != nil {
                //若worker因各种问题引发panic, 
                //pool中正在执行的worker数 -1,         
                //如果设置了Pool中的PanicHandler,此时会被调用
                w.pool.decRunning()
                if w.pool.PanicHandler != nil {
                    w.pool.PanicHandler(p)
                } else {
                    log.Printf("worker exits from a panic: %v", p)
                }
            }
        }()
        
        // worker 执行任务队列
        for f := range w.task {
            //任务队列中的函数全部被执行完后,
            //pool中正在执行的worker数 -1, 
            //将worker 放回对象池
            if f == nil {
                w.pool.decRunning()
                w.pool.workerCache.Put(w)
                return
            }
            f()
            //worker 执行完任务后放回Pool 
            //使得其余正在阻塞的任务可以获取worker
            w.pool.revertWorker(w)
        }
    }()
}

了解了工作单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)

// 向pool提交任务
func (p *Pool) Submit(task func()) error {
    if 1 == atomic.LoadInt32(&p.release) {
        return ErrPoolClosed
    }
    // 获取pool中的可用worker并向其任务队列中写入任务
    p.retrieveWorker().task <- task
    return nil
}


// **核心代码** 获取可用worker
func (p *Pool) retrieveWorker() *Worker {
    var w *Worker

    p.lock.Lock()
    idleWorkers := p.workers
    n := len(idleWorkers) - 1
  // 当前pool中有可用worker, 取出(队尾)worker并执行
    if n >= 0 {
        w = idleWorkers[n]
        idleWorkers[n] = nil
        p.workers = idleWorkers[:n]
        p.lock.Unlock()
    } else if p.Running() < p.Cap() {
        p.lock.Unlock()
        // 当前pool中无空闲worker,且pool数量未达到上线
        // pool会先从临时对象池中寻找是否有已完成任务的worker,
        // 若临时对象池中不存在,则重新创建一个worker并将其启动
        if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
            w = cacheWorker.(*Worker)
        } else {
            w = &Worker{
                pool: p,
                task: make(chan func(), workerChanCap),
            }
        }
        w.run()
    } else {
        // pool中没有空余worker且达到并发上限
        // 任务会阻塞等待当前运行的worker完成任务释放会pool
        for {
            p.cond.Wait() // 等待通知, 暂时阻塞
            l := len(p.workers) - 1
            if l < 0 {
                continue
            }
            // 当有可用worker释放回pool之后, 取出
            w = p.workers[l]
            p.workers[l] = nil
            p.workers = p.workers[:l]
            break
        }
        p.lock.Unlock()
    }
    return w
}

// 释放worker回pool
func (p *Pool) revertWorker(worker *Worker) {
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.workers = append(p.workers, worker)
    // 通知pool中已经获取锁的groutine, 有一个worker已完成任务
    p.cond.Signal()
    p.lock.Unlock()
}

在批量并发任务的执行过程中, 如果有超过5纳秒(ants中默认worker过期时间为5ns)的worker未被分配新的任务,则将其作为过期worker清理掉,从而保证pool中可用的worker都能发挥出最大的作用以及将任务分配得更均匀
(pool.go)

// 该函数会在pool初始化后在协程中启动
func (p *Pool) periodicallyPurge() {
    // 创建一个5ns定时的心跳
    heartbeat := time.NewTicker(p.expiryDuration)
    defer heartbeat.Stop()

    for range heartbeat.C {
        currentTime := time.Now()
        p.lock.Lock()
        idleWorkers := p.workers
        if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
            p.lock.Unlock()
            return
        }
        n := -1
        for i, w := range idleWorkers {
            // 因为pool 的worker队列是先进后出的,所以正序遍历可用worker时前面的往往里当前时间越久
            if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                break
            }    
            // 如果worker最后一次运行时间距现在超过5纳秒,视为过期,worker收到nil, 执行上述worker.go中 if n == nil 的操作
            n = i
            w.task <- nil
            idleWorkers[i] = nil
        }
        if n > -1 {
            // 全部过期
            if n >= len(idleWorkers)-1 {
                p.workers = idleWorkers[:0]
            } else {
            // 部分过期
                p.workers = idleWorkers[n+1:]
            }
        }
        p.lock.Unlock()
    }
}

测试

func TestAnts(t *testing.T) {
    wg := sync.WaitGroup{}
    pool, _ := ants.NewPool(20)
    defer pool.Release()
    for i := 0; i < 100; i++ {
        wg.Add(1)
        pool.Submit(sendMail(i, &wg))
    }
    wg.Wait()
}

func sendMail(i int, wg *sync.WaitGroup) func() {
    return func() {
        time.Sleep(time.Second * 1)
        fmt.Println("send mail to ", i)
        wg.Done()
    }
}

图片描述
这里虽只简单的测试批量并发任务的场景, 如果大家有兴趣可以去看看ants的压力测试, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。

借用ants作者的原话来说:
然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情…

Over

一口气从简单到复杂总结了三个协程池的实现,受益匪浅, 感谢各开源库的作者, 虽然世界上没有龙,但是屠龙技是必须练的,因为它就像存款,不一定要全部都用了,但是一定不能没有!

查看原文

大東 关注了问题 · 2019-01-30

Unity3D开发安卓游戏问题?

最近在学习Unity3D游戏开发,我从assets store一共下载了五个资源,一个有200MB左右的场景资源,其他的大概10MB左右。
我把哪些资源简单组合之后做了个塞车类安卓游戏,编译之后apk大小80MB左右。

等我安装到手机之后发现这个游戏卡的不行,打开之后根本用不了手机。

我的游戏逻辑还没写完,很简单的,就是Ui有两个按钮才实现前后移动。

游戏为什么会这么卡的,有什么技巧还是我下载的那个200MB的资源根本不适合安卓游戏?

关注 3 回答 1

大東 回答了问题 · 2019-01-30

如何在 golang 中开一个 goroutine 循环往缓冲通道写队列,另外开 5 个固定数量 goroutine 消费队列?

了解一下sync.WaitGroup这个库。有你想要的

关注 7 回答 5

大東 关注了标签 · 2019-01-29

golang

Go语言是谷歌2009发布的第二款开源编程语言。Go语言专门针对多处理器系统应用程序的编程进行了优化,使用Go编译的程序可以媲美C或C++代码的速度,而且更加安全、支持并行进程。
Go语言是谷歌推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发Go,是因为过去10多年间软件开发的难度令人沮丧。Go是谷歌2009发布的第二款编程语言。

七牛云存储CEO许式伟出版《Go语言编程
go语言翻译项目 http://code.google.com/p/gola...
《go编程导读》 http://code.google.com/p/ac-m...
golang的官方文档 http://golang.org/doc/docs.html
golang windows上安装 http://code.google.com/p/gomi...

关注 25953

大東 评论了文章 · 2019-01-29

来,控制一下 Goroutine 的并发数量

image

原文地址:来,控制一下 Goroutine 的并发数量

问题

func main() {
    userCount := math.MaxInt64
    for i := 0; i < userCount; i++ {
        go func(i int) {
            // 做一些各种各样的业务逻辑处理
            fmt.Printf("go func: %d\n", i)
            time.Sleep(time.Second)
        }(i)
    }
}

在这里,假设 userCount 是一个外部传入的参数(不可预测,有可能值非常大),有人会全部丢进去循环。想着全部都并发 goroutine 去同时做某一件事。觉得这样子会效率会更高,对不对!

那么,你觉得这里有没有什么问题?

噩梦般的开始

当然,在特定场景下,问题可大了。因为在本文被丢进去同时并发的可是一个极端值。我们可以一起观察下图的指标分析,看看情况有多 “崩溃”。下图是上述代码的表现:

输出结果

...
go func: 5839
go func: 5840
go func: 5841
go func: 5842
go func: 5915
go func: 5524
go func: 5916
go func: 8209
go func: 8264
signal: killed

如果你自己执行过代码,在 “输出结果” 上你会遇到如下问题:

  • 系统资源占用率不断上涨
  • 输出一定数量后:控制台就不再刷新输出最新的值了
  • 信号量:signal: killed

系统负载

image

CPU

image

短时间内系统负载暴增

虚拟内存

image

短时间内占用的虚拟内存暴增

top

PID    COMMAND      %CPU  TIME     #TH   #WQ  #PORT MEM    PURG   CMPRS  PGRP  PPID  STATE    BOOSTS
...
73414  test         100.2 01:59.50 9/1   0    18    6801M+ 0B     114G+  73403 73403 running  *0[1]

小结

如果仔细看过监控工具的示意图,就可以知道其实我间隔的执行了两次,能看到系统间的使用率幅度非常大。当进程被杀掉后,整体又恢复为正常值

在这里,我们回到主题,就是在不控制并发的 goroutine 数量 会发生什么问题?大致如下:

  • CPU 使用率浮动上涨
  • Memory 占用不断上涨。也可以看看 CMPRS,它表示进程的压缩数据的字节数。已经到达 114G+ 了
  • 主进程崩溃(被杀掉了)

简单来说,“崩溃” 的原因就是对系统资源的占用过大。常见的比如:打开文件数(too many files open)、内存占用等等

危害

对该台服务器产生非常大的影响,影响自身及相关联的应用。很有可能导致不可用或响应缓慢,另外启动了复数 “失控” 的 goroutine,导致程序流转混乱

解决方案

在前面花了大量篇幅,渲染了在存在大量并发 goroutine 数量时,不控制的话会出现 “严重” 的问题,接下来一起思考下解决方案。如下:

  1. 控制/限制 goroutine 同时并发运行的数量
  2. 改变应用程序的逻辑写法(避免大规模的使用系统资源和等待)
  3. 调整服务的硬件配置、最大打开数、内存等阈值

控制 goroutine 并发数量

接下来正式的开始解决这个问题,希望你认真阅读的同时加以思考,因为这个问题在实际项目中真的是太常见了!

问题已经抛出来了,你需要做的是想想有什么办法解决这个问题。建议你自行思考一下技术方案。再接着往下看 :-)

尝试 chan

func main() {
    userCount := 10
    ch := make(chan bool, 2)
    for i := 0; i < userCount; i++ {
        ch <- true
        go Read(ch, i)
    }
    
    //time.Sleep(time.Second)
}

func Read(ch chan bool, i int) {
    fmt.Printf("go func: %d\n", i)
    <- ch
}

输出结果:

go func: 1
go func: 2
go func: 3
go func: 4
go func: 5
go func: 6
go func: 7
go func: 8
go func: 0

嗯,我们似乎很好的控制了 2 个 2 个的 “顺序” 执行多个 goroutine。但是,问题出现了。你仔细数一下输出结果,才 9 个值?

这明显就不对。原因出在当主协程结束时,子协程也是会被终止掉的。因此剩余的 goroutine 没来及把值输出,就被送上路了(不信你把 time.Sleep 打开看看,看看输出数量)

尝试 sync

...
var wg = sync.WaitGroup{}

func main() {
    userCount := 10
    for i := 0; i < userCount; i++ {
        wg.Add(1)
        go Read(i)
    }

    wg.Wait()
}

func Read(i int) {
    defer wg.Done()
    fmt.Printf("go func: %d\n", i)
}

嗯,单纯的使用 sync.WaitGroup 也不行。没有控制到同时并发的 goroutine 数量(代指达不到本文所要求的目标)

小结

单纯简单使用 channel 或 sync 都有明显缺陷,不行。我们再看看组件配合能不能实现

尝试 chan + sync

...
var wg = sync.WaitGroup{}

func main() {
    userCount := 10
    ch := make(chan bool, 2)
    for i := 0; i < userCount; i++ {
        wg.Add(1)
        go Read(ch, i)
    }

    wg.Wait()
}

func Read(ch chan bool, i int) {
    defer wg.Done()

    ch <- true
    fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
    time.Sleep(time.Second)
    <-ch
}

输出结果:

go func: 9, time: 1547911938
go func: 1, time: 1547911938
go func: 6, time: 1547911939
go func: 7, time: 1547911939
go func: 8, time: 1547911940
go func: 0, time: 1547911940
go func: 3, time: 1547911941
go func: 2, time: 1547911941
go func: 4, time: 1547911942
go func: 5, time: 1547911942

从输出结果来看,确实实现了控制 goroutine 以 2 个 2 个的数量去执行我们的 “业务逻辑”,当然结果集也理所应当的是乱序输出

方案一:简单 Semaphore

在确立了简单使用 chan + sync 的方案是可行后,我们重新将流转逻辑封装为 gsema,主程序变成如下:

import (
    "fmt"
    "time"

    "github.com/EDDYCJY/gsema"
)

var sema = gsema.NewSemaphore(3)

func main() {
    userCount := 10
    for i := 0; i < userCount; i++ {
        go Read(i)
    }

    sema.Wait()
}

func Read(i int) {
    defer sema.Done()
    sema.Add(1)

    fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
    time.Sleep(time.Second)
}

分析方案

在上述代码中,程序执行流程如下:

  • 设置允许的并发数目为 3 个
  • 循环 10 次,每次启动一个 goroutine 来执行任务
  • 每一个 goroutine 在内部利用 sema 进行调控是否阻塞
  • 按允许并发数逐渐释出 goroutine,最后结束任务

看上去人模人样,没什么严重问题。但却有一个 “大” 坑,认真看到第二点 “每次启动一个 goroutine” 这句话。这里有点问题,提前产生那么多的 goroutine 会不会有什么问题,接下来一起分析下利弊,如下:

  • 适合量不大、复杂度低的使用场景

    • 几百几千个、几十万个也是可以接受的(看具体业务场景)
    • 实际业务逻辑在运行前就已经被阻塞等待了(因为并发数受限),基本实际业务逻辑损耗的性能比 goroutine 本身大
    • goroutine 本身很轻便,仅损耗极少许的内存空间和调度。这种等待响应的情况都是躺好了,等待任务唤醒
  • Semaphore 操作复杂度低且流转简单,容易控制

  • 不适合量很大、复杂度高的使用场景

    • 有几百万、几千万个 goroutine 的话,就浪费了大量调度 goroutine 和内存空间。恰好你的服务器也接受不了的话
  • Semaphore 操作复杂度提高,要管理更多的状态

小结

  • 基于什么业务场景,就用什么方案去做事
  • 有足够的时间,允许你去追求更优秀、极致的方案(用第三方库也行)

用哪种方案,我认为主要基于以上两点去思考,都是 OK 的。没有对错,只有当前业务场景能不能接受,这个预先启动的 goroutine 数量你的系统是否能够接受

当然了,常见/简单的 Go 应用采用这类技术方案,基本就能解决问题了。因为像本文第一节 “问题” 如此超巨大数量的情况,情况很少。其并不存在那些 “特殊性”。因此用这个方案基本 OK

灵活控制 goroutine 并发数量

小手一紧。隔壁老王发现了新的问题。“方案一” 中,在输入输出一体的情况下,在常见的业务场景中确实可以

但,这次新的业务场景比较特殊,要控制输入的数量,以此达到改变允许并发运行 goroutine 的数量。我们仔细想想,要做出如下改变:

  • 输入/输出要抽离,才可以分别控制
  • 输入/输出要可变,理所应当在 for-loop 中(可设置数值的地方)
  • 允许改变 goroutine 并发数量,但它也必须有一个最大值(因为允许改变是相对)

方案二:灵活 chan + sync

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    userCount := 10
    ch := make(chan int, 5)
    for i := 0; i < userCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for d := range ch {
                fmt.Printf("go func: %d, time: %d\n", d, time.Now().Unix())
                time.Sleep(time.Second * time.Duration(d))
            }
        }()
    }

    for i := 0; i < 10; i++ {
        ch <- 1
        ch <- 2
        //time.Sleep(time.Second)
    }

    close(ch)
    wg.Wait()
}

输出结果:

...
go func: 1, time: 1547950567
go func: 3, time: 1547950567
go func: 1, time: 1547950567
go func: 2, time: 1547950567
go func: 2, time: 1547950567
go func: 3, time: 1547950567
go func: 1, time: 1547950568
go func: 2, time: 1547950568
go func: 3, time: 1547950568
go func: 1, time: 1547950568
go func: 3, time: 1547950569
go func: 2, time: 1547950569

在 “方案二” 中,我们可以随时随地的根据新的业务需求,做如下事情:

  • 变更 channel 的输入数量
  • 能够根据特殊情况,变更 channel 的循环值
  • 变更最大允许并发的 goroutine 数量

总的来说,就是可控空间都尽量放开了,是不是更加灵活了呢 :-)

方案三:第三方库

比较成熟的第三方库也不少,基本都是以生成和管理 goroutine 为目标的池工具。我简单列了几个,具体建议大家阅读下源码或者多找找,原理相似

总结

在本文的开头,我花了大力气(极端数量),告诉你同时并发过多的 goroutine 数量会导致系统占用资源不断上涨。最终该服务崩盘的极端情况。为的是希望你今后避免这种问题,给你留下深刻的印象

接下来我们以 “控制 goroutine 并发数量” 为主题,展开了一番分析。分别给出了三种方案。在我看来,各具优缺点,我建议你挑选合适自身场景的技术方案就可以了

因为,有不同类型的技术方案也能解决这个问题,千人千面。本文推荐的是较常见的解决方案,也欢迎大家在评论区继续补充 :-)

查看原文

大東 提出了问题 · 2017-02-22

C语言如何获取 调用的外部程序 标准输出的内容

这个是在网上搜索的,但还是获取不到。
那位大神给改一下,好久没写C了

int GetRun()
{
    SECURITY_ATTRIBUTES sa;
    sa.nLength = sizeof(SECURITY_ATTRIBUTES);
    sa.lpSecurityDescriptor = NULL;
    sa.bInheritHandle = TRUE;

    HANDLE outHandle;
    outHandle = CreateFile(L"aa.txt", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, &sa, CREATE_ALWAYS, 0, NULL);

    PROCESS_INFORMATION processInfo;
    STARTUPINFO startUpInfo;

    memset(&startUpInfo, 0, sizeof(STARTUPINFO));
    memset(&processInfo, 0, sizeof(PROCESS_INFORMATION));

    startUpInfo.cb = sizeof(STARTUPINFO);
    startUpInfo.dwFlags = STARTF_USESTDHANDLES;
    startUpInfo.wShowWindow = SW_SHOWNORMAL;

    startUpInfo.hStdOutput = outHandle;

    if (!CreateProcess(L"Coreinfo.exe", NULL, NULL, NULL, FALSE,
        CREATE_NEW_CONSOLE, NULL, NULL,
        &startUpInfo, &processInfo)){
        CloseHandle(outHandle);
        printf("open hello.exe error\n");
    }
    else
    {
        DWORD filesize = 1000;
        char * buffer = (char *)malloc(filesize + 1);
        memset(buffer, 0, filesize + 1);
        DWORD readsize;
        ReadFile(outHandle, buffer, filesize, &readsize, NULL);
        buffer[filesize] = 0;
        printf("1111111 - %s\n", buffer);
        free(buffer);

        WaitForSingleObject(processInfo.hProcess, INFINITE);
        printf("open hello.exe ok\n");

        CloseHandle(outHandle);

        CloseHandle(processInfo.hProcess);
        CloseHandle(processInfo.hThread);
    }

    return 0;
}

关注 4 回答 3

大東 提出了问题 · 2017-02-21

解决C语言判断电脑是否支持虚拟化

如何用C语言判断PC是否支持虚拟化,和是否开启虚拟化。


如果有第三方软件可以检测的话,我需要一个在后台检测的,不能有GUI的,谢谢朋友

关注 3 回答 2

大東 提出了问题 · 2016-12-08

七牛以送给用户的邮件,小小的建议。

最近项目里做一个邮件项目,看到七牛的邮件结构格式里的 属性 subtype=> PLAIN 但内容还是有html标签,这个应该是没有格式的。

clipboard.png

关注 1 回答 0

大東 关注了问题 · 2016-11-07

onlyoffice 中文字体(正文)版就会有乱码

clipboard.png

对默认打开宋体(正文或标题)的文档,就会出现乱码,只要是中文字体,带(正文或标题)的字体就会这样。
有大神解决过么?

关注 3 回答 1

大東 提出了问题 · 2016-11-07

onlyoffice 中文字体(正文)版就会有乱码

clipboard.png

对默认打开宋体(正文或标题)的文档,就会出现乱码,只要是中文字体,带(正文或标题)的字体就会这样。
有大神解决过么?

关注 3 回答 1

认证与成就

  • 获得 1 次点赞
  • 获得 15 枚徽章 获得 0 枚金徽章, 获得 3 枚银徽章, 获得 12 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-10-02
个人主页被 243 人浏览