惯用的 goroutine 终止和错误处理

新手上路,请多包涵

我在 go 中有一个简单的并发用例,但我想不出一个优雅的解决方案来解决我的问题。

我想编写一个方法 fetchAll 从远程服务器并行查询未指定数量的资源。如果任何提取失败,我想立即返回第一个错误。

我最初的实现泄漏了 goroutines:

     package main

    import (
      "fmt"
      "math/rand"
      "sync"
      "time"
    )

    func fetchAll() error {
      wg := sync.WaitGroup{}
      errs := make(chan error)
      leaks := make(map[int]struct{})
      defer fmt.Println("these goroutines leaked:", leaks)

      // run all the http requests in parallel
      for i := 0; i < 4; i++ {
        leaks[i] = struct{}{}
        wg.Add(1)
        go func(i int) {
          defer wg.Done()
          defer delete(leaks, i)

          // pretend this does an http request and returns an error
          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
          errs <- fmt.Errorf("goroutine %d's error returned", i)
        }(i)
      }

      // wait until all the fetches are done and close the error
      // channel so the loop below terminates
      go func() {
        wg.Wait()
        close(errs)
      }()

      // return the first error
      for err := range errs {
        if err != nil {
          return err
        }
      }

      return nil
    }

    func main() {
      fmt.Println(fetchAll())
    }

游乐场:https: //play.golang.org/p/Be93J514R5

我通过阅读 https://blog.golang.org/pipelines 知道我可以创建一个信号通道来清理其他线程。或者,我可能会使用 context 来完成它。但似乎这样一个简单的用例应该有一个我所缺少的更简单的解决方案。

原文由 gerad 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 707
2 个回答

使用 错误组 使这更简单。这会自动等待所有提供的 Go Routines 成功完成,或者在任何一个例程返回错误的情况下取消所有剩余的例程(在这种情况下,该错误是返回给调用者的一个气泡)。

 package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error {
        errs, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
                errs.Go(func() error {
                        // pretend this does an http request and returns an error
                        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                        return fmt.Errorf("error in go routine, bailing")
                })
        }

        // Wait for completion and return the first error (if any)
        return errs.Wait()
}

func main() {
        fmt.Println(fetchAll(context.Background()))
}

原文由 joth 发布,翻译遵循 CC BY-SA 4.0 许可协议

除了一个 goroutine 之外,所有 goroutine 都已泄漏,因为它们仍在等待发送到 errs 通道——你永远不会完成清空它的 for-range。您还泄漏了 goroutine 的工作是关闭错误通道,因为等待组永远不会完成。

(另外,正如 Andy 指出的那样,从 map 中删除不是线程安全的,因此需要互斥锁的保护。)

但是,我认为在这里甚至不需要映射、互斥锁、等待组、上下文等。我将重写整个内容以仅使用基本的通道操作,如下所示:

 package main

import (
    "fmt"
    "math/rand"
    "time"
)

func fetchAll() error {
    var N = 4
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)
    for i := 0; i < N; i++ {
        go func(i int) {
            // dummy fetch
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            err := error(nil)
            if rand.Intn(2) == 0 {
                err = fmt.Errorf("goroutine %d's error returned", i)
            }
            ch := done // we'll send to done if nil error and to errc otherwise
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }(i)
    }
    count := 0
    for {
        select {
        case err := <-errc:
            close(quit)
            return err
        case <-done:
            count++
            if count == N {
                return nil // got all N signals, so there was no error
            }
        }
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    fmt.Println(fetchAll())
}

游乐场链接:https: //play.golang.org/p/mxGhSYYkOb

编辑:确实有一个愚蠢的错误,感谢您指出。我修复了上面的代码(我认为……)。我还添加了一些随机性以增加 Realism™。

另外,我想强调的是,确实有多种方法可以解决这个问题,而我的解决方案只是一种方法。最终它归结为个人品味,但总的来说,您希望努力实现“惯用”代码 - 以及一种让您感觉自然且易于理解的风格。

原文由 LemurFromTheId 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题