allMap存储的是一个任务列表,KEY标记了这个任务类型,Value对应的是任务的参数, 现在我需要并发处理这些任务 。 开发过程中使用了如下两种方法,效果并不好,感觉自己没有领会到golang并发处理的思想 ; 下面是我的几点体会和疑惑,希望得到各位大神的指导。
方式一
// allMap 中存储了任务列表
// Task 定义如下
type Task struct {
Params interface{}
ResultChan chan []byte
// Wg *sync.WaitGroup
}
Params是参数,ResultChan是处理完毕之后,将结果写入到ResultChan中 ;
// 并发 处理任务
for key, value := range allMap {
go func(k string, v interface{}) {
log.Debug("k : " , k )
if k == tools.REQUEST_BAOJIE {
// A
log.Debug("baojie elem len : ", len(value))
one_task = &service.Task{
Params: v,
ResultChan: make(chan []byte, len(value)),
//Wg : new(sync.WaitGroup) ,
}
// B
log.Debugf("1 one_task : %+v ", one_task)
// AddTask函数逻辑会处理one_task,处理完毕之后,将结果写入到one_task结构体的ResultChan字段;
service.AddTask(one_task)
} else if k == tools.REQUEST {
}
}(key, value)
}
// C
log.Debugf("2 one_task : %+v ", one_task)
// 接收结果
go func() {
for item := range one_task.ResultChan {
log.Debug("Receive data From ResultChan : ", string(item))
}
log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
}()
这种方式的弊端,太依赖程序执行的先后顺序了,测试的过程中,发现当C发生在A和B之前时,会使接收结果goroutinue访问ResultChan成员发生奔溃,因为此时ResultChan还没有申请空间。
方案一解决方案:
service.AddTask(one_task) 函数再加一个参数,chan <- interface{} , AddTask处理完之后,将结果写入到这个通道里面,接收结果协程监听该通道,然后读取结果。
方式二
延迟并发时机
for k, v := range allMap {
//go func(k string, v interface{}) {
log.Debug("k : ", k)
if k == tools.REQUEST {
// A
log.Debug("baojie elem len : ", len(v))
one_task = &service.Task{
Params: v,
ResultChan: make(chan []byte, len(v)),
//Wg : new(sync.WaitGroup) ,
}
// B
log.Debugf("1 one_task : %+v ", one_task)
go service.AddTask(one_task)
} else if k == tools.REQUEST_TCP {
}
//}(key, value)
}
// C
log.Debugf("2 one_task : %+v ", one_task)
// 接收结果
go func() {
for item := range one_task.ResultChan {
log.Debug("Receive data From ResultChan : ", string(item))
}
log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
}()
这样,就保证了C必须发生在A、B之后,这样一来,ResultChan一定先初始化了,等待AddTask后面的协程往里面写入数据,接收结果协程就会读取出来。
问题1
问题来了,既然方式一存在问题,那么方式二中是否在效率上有何弊端呢 ?
我这样写并发的逻辑是否有问题 ?
问题2
这种思想是否可取
var task Task ;
// 提交任务 线程
for key , value := range allMap{
task := Task{
params : value ,
result : make(chan interface{} , len(value) ) , // value 是一个list
}
go processOneByOne(key ,value) // 这种方式是不是开启了很多协程? len(allmap)
}
// 取结果
for result := range task.result {
// get result from chann
// to do
}
``
## 问题3
计划使用一个全局的chan,processOneByOne业务函数处理完毕之后,将结果写到该chan中,然后监听这个chann,从chann中获取结果
处理流程大致:
demo.go
func TodoWork(){
go func(){
for key ,value := range allMap{
processOneByOne(key , value )
}
}()
for item := range task.ResultChan {
// 问题一、 这里如何保证item就是上面那个key value的结果,而不是其他的KEY、value对应的结果
// 问题二、 当TodoWork在多进程环境下面时,是否存在竞争问题?
println(item)
}
}
task.go
var (
ResultChan chan interface{}
)
func init(){
ResultChan = make( chan interface{} , 100 )
}
func processOneByOne( key string , value interface{} ) {
// 处理任务
// ....
// 写入结果
// 问题三、怎么关闭ResultChan , 如果不关闭,是不是goroutine泄漏问题啊 ?
ResultChan <- "Hello World"
}
### 问题描述
### 问题出现的环境背景及自己尝试过哪些方法
### 相关代码
// 请把代码文本粘贴到下方(请勿用图片代替代码)
### 你期待的结果是什么?实际看到的错误信息又是什么?
### 题目描述
### 题目来源及自己的思路
### 相关代码
// 请把代码文本粘贴到下方(请勿用图片代替代码)
### 你期待的结果是什么?实际看到的错误信息又是什么?
### 问题描述
### 问题出现的环境背景及自己尝试过哪些方法
### 相关代码
// 请把代码文本粘贴到下方(请勿用图片代替代码)
### 你期待的结果是什么?实际看到的错误信息又是什么?
如果是我,我会采用这种模式: