Airy

Airy 查看完整档案

杭州编辑河南城建学院  |  计算机科学与工程 编辑杭州河象网络科技有限公司  |  架构师 编辑 segmentfault.com/u/airy 编辑
编辑

github.com/airylinus

个人动态

Airy 发布了文章 · 2020-08-23

应用可视化探索在线教育业务中的数据

缘起

    疫情以来,越来越多的家长和学生开始主动或者被动的接受在线这种教学方式。在线教育行业迎来了一波流量增长,积累的数据也越来越多。与此同时,越来越的创业者开始进入这个行业,行业内的竞争也越来越激烈。能否高效的利用这些数据,成为一个公司能否装上涡轮发动机迎风起飞的关键因素。

一. 业务背景

在线教育的成交转化流程一般如下:用户报名 -> 上体验课 -> 付费购课 。体验课一般包含 3 - 5 节,每天 1 节。每节课程有若干个模块,模块中会包含各种形式的互动问题。问题是封闭式的,都会有正确答案,用户的每次答题结果会保存下来。

二. 数据维度和数据结构

数据维度划分

  • 用户维度

    • 家长信息,包括性别、年龄、报名时间、地域信息、报名渠道
    • 孩子信息,包括性别、年龄、用户报名之前是否购买过本公司的其他科目的课程
  • 用户课程维度

    • 是否学完某个课节、模块、问题
    • 某个问题是否答对
  • 时序维度

    • 每一道题的答题反应时间 (听完/看完题目到最终作出回答的时间)
    • 第一次答题是否正确,第二次、第三次答题是否正确(答错之后的可以重试)

三. 数据收集和整理

设计矩形数据的字段

为了方便统计,采用最常见的矩形数据来作为分析的基础。具体来说,每个用户在某个体验课下会有一行记录,每一列就是一个特征数据。针对体验课可能有 N 天,N 并不相同的情况,在需要的字段上通过添加数字编号来解决。例如:d1s ( day one state )表示第 1 天的课程完成情况,当完整上完时字段值为 2,未开始时值为 0 ,已开始未完成时是 1。依次类推。数据表共计  40 列。

采集数据

通常数据采集有推流和拉流两种方式,各自的优缺点这里不再赘叙。感兴趣的同学可以画一个损益矩阵选择自己适用的方式。这里我们采用了拉流的方式实现数据采集,每次在业务流量低峰时跑脚本。脚本会先拉取目标用户群,然后循环抽取每个用户的具体数据。在抽取具体数据时并没有大量使用数据库表 join ,也没有过多的使用 where 条件,把计算和逻辑放到本地进行离线计算。得益于 Golang 强大的 GMP 调度器,纯 CPU 计算并不会成为瓶颈。

数据清洗

  1. 每次分析时先将目标维度的空值字段所在行删除,例如分析孩子年龄时将 0 岁、大于 15 岁的行删除。使用 Pandas 的 dropna 可以很方便的实现数据清洗。
df.dropna(subset=[column_a, column_b])
  1. 删除 outlier 。使用 Pandas 删除异常值,
clean_df = df.loc[df[column_a] > column_a_bound]

其中 column_a_bound 可以通过 4 分位数、方差、均值等得出。选择合适的值即可;

可视化探索

  1. 单维度数据的探索

     通常性别是一个单维度的数据,适合用饼图来表示:

    C5F8951C-37F5-4473-8F6B-BB327322CEBC.png

对应的代码:

plt.figure(figsize=(5,5))
vc = df[‘gender'].value_counts()
y = [vc.values[0], vc.values[1]]
patches,l_text,p_text = plt.pie(y,labels=['女', '男'],
                                explode=(0.1,0),
                                colors=['pink','yellowgreen'],
                                labeldistance = 1.1,
                                autopct = '%3.1f%%',
                                shadow = True,
                                startangle = 90,
                                pctdistance = 0.6)
for t in l_text:
    t.set_size=(30)
for t in p_text:
    t.set_size=(20)
# 设置x,y轴刻度一致,这样饼图才能是圆的
plt.axis('equal')
plt.legend()
plt.show()

类似可以探索一下不同性别在样本用户中的比例、在成交用户中的比例。体现多个比例数据适合矩状图。

C0CC1C5F-2A32-40D6-A151-F743120673D3.png

代码如下:

df_gender_deal = df.loc[df['deal'] == 1]
gender_rate = df['gender'].value_counts()
gender_deal_rate = df_gender_deal['gender'].value_counts()
gender_dist = pd.DataFrame({'all': gender_rate, 'deal': gender_deal_rate})
gender_dist['all_rate'] = gender_dist['all'].apply(lambda x: x / gender_dist.shape[0] * 100)
gender_dist['deal_rate'] = gender_dist['deal'].apply(lambda x: x / df_gender_deal.shape[0] * 100)
x = gender_dist.index
bar_width = 0.35
plt.figure(figsize=(10,5))
plt.bar(x, gender_dist['all_rate'], bar_width, align="center", color="c", label="样本占比", alpha=0.5)
plt.bar(x+bar_width, gender_dist['deal_rate'], bar_width, color="b", align="center", label="成交占比", alpha=0.5)
plt.xlabel("Gender")
plt.ylabel("Percentage")
plt.xticks(x+bar_width/2, x)
plt.legend()
plt.show()

从对比中可以提出假设,男性用户在成交中的比例上升了。上升的比例并不明显,所以我们可以认为性别对成交没有明显的影响。

  1. 多维度联合探索

单一维度提供的信息有限,有些数据需要跟时间维度一起观察变化趋势。例如:为了衡量第二天的课程有多少人能够坚持到课,我们需要对比第一天的到课用户数占比和第二天的到课用户占比。

49F44CBA-30F8-4061-ABD0-719FD2EA5A75.png

其中完课比是到课用户中完成所有课程内容的用户占比,这个数字可以衡量有课程内容和教学环节是否存在问题。上图中可以得出结论:第一天课程的完课比偏低,需要排查原因。

折线图很适合做这种趋势变化的体现,多条折线叠加到同一个图上能够更直观的表现数据。

def course_brief(df):
    active_rate = []
    finish_rate = []
    finish_at_active = []
    for i in range(4):
        vc = df['d{}'.format(i+1)].value_counts()
        n = sum(vc.values)
        active_rate.append((vc[1]+vc[2])/n * 100)
        finish_rate.append((vc[2])/n * 100)
        finish_at_active.append(vc[2]/(vc[1]+vc[2]) * 100)
    plt.figure(figsize=(10, 5))
    dayx = ['Day_1', 'Day_2', 'Day_3', 'Day_4']
    plt.plot(dayx, active_rate, color="r", label="到课率", linestyle='solid', alpha=0.5)
    plt.plot(dayx, finish_rate, color="b", label="完课率", alpha=0.5, linestyle='dotted')
    plt.plot(dayx, finish_at_active, color="g", label="完课比", alpha=0.5, linestyle='dashed')
    plt.legend()

course_brief(df)  
  1. 降维和特征构造

还有一些信息必须用多维数据表现,可视化效果并不好,我们需要构造特征进行降维。例如:我们想验证封闭式问题的答对率的变化趋势是否和成交有关联,也就是说我们想知道课堂表现越来越好的人更愿意成交还是越来越差的人更愿意成交。

基于原始数据抽象出“首次答对率” 的特征,统计每节课的所有首次答题记录中答对的题目数,该数目在回答过的题目中的比例即视为 “首次答对率”。计算出每节课的首次答对率,得到下列数据:

为了衡量这个数字的变化趋势,我们可以构造两列新的数据,startFR 和 endFR。其中 startFR = (d1fr + d2fr + d3fr) / 3,endFR = (d3fr + d4fr + d5fr) / 3。通过 pandas 可以很容易计算出两列新特征:

df['startFR'] = (df['d1fr'] + df['d2fr'] + df['d3fr']) / 3
df['endFR'] = (df['d3fr'] + df['d4fr'] + df['d5fr']) / 3

成功的将 5 维数据降到 2 维,但是还不够,还可以继续。

df['upRate'] = (df['startFR'] - df['endFR']) * 100 / df['startFR']

这里构造了一个新特征 upRate 来表示 “首次答对率” 的变化趋势,为正时表示 “首次答对率”处于上升趋势,为负时表示下降。数值表示变化的斜率,数字越大表示变化越明显。结合其他数据列,可以画出下列图来探索数据:

8082BE4E-CDFE-4884-A768-59CCB9860397.png

上图中灰色的 x 点表示未成交用户,蓝色圆点表示成交用户。X 轴为 upRate,Y 轴为第一节课的首次答对率。虚线为中位数所在的位置。散点图适合探索数据分布,观察数据的关联性。

df_deal = df.loc[df['deal']==1]
df_none = df.loc[df['deal']==0]
plt.scatter(df_deal['upRate'], df_deal['av1'], marker = 'o',color = 'blue', label = 'Deal')
plt.scatter(df_none['upRate'], df_none['av1'], marker = 'x',color = 'gray', label = 'Deal')
plt.axhline(y=df['av1'].median(),ls=":",c="black")#添加水平直线
plt.axvline(x=df['upRate'].median(),ls=":",c="green")#添加垂直直线
plt.show()

四. 总结和问题

  1. 数据可视化可以更直观的表现出数据趋势、挖掘数据的价值;
  2. 精细化运营需要从业务的各种环境开始收集数据;例如上面的数据中缺乏有效的数据来表现客户的购买力。

五. 展望

可以从一下几个方向深入的挖掘和使用数据,让数据的价值发挥出来。

产品端:
  1. 数据可视化是精细化运营的基础,通过可视化数据大盘可以及时发现产品设计、内容、运营策略上存在的问题
  2. 可视化数据的对比,可以为策略调整提供决策信息;
  3. 配合数据基线进行数据可视化,可以验证产品方案调整的效果;
销售端:
  1. 依托数据进行机器学习建模,实现高潜客户的预测和识别,提高销售人员的人效;
  2. 课程数据的挖掘可以给销售提供钩子信息、促成用户跟销售的对话;
用户端:
  1. 可以依托数据和算法进行效果外化
  2. 提供个性化的难度曲线、课程内容、学习路径
查看原文

赞 1 收藏 1 评论 0

Airy 回答了问题 · 2020-06-04

Go 频繁的进行断言以及通过接口调用方法有性能消耗吗?

简单粗暴的结论:

断言不会消耗多少性能,可以忽略。通过接口调用,不在代码中显示使用断言会有额外的性能消耗,并且消耗较大。

原因:
接口调用的代码无法在编译器确定参数类型,因此编译器无法进行内联优化。详细见:
https://mp.weixin.qq.com/s/1n...

关注 3 回答 2

Airy 关注了用户 · 2020-05-11

大彬 @lessisbetter

公众号:Go语言充电站
二维码:https://segmentfault.com/img/...

关注 879

Airy 赞了文章 · 2020-05-11

一道经常考的面试题

前段时间在找工作,也遇到一些不错的面试题,其中有一道很常见,记录一下,里面还有一点搞不明白的:

下面两段程序的输出是什么?

第一段:

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            fmt.Println(i)
            wg.Done()
        }()
    }
    wg.Wait()
}

第二段:

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            fmt.Println(n)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

很多面试题解析里面说第一段的10个goroutine输出全部是10,我对这个结论是一直持怀疑态度的,因为输出什么,取决于那个goroutine里面代码被执行时外层i循环到哪里,经我实测,也符合我自己的想法。

clipboard.png

但是那天那个面试官很肯定的说会全部输出一样的数,我忘记问他的理由是什么了。

关于第二段程序,乱序输出0-9,相信大家是没有异议的。总计有10^10种可能。针对第二段程序,那位面试官接着问了一个我觉得挺有水平的问题:这10^10种输出里面,肯定有一种是按顺序0-9依次输出的,能不能通过一些方法,让这段程序的输出顺序固定下来?这个问题我一时还真的抓不到要点了。。。后来在面试官不断的提点下,我才想到面试官的考点,不禁觉得这个面试官还是很有水平的。
第二段程序如何改动才能达到定序输出的效果呢?我们知道每个goroutine生成后,在P的本地G队列未满的时候,是依次加入到P的本地G队列里的,如果只有一个P可用,也就只有一个本地G队列存在,那么这些G的执行顺序其实是取决于P的G队列的顺序的,那么答案也就出来了,我们只要设置P的数量为1,即可达到定序输出的目的:

func main() {
    runtime.GOMAXPROCS(1)
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            fmt.Println(n)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

clipboard.png
不过这里我还是有一点不明白的是,9为什么是第一个被输出的?我猜大概是跟GMP调度有关的。目前还不明白,有知道的同学可以指点我一下,谢谢。

以上如有错误,欢迎指出。

查看原文

赞 2 收藏 1 评论 0

Airy 发布了文章 · 2020-05-03

使用 gitlab 实现 proto 文件的 semantic version 管理(2) - 配置篇

最终目标:

  1. 所有 proto 文件的改动都体现在版本号中;
  2. 开发者不需要手动编译 proto 文件;
  3. 同一个版本号在各个语言中是通用的;

配置方案:

  1. 配置 gitlab CI,实现 merge request 通过之后自动打包并生成版本号;
  2. 每个版本号生成之后自动生成对应语言的接口定义和 message 文件;
  3. 配置 Dockerfile, 在生产环境构建时使用对应版本号的包;

操作步骤:

1. 配置 gitlab CI runner

         gitlab 的 CI  runner 可以自行查找官方文档配置。配置完成之后在 proto 文件仓库中添加 `.gitlab-ci.yaml` 文件,大概内容如下:

image.png

整个 ci 过程就是使用 proto 文件生成各种语言的代码文件,然后打 tag 并 push 到仓库中。我使用了下面的代码来生成版本号,保证十年内不会重复:


year=$(date +"%y" --date="+1 year" | sed "s/2//1")
v="v1.\${year}.\$CI\_JOB\_ID”
git tag $v
git push release $v

其中 `\$D_PRIVATE_KEY` 、`$D_PUB_KEY` 是生成的 rsa key pair ,专门用来构建。在这里配置:

image.png

同时我们需要调用 gitlab 的 api 发布 release。其中 release 的 API 可以参见官网,为了方便调用我用 Go 写了一个 cmd 编译好二进制文件,然后直接调用。代码在: https://github.com/airylinus/releaser 。生成 release 的目地是为了保证 repeatable build, git tag 是可以删除的但是 release 不可以。 最终效果如下

image.png

image.png

2. 配置 Go 应用的 Dockerfile,使用生成的包

因为 Go 的 1.13 以后版本支持了私有仓库配置,切 gitlab 支持 go get。所以我们只要 release 了,就可以保证这个版本的代码是一直可用的。唯一需要配置的就是 build 的时候需要 gitlab 访问权限。这里使用了粗暴的方式配置。

image.png

3. 配置和 deploy maven 包

需要一些 maven 的配置文件,可以事先做好模板放到仓库中

mv ./protoc-gen-grpc-java-1.22.3-linux-x86_64.exe ./grpc-java.bin

chmod +x ./protoc ./protoc-gen-go ./grpc-java.bin

protoc -I protos/ -I ./ --go_out=plugins=grpc:../protos/*.proto

echo -n "Build for Java"

rm -rf ./${project}

mkdir -p ./${project}/src/main/java

protoc -I protos/ --plugin=protoc-gen-grpc-java=./grpc-java.bin --grpc-java_out=./${project}/src/main/java --java_out=./${project}/src/main/java protos/*.proto

sed "s/__VERSION__/${v}/g" ./tools/template-pom.xml | sed "s/__PROJECT__/${project}/g" > ./${project}/pom.xml

mkdir ~/.m2/

rm -rf /usr/local/maven3/conf/settings.xml

cp ./tools/settings.xml /usr/local/maven3/conf/

cd ./${project}

mvn compile package deploy -e

至此所有链路都配置好了,后面按照规范使用即可。

查看原文

赞 0 收藏 0 评论 2

Airy 发布了文章 · 2020-05-02

使用 gitlab 实现 proto 文件的 semantic version 管理(1) - 使用规范

前言

历史原因我司的后端团队在同时使用 3 种语言:Python、Go、Java。为了实现团队的水平拆分和业务逻辑的收敛,我们会在不同的业务线间使用 unary 模式的 gRPC 来进行同步通信。 例如报名业务中会存在一个查询某个用户所有报名记录的接口,这个接口是 Python 语言开发的,在不同的业务场景上 Go、Java 都会调用这个接口。这就导致该接口的 protobuf 文件由 Python 开发者维护,同时 Go 和 Java 的 client 需要同步变更以使用最新版本的接口。一旦接口需要升级,协调 server 端有发布和升级,proto 文件仓库的变更,client 端变更发布就变成了一个很麻烦的事情。我们需要一个方便透明的规范来协调各端。

一. 解决方案对比和选择

1. 可选方案

A. 在不同的 server / client 代码项目中复制 proto 文件并各自生成对应语言的包;
B. 使用 git submodule 使用同一个仓库的 proto 文件
C. 使用 semantic version 并使用第三方的方式管理 proto 文件

2. 损益分析

截屏2020-05-02 下午5.55.10.png

二. 如何创建新的 Proto 仓库

1. 创建新的 Proto 仓库

  • 每个后端项目对应新建一个 proto 仓库,proto 文件不再跟项目代码放到一起。
  • proto 三方仓库分 dev/master 两个分支;
  • master 分支只能通过提 merge request 的方式提交新代码
  • 所有 release 都从 master 打 tag 生成

2. Proto 语言文件的规范

  • proto 文件遵循只增不减的原则
  • proto 文件中的接口遵循只增不减的原则
  • proto 文件中的 message 字段遵循只增不减的原则
  • proto 文件中的 message 字段类型和序号不得修改

四. Proto 文件仓库的使用流程

1. Proto 文件仓库的修改

  • 从 master 检出分支,feature/abc123,添加字段或者接口然后合并到 dev
  • 从 dev 提 merge request 到 master 分支
  • 合并 merge 之后稍等 3 分钟(gitlab CI 会自动生成各种语言的包并发布)就会得到一个版本号: v1.1.3243 (去 git.mycompany.com/proto/{project}/-/releases 查看发布的的版本)项目 、版本

2. 在各个语言的包管理工具中使用对应的 version 引用生成的 proto 文件

  • 在 Go 项目中修改 go mod ,添加第三方依赖并指定版本
import (
    gitlab.company.com/proto/abc v1.1.3243
)
  • Python 使用 proto 项目对应版本
pip install https://gitlab.mycompany.com/proto/abc@v1.1.3243
  • Java 直接使用私有 maven 仓库中的包和对应版本

3. Go mod 中 proto 包的 import 和使用

  • 配置 git config
git config --global url."git@git.mycompany.com".instead of "http://git.mycompany.com"
git config --global url."git@git.mycompany.com".instead of "https://git.mycompany.com"
  • 设置 Go 环境  

    • 升级 go 版本到 1.13 或以上版本
    • 配置环境变量:
    GOPROXY="https://goproxy.io,direct"
                
    GOPRIVATE="git.mycompany.com"
    • 使用 go build -v  可以方便的排查和定位问题

----------- EOF ----------
转载请注明原链接,谢谢。

相关文章:
使用 gitlab 实现 proto 文件的 semantic version 管理(2) - 配置篇

查看原文

赞 3 收藏 1 评论 0

Airy 发布了文章 · 2020-04-18

超简单的算法:千位符号添加

题目:

给一个整数,每三位添加一个逗号展示。例如:1234 输出: 1,234。

答案:

  1. 取模 1000,得后三位,依次取模。得到三位数字为元素的数组
  2. 拼接上面的数组,添加符号打印。

Javascript 实现:

var a = 1234567;
function formateNumber(integer) {
    var p = integer;
    var width = 1000;
    var odds = [];
    while(p > 0) {
        odds.push(p % width);
        p = parseInt(p / width);
    }
    return odds.reverse().join(',')
}
console.log(formateNumber(a))
查看原文

赞 3 收藏 2 评论 4

Airy 收藏了文章 · 2020-04-15

golang协程池设计

Why Pool

go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量

for {
        // 监听tcp
        rw, e := l.Accept()
        if e != nil {
            .......
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        // 启动协程处理上下文
        go c.serve(ctx)
}

虽然创建一个groutine占用的内存极小(大约2KB左右,线程通常2M左右),但是在实际生产环境无限制的开启协程显然是不科学的,比如上图的逻辑,如果来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终导致内存溢出乃至严重的崩溃,所以本文将通过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。

一个简单的协程池

过年前做过一波小需求,是将主播管理系统中信息不完整的主播找出来然后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每一个主播的数据都要访问一次直播平台所以就用应对每一个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能造成groutine性能瓶颈的地步,但是心里总是不舒服,于是放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实非常简单,用一个channel当做任务队列,初始化groutine池时确定好并发量,然后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型如下图
图片描述

实现

type SimplePool struct {
    wg   sync.WaitGroup
    work chan func() //任务队列
}

func NewSimplePoll(workers int) *SimplePool {
    p := &SimplePool{
        wg:   sync.WaitGroup{},
        work: make(chan func()),
    }
    p.wg.Add(workers)
    //根据指定的并发量去读取管道并执行
    for i := 0; i < workers; i++ {
        go func() {
            defer func() {
                // 捕获异常 防止waitGroup阻塞
                if err := recover(); err != nil {
                    fmt.Println(err)
                    p.wg.Done()
                }
            }()
            // 从workChannel中取出任务执行
            for fn := range p.work {
                fn()
            }
            p.wg.Done()
        }()
    }
    return p
}
// 添加任务
func (p *SimplePool) Add(fn func()) {
    p.work <- fn
}

// 执行
func (p *SimplePool) Run() {
    close(p.work)
    p.wg.Wait()
}

测试

测试设定为在并发数量为20的协程池中并发抓取一百个人的信息, 因为代码包含较多业务逻辑所以sleep 1秒模拟爬虫过程,理论上执行时间为5秒

func TestSimplePool(t *testing.T) {
    p := NewSimplePoll(20)
    for i := 0; i < 100; i++ {
        p.Add(parseTask(i))
    }
    p.Run()
}

func parseTask(i int) func() {
    return func() {
        // 模拟抓取数据的过程
        time.Sleep(time.Second * 1)
        fmt.Println("finish parse ", i)
    }
}

图片描述

这样一来最简单的一个groutine池就完成了

go-playground/pool

上面的groutine池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,所以又去看了一下go-playground/pool的源码实现,先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。

// 需要加入pool 中执行的任务
type WorkFunc func(wu WorkUnit) (interface{}, error)

// 工作单元
type workUnit struct {
    value      interface{}    // 任务结果 
    err        error          // 任务的报错
    done       chan struct{}  // 通知任务完成
    fn         WorkFunc    
    cancelled  atomic.Value   // 任务是否被取消
    cancelling atomic.Value   // 是否正在取消任务
    writing    atomic.Value   // 任务是否正在执行
}

接下来看Pool的结构

type limitedPool struct {
    workers uint            // 并发量 
    work    chan *workUnit  // 任务channel
    cancel  chan struct{}   // 用于通知结束的channel
    closed  bool            // 是否关闭
    m       sync.RWMutex    // 读写锁,主要用来保证 closed值的并发安全
}

初始化groutine池, 以及启动设定好数量的groutine

// 初始化pool,设定并发量
func NewLimited(workers uint) Pool {
    if workers == 0 {
        panic("invalid workers '0'")
    }
    p := &limitedPool{
        workers: workers,
    }
    p.initialize()
    return p
}

func (p *limitedPool) initialize() {
    p.work = make(chan *workUnit, p.workers*2)
    p.cancel = make(chan struct{})
    p.closed = false
    for i := 0; i < int(p.workers); i++ {
        // 初始化并发单元
        p.newWorker(p.work, p.cancel)
    }
}

// passing work and cancel channels to newWorker() to avoid any potential race condition
// betweeen p.work read & write
func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) {
    go func(p *limitedPool) {

        var wu *workUnit

        defer func(p *limitedPool) {
            // 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动
            if err := recover(); err != nil {

                trace := make([]byte, 1<<16)
                n := runtime.Stack(trace, true)

                s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

                iwu := wu
                iwu.err = &ErrRecovery{s: s}
                close(iwu.done)

                // need to fire up new worker to replace this one as this one is exiting
                p.newWorker(p.work, p.cancel)
            }
        }(p)

        var value interface{}
        var err error

        for {
            select {
            // workChannel中读取任务
            case wu = <-work:

                // 防止channel 被关闭后读取到零值
                if wu == nil {
                    continue
                }

                // 先判断任务是否被取消
                if wu.cancelled.Load() == nil {
                    // 执行任务
                    value, err = wu.fn(wu)
                    wu.writing.Store(struct{}{})
                    
                    // 任务执行完在写入结果时需要再次检查工作单元是否被取消,防止产生竞争条件
                    if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
                        wu.value, wu.err = value, err
                        close(wu.done)
                    }
                }
            // pool是否被停止
            case <-cancel:
                return
            }
        }

    }(p)
}

往POOL中添加任务,并检查pool是否关闭

func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {
    w := &workUnit{
        done: make(chan struct{}),
        fn:   fn,
    }

    go func() {
        p.m.RLock()
        if p.closed {
            w.err = &ErrPoolClosed{s: errClosed}
            if w.cancelled.Load() == nil {
                close(w.done)
            }
            p.m.RUnlock()
            return
        }
        // 将工作单元写入workChannel, pool启动后将由上面newWorker函数中读取执行
        p.work <- w
        p.m.RUnlock()
    }()

    return w
}

在go-playground/pool包中, limitedPool的批量并发执行还需要借助batch.go来完成

// batch contains all information for a batch run of WorkUnits
type batch struct {
    pool    Pool          // 上面的limitedPool实现了Pool interface
    m       sync.Mutex    // 互斥锁,用来判断closed
    units   []WorkUnit    // 工作单元的slice, 这个主要用在不设并发限制的场景,这里忽略
    results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取
    done    chan struct{} // 通知batch是否完成
    closed  bool
    wg      *sync.WaitGroup
}
//  go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将之前创建好的Pool传入newBatch
func newBatch(p Pool) Batch {
    return &batch{
        pool:    p,
        units:   make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.
        results: make(chan WorkUnit),
        done:    make(chan struct{}),
        wg:      new(sync.WaitGroup),
    }
}

// 往批量任务中添加workFunc任务
func (b *batch) Queue(fn WorkFunc) {

    b.m.Lock()
    if b.closed {
        b.m.Unlock()
        return
    }
    //往上述的limitPool中添加workFunc
    wu := b.pool.Queue(fn)

    b.units = append(b.units, wu) // keeping a reference for cancellation purposes
    b.wg.Add(1)
    b.m.Unlock()
    
    // 执行完后将workUnit写入结果集channel
    go func(b *batch, wu WorkUnit) {
        wu.Wait()
        b.results <- wu
        b.wg.Done()
    }(b, wu)
}

// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
func (b *batch) QueueComplete() {
    b.m.Lock()
    b.closed = true
    close(b.done)
    b.m.Unlock()
}

// 获取批量任务结果集
func (b *batch) Results() <-chan WorkUnit {
    go func(b *batch) {
        <-b.done
        b.m.Lock()
        b.wg.Wait()
        b.m.Unlock()
        close(b.results)
    }(b)
    return b.results
}

测试

func SendMail(int int) pool.WorkFunc {
    fn := func(wu pool.WorkUnit) (interface{}, error) {
        // sleep 1s 模拟发邮件过程
        time.Sleep(time.Second * 1)
        // 模拟异常任务需要取消
        if int == 17 {
            wu.Cancel()
        }
        if wu.IsCancelled() {
            return false, nil
        }
        fmt.Println("send to", int)
        return true, nil
    }
    return fn
}

func TestBatchWork(t *testing.T) {
    // 初始化groutine数量为20的pool
    p := pool.NewLimited(20)
    defer p.Close()
    batch := p.Batch()
    // 设置一个批量任务的过期超时时间
    t := time.After(10 * time.Second)
    go func() {
        for i := 0; i < 100; i++ {
            batch.Queue(SendMail(i))
        }
        batch.QueueComplete()
    }()
    // 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行
    r := batch.Results()
LOOP:
    for {
        select {
        case <-t:
        // 登台超时通知
            fmt.Println("recived timeout")
            break LOOP
     
        case email, ok := <-r:
        // 读取结果集
            if ok {
                if err := email.Error(); err != nil {
                    fmt.Println("err", err.Error())
                }
                fmt.Println(email.Value())
            } else {
                fmt.Println("finish")
                break LOOP
            }
        }
    }
}

    

图片描述
图片描述
接近理论值5s, 通知模拟被取消的work也正常取消

go-playground/pool在比起之前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。但是,但是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。所以我又去看了一下ants的设计和实现。

ants

ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各种池化技术(这个日后再开新坑去读源码), ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

先看一下ants的pool 结构体 (pool.go)

type Pool struct {
    // 协程池的容量 (groutine数量的上限)
    capacity int32
    // 正在执行中的groutine
    running int32
    // 过期清理间隔时间
    expiryDuration time.Duration
    // 当前可用空闲的groutine
    workers []*Worker
    // 表示pool是否关闭
    release int32
    // lock for synchronous operation.
    lock sync.Mutex
    // 用于控制pool等待获取可用的groutine
    cond *sync.Cond
    // 确保pool只被关闭一次
    once sync.Once
    // worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度
    workerCache sync.Pool
    // pool引发panic时的执行函数
    PanicHandler func(interface{})
}

接下来看pool的工作单元 worker (worker.go)

type Worker struct {
    // worker 所属的poo;
    pool *Pool
    // 任务队列
    task chan func()
    // 回收时间,即该worker的最后一次结束运行的时间
    recycleTime time.Time
}

执行worker的代码 (worker.go)

func (w *Worker) run() {
    // pool中正在执行的worker数+1
    w.pool.incRunning()
    go func() {
        defer func() {
            if p := recover(); p != nil {
                //若worker因各种问题引发panic, 
                //pool中正在执行的worker数 -1,         
                //如果设置了Pool中的PanicHandler,此时会被调用
                w.pool.decRunning()
                if w.pool.PanicHandler != nil {
                    w.pool.PanicHandler(p)
                } else {
                    log.Printf("worker exits from a panic: %v", p)
                }
            }
        }()
        
        // worker 执行任务队列
        for f := range w.task {
            //任务队列中的函数全部被执行完后,
            //pool中正在执行的worker数 -1, 
            //将worker 放回对象池
            if f == nil {
                w.pool.decRunning()
                w.pool.workerCache.Put(w)
                return
            }
            f()
            //worker 执行完任务后放回Pool 
            //使得其余正在阻塞的任务可以获取worker
            w.pool.revertWorker(w)
        }
    }()
}

了解了工作单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)

// 向pool提交任务
func (p *Pool) Submit(task func()) error {
    if 1 == atomic.LoadInt32(&p.release) {
        return ErrPoolClosed
    }
    // 获取pool中的可用worker并向其任务队列中写入任务
    p.retrieveWorker().task <- task
    return nil
}


// **核心代码** 获取可用worker
func (p *Pool) retrieveWorker() *Worker {
    var w *Worker

    p.lock.Lock()
    idleWorkers := p.workers
    n := len(idleWorkers) - 1
  // 当前pool中有可用worker, 取出(队尾)worker并执行
    if n >= 0 {
        w = idleWorkers[n]
        idleWorkers[n] = nil
        p.workers = idleWorkers[:n]
        p.lock.Unlock()
    } else if p.Running() < p.Cap() {
        p.lock.Unlock()
        // 当前pool中无空闲worker,且pool数量未达到上线
        // pool会先从临时对象池中寻找是否有已完成任务的worker,
        // 若临时对象池中不存在,则重新创建一个worker并将其启动
        if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
            w = cacheWorker.(*Worker)
        } else {
            w = &Worker{
                pool: p,
                task: make(chan func(), workerChanCap),
            }
        }
        w.run()
    } else {
        // pool中没有空余worker且达到并发上限
        // 任务会阻塞等待当前运行的worker完成任务释放会pool
        for {
            p.cond.Wait() // 等待通知, 暂时阻塞
            l := len(p.workers) - 1
            if l < 0 {
                continue
            }
            // 当有可用worker释放回pool之后, 取出
            w = p.workers[l]
            p.workers[l] = nil
            p.workers = p.workers[:l]
            break
        }
        p.lock.Unlock()
    }
    return w
}

// 释放worker回pool
func (p *Pool) revertWorker(worker *Worker) {
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.workers = append(p.workers, worker)
    // 通知pool中已经获取锁的groutine, 有一个worker已完成任务
    p.cond.Signal()
    p.lock.Unlock()
}

在批量并发任务的执行过程中, 如果有超过5纳秒(ants中默认worker过期时间为5ns)的worker未被分配新的任务,则将其作为过期worker清理掉,从而保证pool中可用的worker都能发挥出最大的作用以及将任务分配得更均匀
(pool.go)

// 该函数会在pool初始化后在协程中启动
func (p *Pool) periodicallyPurge() {
    // 创建一个5ns定时的心跳
    heartbeat := time.NewTicker(p.expiryDuration)
    defer heartbeat.Stop()

    for range heartbeat.C {
        currentTime := time.Now()
        p.lock.Lock()
        idleWorkers := p.workers
        if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
            p.lock.Unlock()
            return
        }
        n := -1
        for i, w := range idleWorkers {
            // 因为pool 的worker队列是先进后出的,所以正序遍历可用worker时前面的往往里当前时间越久
            if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                break
            }    
            // 如果worker最后一次运行时间距现在超过5纳秒,视为过期,worker收到nil, 执行上述worker.go中 if n == nil 的操作
            n = i
            w.task <- nil
            idleWorkers[i] = nil
        }
        if n > -1 {
            // 全部过期
            if n >= len(idleWorkers)-1 {
                p.workers = idleWorkers[:0]
            } else {
            // 部分过期
                p.workers = idleWorkers[n+1:]
            }
        }
        p.lock.Unlock()
    }
}

测试

func TestAnts(t *testing.T) {
    wg := sync.WaitGroup{}
    pool, _ := ants.NewPool(20)
    defer pool.Release()
    for i := 0; i < 100; i++ {
        wg.Add(1)
        pool.Submit(sendMail(i, &wg))
    }
    wg.Wait()
}

func sendMail(i int, wg *sync.WaitGroup) func() {
    return func() {
        time.Sleep(time.Second * 1)
        fmt.Println("send mail to ", i)
        wg.Done()
    }
}

图片描述
这里虽只简单的测试批量并发任务的场景, 如果大家有兴趣可以去看看ants的压力测试, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。

借用ants作者的原话来说:
然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情…

Over

一口气从简单到复杂总结了三个协程池的实现,受益匪浅, 感谢各开源库的作者, 虽然世界上没有龙,但是屠龙技是必须练的,因为它就像存款,不一定要全部都用了,但是一定不能没有!

查看原文

Airy 赞了文章 · 2020-04-15

golang协程池设计

Why Pool

go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量

for {
        // 监听tcp
        rw, e := l.Accept()
        if e != nil {
            .......
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        // 启动协程处理上下文
        go c.serve(ctx)
}

虽然创建一个groutine占用的内存极小(大约2KB左右,线程通常2M左右),但是在实际生产环境无限制的开启协程显然是不科学的,比如上图的逻辑,如果来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终导致内存溢出乃至严重的崩溃,所以本文将通过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。

一个简单的协程池

过年前做过一波小需求,是将主播管理系统中信息不完整的主播找出来然后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每一个主播的数据都要访问一次直播平台所以就用应对每一个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能造成groutine性能瓶颈的地步,但是心里总是不舒服,于是放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实非常简单,用一个channel当做任务队列,初始化groutine池时确定好并发量,然后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型如下图
图片描述

实现

type SimplePool struct {
    wg   sync.WaitGroup
    work chan func() //任务队列
}

func NewSimplePoll(workers int) *SimplePool {
    p := &SimplePool{
        wg:   sync.WaitGroup{},
        work: make(chan func()),
    }
    p.wg.Add(workers)
    //根据指定的并发量去读取管道并执行
    for i := 0; i < workers; i++ {
        go func() {
            defer func() {
                // 捕获异常 防止waitGroup阻塞
                if err := recover(); err != nil {
                    fmt.Println(err)
                    p.wg.Done()
                }
            }()
            // 从workChannel中取出任务执行
            for fn := range p.work {
                fn()
            }
            p.wg.Done()
        }()
    }
    return p
}
// 添加任务
func (p *SimplePool) Add(fn func()) {
    p.work <- fn
}

// 执行
func (p *SimplePool) Run() {
    close(p.work)
    p.wg.Wait()
}

测试

测试设定为在并发数量为20的协程池中并发抓取一百个人的信息, 因为代码包含较多业务逻辑所以sleep 1秒模拟爬虫过程,理论上执行时间为5秒

func TestSimplePool(t *testing.T) {
    p := NewSimplePoll(20)
    for i := 0; i < 100; i++ {
        p.Add(parseTask(i))
    }
    p.Run()
}

func parseTask(i int) func() {
    return func() {
        // 模拟抓取数据的过程
        time.Sleep(time.Second * 1)
        fmt.Println("finish parse ", i)
    }
}

图片描述

这样一来最简单的一个groutine池就完成了

go-playground/pool

上面的groutine池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,所以又去看了一下go-playground/pool的源码实现,先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。

// 需要加入pool 中执行的任务
type WorkFunc func(wu WorkUnit) (interface{}, error)

// 工作单元
type workUnit struct {
    value      interface{}    // 任务结果 
    err        error          // 任务的报错
    done       chan struct{}  // 通知任务完成
    fn         WorkFunc    
    cancelled  atomic.Value   // 任务是否被取消
    cancelling atomic.Value   // 是否正在取消任务
    writing    atomic.Value   // 任务是否正在执行
}

接下来看Pool的结构

type limitedPool struct {
    workers uint            // 并发量 
    work    chan *workUnit  // 任务channel
    cancel  chan struct{}   // 用于通知结束的channel
    closed  bool            // 是否关闭
    m       sync.RWMutex    // 读写锁,主要用来保证 closed值的并发安全
}

初始化groutine池, 以及启动设定好数量的groutine

// 初始化pool,设定并发量
func NewLimited(workers uint) Pool {
    if workers == 0 {
        panic("invalid workers '0'")
    }
    p := &limitedPool{
        workers: workers,
    }
    p.initialize()
    return p
}

func (p *limitedPool) initialize() {
    p.work = make(chan *workUnit, p.workers*2)
    p.cancel = make(chan struct{})
    p.closed = false
    for i := 0; i < int(p.workers); i++ {
        // 初始化并发单元
        p.newWorker(p.work, p.cancel)
    }
}

// passing work and cancel channels to newWorker() to avoid any potential race condition
// betweeen p.work read & write
func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) {
    go func(p *limitedPool) {

        var wu *workUnit

        defer func(p *limitedPool) {
            // 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动
            if err := recover(); err != nil {

                trace := make([]byte, 1<<16)
                n := runtime.Stack(trace, true)

                s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

                iwu := wu
                iwu.err = &ErrRecovery{s: s}
                close(iwu.done)

                // need to fire up new worker to replace this one as this one is exiting
                p.newWorker(p.work, p.cancel)
            }
        }(p)

        var value interface{}
        var err error

        for {
            select {
            // workChannel中读取任务
            case wu = <-work:

                // 防止channel 被关闭后读取到零值
                if wu == nil {
                    continue
                }

                // 先判断任务是否被取消
                if wu.cancelled.Load() == nil {
                    // 执行任务
                    value, err = wu.fn(wu)
                    wu.writing.Store(struct{}{})
                    
                    // 任务执行完在写入结果时需要再次检查工作单元是否被取消,防止产生竞争条件
                    if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
                        wu.value, wu.err = value, err
                        close(wu.done)
                    }
                }
            // pool是否被停止
            case <-cancel:
                return
            }
        }

    }(p)
}

往POOL中添加任务,并检查pool是否关闭

func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {
    w := &workUnit{
        done: make(chan struct{}),
        fn:   fn,
    }

    go func() {
        p.m.RLock()
        if p.closed {
            w.err = &ErrPoolClosed{s: errClosed}
            if w.cancelled.Load() == nil {
                close(w.done)
            }
            p.m.RUnlock()
            return
        }
        // 将工作单元写入workChannel, pool启动后将由上面newWorker函数中读取执行
        p.work <- w
        p.m.RUnlock()
    }()

    return w
}

在go-playground/pool包中, limitedPool的批量并发执行还需要借助batch.go来完成

// batch contains all information for a batch run of WorkUnits
type batch struct {
    pool    Pool          // 上面的limitedPool实现了Pool interface
    m       sync.Mutex    // 互斥锁,用来判断closed
    units   []WorkUnit    // 工作单元的slice, 这个主要用在不设并发限制的场景,这里忽略
    results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取
    done    chan struct{} // 通知batch是否完成
    closed  bool
    wg      *sync.WaitGroup
}
//  go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将之前创建好的Pool传入newBatch
func newBatch(p Pool) Batch {
    return &batch{
        pool:    p,
        units:   make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.
        results: make(chan WorkUnit),
        done:    make(chan struct{}),
        wg:      new(sync.WaitGroup),
    }
}

// 往批量任务中添加workFunc任务
func (b *batch) Queue(fn WorkFunc) {

    b.m.Lock()
    if b.closed {
        b.m.Unlock()
        return
    }
    //往上述的limitPool中添加workFunc
    wu := b.pool.Queue(fn)

    b.units = append(b.units, wu) // keeping a reference for cancellation purposes
    b.wg.Add(1)
    b.m.Unlock()
    
    // 执行完后将workUnit写入结果集channel
    go func(b *batch, wu WorkUnit) {
        wu.Wait()
        b.results <- wu
        b.wg.Done()
    }(b, wu)
}

// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
func (b *batch) QueueComplete() {
    b.m.Lock()
    b.closed = true
    close(b.done)
    b.m.Unlock()
}

// 获取批量任务结果集
func (b *batch) Results() <-chan WorkUnit {
    go func(b *batch) {
        <-b.done
        b.m.Lock()
        b.wg.Wait()
        b.m.Unlock()
        close(b.results)
    }(b)
    return b.results
}

测试

func SendMail(int int) pool.WorkFunc {
    fn := func(wu pool.WorkUnit) (interface{}, error) {
        // sleep 1s 模拟发邮件过程
        time.Sleep(time.Second * 1)
        // 模拟异常任务需要取消
        if int == 17 {
            wu.Cancel()
        }
        if wu.IsCancelled() {
            return false, nil
        }
        fmt.Println("send to", int)
        return true, nil
    }
    return fn
}

func TestBatchWork(t *testing.T) {
    // 初始化groutine数量为20的pool
    p := pool.NewLimited(20)
    defer p.Close()
    batch := p.Batch()
    // 设置一个批量任务的过期超时时间
    t := time.After(10 * time.Second)
    go func() {
        for i := 0; i < 100; i++ {
            batch.Queue(SendMail(i))
        }
        batch.QueueComplete()
    }()
    // 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行
    r := batch.Results()
LOOP:
    for {
        select {
        case <-t:
        // 登台超时通知
            fmt.Println("recived timeout")
            break LOOP
     
        case email, ok := <-r:
        // 读取结果集
            if ok {
                if err := email.Error(); err != nil {
                    fmt.Println("err", err.Error())
                }
                fmt.Println(email.Value())
            } else {
                fmt.Println("finish")
                break LOOP
            }
        }
    }
}

    

图片描述
图片描述
接近理论值5s, 通知模拟被取消的work也正常取消

go-playground/pool在比起之前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。但是,但是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。所以我又去看了一下ants的设计和实现。

ants

ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各种池化技术(这个日后再开新坑去读源码), ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

先看一下ants的pool 结构体 (pool.go)

type Pool struct {
    // 协程池的容量 (groutine数量的上限)
    capacity int32
    // 正在执行中的groutine
    running int32
    // 过期清理间隔时间
    expiryDuration time.Duration
    // 当前可用空闲的groutine
    workers []*Worker
    // 表示pool是否关闭
    release int32
    // lock for synchronous operation.
    lock sync.Mutex
    // 用于控制pool等待获取可用的groutine
    cond *sync.Cond
    // 确保pool只被关闭一次
    once sync.Once
    // worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度
    workerCache sync.Pool
    // pool引发panic时的执行函数
    PanicHandler func(interface{})
}

接下来看pool的工作单元 worker (worker.go)

type Worker struct {
    // worker 所属的poo;
    pool *Pool
    // 任务队列
    task chan func()
    // 回收时间,即该worker的最后一次结束运行的时间
    recycleTime time.Time
}

执行worker的代码 (worker.go)

func (w *Worker) run() {
    // pool中正在执行的worker数+1
    w.pool.incRunning()
    go func() {
        defer func() {
            if p := recover(); p != nil {
                //若worker因各种问题引发panic, 
                //pool中正在执行的worker数 -1,         
                //如果设置了Pool中的PanicHandler,此时会被调用
                w.pool.decRunning()
                if w.pool.PanicHandler != nil {
                    w.pool.PanicHandler(p)
                } else {
                    log.Printf("worker exits from a panic: %v", p)
                }
            }
        }()
        
        // worker 执行任务队列
        for f := range w.task {
            //任务队列中的函数全部被执行完后,
            //pool中正在执行的worker数 -1, 
            //将worker 放回对象池
            if f == nil {
                w.pool.decRunning()
                w.pool.workerCache.Put(w)
                return
            }
            f()
            //worker 执行完任务后放回Pool 
            //使得其余正在阻塞的任务可以获取worker
            w.pool.revertWorker(w)
        }
    }()
}

了解了工作单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)

// 向pool提交任务
func (p *Pool) Submit(task func()) error {
    if 1 == atomic.LoadInt32(&p.release) {
        return ErrPoolClosed
    }
    // 获取pool中的可用worker并向其任务队列中写入任务
    p.retrieveWorker().task <- task
    return nil
}


// **核心代码** 获取可用worker
func (p *Pool) retrieveWorker() *Worker {
    var w *Worker

    p.lock.Lock()
    idleWorkers := p.workers
    n := len(idleWorkers) - 1
  // 当前pool中有可用worker, 取出(队尾)worker并执行
    if n >= 0 {
        w = idleWorkers[n]
        idleWorkers[n] = nil
        p.workers = idleWorkers[:n]
        p.lock.Unlock()
    } else if p.Running() < p.Cap() {
        p.lock.Unlock()
        // 当前pool中无空闲worker,且pool数量未达到上线
        // pool会先从临时对象池中寻找是否有已完成任务的worker,
        // 若临时对象池中不存在,则重新创建一个worker并将其启动
        if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
            w = cacheWorker.(*Worker)
        } else {
            w = &Worker{
                pool: p,
                task: make(chan func(), workerChanCap),
            }
        }
        w.run()
    } else {
        // pool中没有空余worker且达到并发上限
        // 任务会阻塞等待当前运行的worker完成任务释放会pool
        for {
            p.cond.Wait() // 等待通知, 暂时阻塞
            l := len(p.workers) - 1
            if l < 0 {
                continue
            }
            // 当有可用worker释放回pool之后, 取出
            w = p.workers[l]
            p.workers[l] = nil
            p.workers = p.workers[:l]
            break
        }
        p.lock.Unlock()
    }
    return w
}

// 释放worker回pool
func (p *Pool) revertWorker(worker *Worker) {
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.workers = append(p.workers, worker)
    // 通知pool中已经获取锁的groutine, 有一个worker已完成任务
    p.cond.Signal()
    p.lock.Unlock()
}

在批量并发任务的执行过程中, 如果有超过5纳秒(ants中默认worker过期时间为5ns)的worker未被分配新的任务,则将其作为过期worker清理掉,从而保证pool中可用的worker都能发挥出最大的作用以及将任务分配得更均匀
(pool.go)

// 该函数会在pool初始化后在协程中启动
func (p *Pool) periodicallyPurge() {
    // 创建一个5ns定时的心跳
    heartbeat := time.NewTicker(p.expiryDuration)
    defer heartbeat.Stop()

    for range heartbeat.C {
        currentTime := time.Now()
        p.lock.Lock()
        idleWorkers := p.workers
        if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
            p.lock.Unlock()
            return
        }
        n := -1
        for i, w := range idleWorkers {
            // 因为pool 的worker队列是先进后出的,所以正序遍历可用worker时前面的往往里当前时间越久
            if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                break
            }    
            // 如果worker最后一次运行时间距现在超过5纳秒,视为过期,worker收到nil, 执行上述worker.go中 if n == nil 的操作
            n = i
            w.task <- nil
            idleWorkers[i] = nil
        }
        if n > -1 {
            // 全部过期
            if n >= len(idleWorkers)-1 {
                p.workers = idleWorkers[:0]
            } else {
            // 部分过期
                p.workers = idleWorkers[n+1:]
            }
        }
        p.lock.Unlock()
    }
}

测试

func TestAnts(t *testing.T) {
    wg := sync.WaitGroup{}
    pool, _ := ants.NewPool(20)
    defer pool.Release()
    for i := 0; i < 100; i++ {
        wg.Add(1)
        pool.Submit(sendMail(i, &wg))
    }
    wg.Wait()
}

func sendMail(i int, wg *sync.WaitGroup) func() {
    return func() {
        time.Sleep(time.Second * 1)
        fmt.Println("send mail to ", i)
        wg.Done()
    }
}

图片描述
这里虽只简单的测试批量并发任务的场景, 如果大家有兴趣可以去看看ants的压力测试, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。

借用ants作者的原话来说:
然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情…

Over

一口气从简单到复杂总结了三个协程池的实现,受益匪浅, 感谢各开源库的作者, 虽然世界上没有龙,但是屠龙技是必须练的,因为它就像存款,不一定要全部都用了,但是一定不能没有!

查看原文

赞 48 收藏 36 评论 6

Airy 发布了文章 · 2020-04-01

软件研发的 QSS 悖论

三要素只能取其中两个,不可能同时取三个。

  • high Quality 交付质量高
  • large Scale 研发人员的规模大
  • fast Speed 研发速度快

研发是一个多人协作的过程,同时技术水平越高的人交付质量和速度越高。基于以下公理推断而来:

  • 协作成本定律。人跟人之间的沟通成本是一直存在且同参与沟通的人数呈正相关。如果在两人之间达成共识的概率是 Pr(2),则在三人之间达成共识的概率是每两个人之间互相达成共识的概率: Pr(2) X Pr(2) X Pr(2)。
  • 高水平的研发人员有高交付质量和高研发速度。低水平的研发人员有较低的交付质量,较低的研发速度。

理论推导如下:

  • 假设交付质量高且研发速度快,那么必然是共识达成的概率大,那么沟通效率高,参与沟通的人很少。这跟“研发人员规模大” 矛盾。
  • 假设研发人员多,研发速度快。因为人多,必然沟通成本高,沟通需要的是时间,交付速度快的情况下只能有一种解释,上线的产品中隐藏了大量沟通不充分造成的问题,因此交付质量必然差。
  • 依次类推即可得出 CAP 理论类似的定理。
查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 202 次点赞
  • 获得 45 枚徽章 获得 6 枚金徽章, 获得 15 枚银徽章, 获得 24 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

注册于 2011-06-07
个人主页被 3.4k 人浏览