Martin91

Martin91 查看完整档案

深圳编辑华南理工大学  |  软件工程 编辑Shopee  |  Expert Engineer 编辑 hackerpie.com 编辑
编辑

__

( Nice to meet you )
    o   ^__^
     o  (oo)\_______
        (__)\       )\/\
            ||----w |
            ||     ||

个人动态

Martin91 发布了文章 · 10月28日

《Paxos Made Simple》中文翻译:Paxos 如此简单

写在前面

个人在学习理解 Paxos 算法的过程中,花了比较多的时间,从最开始直接查看中文博客资料,感觉都是看完不知所以然或者有很多疑问,于是决定死磕《Paxos Made Simple》论文原文。但是由于有些英文的意思我自己理解起来还是有点困惑,于是过程中遇到无法理解的内容,一方面是会翻阅前辈们已经写过的论文的翻译作为参考,二是在搜索引擎里就一些难以理解的点搜索中英文的讨论,以此解决自己心中的困惑。在磕磕碰碰中完成论文的阅读之后,仍有一些不尽透彻之处,加上个人认为此论文已有的翻译质量参差不齐,所以斗胆想通过翻译以及必要译注再次加深自己的理解,另外可能的话,也希望本次翻译能够帮助到未来可能会遇到和我一样困惑的人。

部分关键术语表

论文中有一些关键术语,我已经力求用词准确,并在论文中尽力保持术语翻译的一致性,目的是尽量充分传达论文本身用词的精准,建议读者可先仔细阅读此表。

原文术语翻译中使用术语译者注
value(s)值可能比较抽象,觉得太抽象的读者建议理解为提案的“内容”亦可
learn获知有些文章译作“了解”或者“学习”,但是这里反复斟酌,还是觉得“获知”更贴切,目的性更强烈
propose提议
chosen选定一个被选定的值,意味着一个被“一致”确认下来的值
agent代理依旧觉得翻译成“代理”过于字面化
proposer(s)提议者
acceptor(s)接受者
learner(s)学习者
fail / failure失效/故障意味着系统已经完全不能工作
proposal提案
accept接受
number编号
distinguished特定的文中用于形容某个经过选举而被选中的角色

翻译全文

Paxos 如此简单

2001年11月1日

摘要

当用浅显易懂的英语来表达的话,Paxos 是非常简单的。

1 导引

Paxos 算法——一个用于实现一个容忍错误的分布式系统的算法,让很多人觉得难以理解,这可能是因为对于很多读者们而言,原来的表述太过于让人摸不着头脑1了。事实上,它是最简单浅显的分布式算法。它的核心是一个一致性算法——“synod”算法2。在下一节中,将会看到这个一致性算法不可避免地遵循一些我们希望它能够满足的特性。最后一节阐述了完整的 Paxos 算法,这个算法是通过将一致性(的实现)直接应用到用于构建分布式系统的(多副本)状态机3这种方法中得到的——这种所谓的方法应该是众所周知的,因为它是分布式系统理论中最常被引用的论文的主题。

2 一致性算法

2.1 问题描述

假设有一个由多个进程组成的集合,集合里的每个进程都可以提议(可能不同的)值4。一致性算法保证在被提议的这些值中只有一个值能够被选定。一旦一个值被选定,则所有进程都需要能够获知(learn)这个被选定的值。一致性的安全性要求做到:

  • 只有被提议的值才可以被选定,
  • 只能有一个值能被选定,
  • 只有一个值真的已经“确定”被选定,进程才能获知这个值已被选定5(译注:只有一致同意后的值,且不可能会被推翻,才能够周知给集合里的进程,结果就能使所有进程达成共识)

我们不会尝试去明确精准的活性要求6。无论如何,(算法的)目标是要保证被提议的值中有某个值能够被选定,并且一旦一个值被选定了,进程最终能够获知这个被选定的值。

我们让三类代理(agent)来执行这个一致性算法中的三个角色:提议者(proposers)、接受者(acceptors)以及学习者(learners)。在实际实现中,一个独立的进程可以充当不止一个代理,但是从代理到进程之间的映射关系不是我们这里关注的重点。

设想代理之间可以通过发送消息的方式相互通信。我们使用传统的异步(模型),而不是拜占庭问题模型,也就是说:

  • 代理以任意速度运行,可能因停止而失效(指不能正常工作),也可能重启。由于所有代理都有可能在一个值被选定之后失效再接着重启,除非失效或者重启的代理能够记住一些关键信息,否则没有任何解决方案。
  • 消息发送的长度可以是任意的,消息也可以重复或者丢失,但消息不会被篡改7

2.2 值的选定

选定一个值的最简单的方式就是只有一个接受者的代理:一个提议者将一个提案发送给这个接受者,而后者直接选定它收到的第一个提议的值即可。这种方案虽然简单,却是无法叫人满意的,因为这个(唯一的)接受者一旦失效,将导致后续的操作无法继续。

所以,让我们来尝试选定值的另一种方法吧。不再是单一的接受者,我们现在尝试使用多个接受者代理的方式。一个提议者将一个提议的值发送给一群接受者。一个接受者可能接受(accept)这个被提议的值。一旦一个足够大数量的接受者的集合都接受了一个值,那么这个值就可以算是被选定了。多大的数量才算足够大?为了确保有且只有一个值被选定,我们可以让一个所谓足够大的集合等同于这些代理中的“大多数”组成的集合。因为任意两个“大多数”的集合必然拥有至少一个共同的接受者,并且假如一个接受者最多只能接受一个值,这个方法就是行得通的。(在很多的论文中都有对于“大多数”的浅显的概括)

在不考虑(系统)故障或者消息丢失的情况下,我们期望在哪怕只有一个提议者提出值的时候也能选定一个值。这引出了条件:

P1. 接受者必须接受它收到的第一个提案。

但是这个条件引入了另一个问题。在几乎同一时间,多个不同的提议者可能提议多个不同的值,并且发生了一个特殊情况:每个接受者都接受了其中一个值,但是没有任何一个值被接受者中的“大多数”所接受。甚至只有两个提议的值,一旦它们各自被差不多一半的接受者所接受,此时即使只有一个接受者故障都可能使得无法确定该选定哪个提案8

结合P1以及“只有被大多数接受者所接受的值才能被选中”的要求,可以发现隐含条件:必须允许接受者接受不止一个提案。我们使用对每个提案进行(自然)编号的方式来跟踪接受者可以接受的不同的提案,这样的话,每个提案都包含了一个提案编号以及对应的值。为了防止混淆,我们要求不同的提案必须要有不同的编号。至于怎么实现不同的编号则取决于实现的方案9,这里我们只要做假设就好了。当一个带有某个值的提案被大多数的接受者接受了之后,这个值就算是被选定了。在这种场景下,我们可以说这个提案(以及它的值)已经被选定了。

我们可以允许多个提案被选定,但是必须保证所有被选定的提案拥有一样的值。结合提案编号归纳推理,只要保证以下条件就够了:

P2. 如果一个拥有值 v 的提案被选定,则每一个(比这个提案)更高编号且被选定的提案也都拥有值 v

由于(提案)编号是完全有序的,条件 P2 保证了至关重要的安全性属性:只有一个值被选定。

为了被选定,一个提案必须被至少一个接受者所接受。于是,我们通过满足以下条件来满足 P2:

P2a. 如果一个拥有值 v 的提案被选定,则每一个(比这个提案)更高编号且被任意一个接受者接受的提案也都拥有值 v

我们仍需坚持 P1 以保证某个提案能被选定。(并且)因为消息通信是异步的,一个提案可能会被某个从来没有收到过任何提案的特殊接受者 c 所接受。设想一个新的提议者“醒来”并且提议了一个更高编号且值不同的提案的场景。P1要求 c 不得不接受这个提案,但这又会打破 P2a 的条件。为了同时满足 P1 和 P2a,需要对 P2a 做进一步加强:

P2b. 如果一个拥有值 v 的提案被选定,则每一个(比这个提案)更高编号且被任意一个提议者提议的提案也都拥有值 v

由于只有被提议者提议的提案,才可以被接受者接受,所以 P2b 隐含了 P2a,也进一步隐含了 P2。

为了发现如何满足 P2b,我们考虑如何证明它成立。我们先假设某个编号为m,且值为 v 的提案已经被选定,然后证明任何编号为 nn > m)的提案也都拥有值 v。我们可以通过对 n 采用数学归纳法10以使证明过程更轻松,于是在以下额外的假设下可证明编号为 n 的提案拥有值 v

归纳假设:每一个编号在 m..(n-1) 之间的提案都拥有值 v,这里的 i..j 的记法代表从 ij 的一组编号。11

由于编号为 m 的提案已经被选定,那就必然存在一个由“大多数”接受者组成的集合 C,且集合里的每一个接受者都接受了这个提案。结合这个(推理)以及前面的归纳假设,m 被选定的假设则意味着:

集合 C 里的每个接受者都接受了编号在 m..(n-1) 之间的一个提案12,并且被任何一个接受者所接受的每一个编号在 m..(n-1) 之间的提案都拥有值 v。

由于任意一个由大多数接受者组成的集合 S 都必然包含集合 C 的至少一个元素,我们可以通过维护以下不变性以保证编号为 n 的提案拥有值 v

P2c. 对于任意的 v 和 n,如果一个编号为 n 且拥有值 v 的提案被提议,则存在一个由大多数接受者组成的集合 S 满足这里其中一个条件:(a)集合 S 里没有接受者接受了任何一个编号小于 n 的提案;或者是:(b)v 是集合 S 中的接受者已经接受过的所有编号小于 n 的提案中编号最高的提案的值。

因此我们可以通过维护 P2c 的不变性来满足 P2b 的条件。

为了维护 P2c 的不变性,想要提议编号为 n 的提案的提议者必须获知编号小于 n 的最大编号的提案,如果存在这样的提案的话,那它肯定是已经或者即将被大多数接受者所接受的提案。获知已经被接受的提案是足够简单的,但是预测未来哪些(提案会被)接受则是困难的。与其尝试去预测未来,不如让提议者通过获取一个“将不会存在任何一个这样的接受”的承诺来控制这个过程。换句话说,提议者请求接受者们不再接受任何编号比 n 小的提案。这就引出了以下用于提议过程的算法:

  1. 一个提议者选择一个新的提案编号 n,然后给由某些接受者组成的集合中的每一个成员发送一个请求,要求它响应以下信息:
    a. 一个承诺:不再接受任何一个编号比 n 小的提案,并且
    b. 如果它已经有接受过提案的话,则还要返回它已经接受过的编号比 n 小的最大编号的提案13
    我把这样一个请求称之为对编号 nprepare 请求。
  2. 如果提议者从大多数的接受者成功收到期待的响应,则它可以接着提议一个编号为 n 且值为 v 的提案,这里 v 就是它从1b 中收到的响应里最大编号的提案的值,如果所有响应都表明没有接受过任何提案,则提议者可以自由选择一个值。
    提议者通过向一组接受者发送一个请求来提议提案。(这里的这组接受者并不需要和响应 request 请求的接受者一致)。让我们把这个请求称之为 accept 请求。

前面这些内容描述了提议者的算法,但是对于接受者而言又该是怎样子的呢?它可以接收来自提议者的两种请求:prepare 请求和 accept 请求。接受者可以忽略任何请求而不影响安全性。所以,我们需要讨论只在哪些情况下它可以响应请求。它总会响应 prepare 请求;它也可以响应 accept 请求,接受提案,只要它(事先)没有承诺不这样做。换句话说:

P1a. 接受者可以接受编号为 n 的提案,只要它没有响应过编号大于 n 的 prepare 请求

可见 P1a 包含了 P1。

我们现在已经得到了一个足以满足安全性属性的用于选定值的完整算法——在假设提案号唯一的基础上。最终的算法还需要通过额外的一点优化来得到。

设想一个接受者收到了一个编号为 nprepare 请求,但是它已经响应过一个编号比 n 大的 prepare 请求,因此也就承诺了不再接受任何编号为 n 的新的提案。于是接受者没有理由要去响应这个新的 prepare 请求,因为它并不会考虑接受编号为 n 的提案,也就是提议者想要提议的提案。所以我们让接受者直接忽略这样一个 prepare 请求。我们也让接受者直接忽略它已经接受的提案的 prepare 请求。

加上这个优化,接受者只需要记录它已经接受过的最高编号的提案以及它已经响应过的最高编号的 prepare 请求的编号即可。因为无论失败与否,P2c 都必须保持不变,所以接受者必须能够记录这些信息,哪怕它可能崩溃,以及重启。注意提议者总是可以放弃某个提案并且装作什么都没有发生过——只要提议者不会尝试用相同的编号提议另一个提案。

将提议者和接受者的行为都放在一起,我们可以看到这个算法的操作可以分为以下两个阶段:

阶段 1:
(a)提议者选择一个提案编号 n,向“大多数”接受者发送一个带有编号 n 的 prepare 请求;
(b)如果接受者收到一个编号为 n 的 prepare 请求,且 n 比它已经响应过的任何一个 prepare 请求的编号都大,则它会向这个请求回复响应,内容包括:一个不再接受任何编号小于 n 的提案的承诺,以及它已经接受过的最大编号的提案(假如有的话)。

阶段 2:
(a)如果提议者从“大多数”接受者收到了对它前面发出的 prepare 请求的响应,它就会接着给那每一个接受者发送一个针对编号为 n 且值为 v 的提案的 accept 请求,而 v 就是它所收到的响应中最大编号的提案的值,或者是它在所有响应都表明没有接受过任何提案的前提下自由选择的值 v;
(b)如果接受者收到了一个针对编号为 n 的提案的 accept 请求,它就会接受这个请求,除非它之前已经响应过编号大于 n 的 request 请求。

提议者可以提议多个提案,只要它在每一个提案中都遵循上面的算法。它也可以在协议中间的任何时候丢弃提案。(正确性还会被保持,哪怕是对废弃提案的请求或者响应可能在提案被丢弃很久之后才到达目的地)。在某些提议者已经开始尝试提议更高编号的提案的情况下,(尽早)放弃(当前较低编号的)提案或许是一个好的主意。所以,如果接受者由于它自身已经收到了更高编号的 prepare 请求而选择忽略(当前的)prepare 或者 accept 请求,那它应该通知提议者,提议者应该在收到通知后放弃提案。总体而言,这是一个不会影响正确性的性能优化。

2.3 获知选定的值

为了获知值已被选定,学习者必须找出某个已经被大多数接受者接受的提案。最显而易见的算法就是让每一个接受者一旦接受了提案,就响应给所有学习者,并给它们发送接受了的提案信息。这种方法允许学习者们尽可能快地找出被选定的值,但这种方法也要求每个接受者要响应每个学习者——响应的数量等于接受者数量和学习者数量的乘积。

没有拜占庭式错误的这样一个假设使得一个学习者可以很容易地通过其他的学习者来获知某个值已被接受的事实。我们可以让接受者将它们的接受事件响应给某个特定的学习者,这个特定的学习者要负责在每次一个值被选定之后通知其他的多个学习者。这种方法要求所有的学习者花费额外一轮的时间用于获知被选定的值,也降低了可靠性,因为那个特定的学习者可能会故障。但是这个方法要求的响应数量只等于接受者的数量和学习者的数量之和。

更一般的,接受者可以将它们的接受事件响应给由多个特定的学习者组成的某个集合,集合中的每个学习者都会在每次一个值被选定之后通知所有的学习者。使用这样一个较大的特定的学习者组成的集合可以在更大的通信复杂度上提供更大的可靠性。

由于消息丢失,值可能在学习者无法发现的情况下被选定。学习者可以询问接受者:现在已经接受了什么提案?但是接受者的失效可能导致不可能知道是否有一个大多数的(接受者)已经接受了某个特定的提案。在那种场景下,学习者只能在一个新的提案被选定的情况下才能找出哪个值被选定了。如果学习者需要知道一个值是否已经被选定,它可以让提议者使用上面描述的算法提议一个提案即可。

2.4 可进行性

构建这样一个场景是容易的:两个提议者相继提议一系列递增编号的提案,但是没有哪一个提案能被选定。提议者 p 完成了编号 n1 的提案的阶段1,接着另一个提议者 q 也完成了编号 n2n2 > n1)的提案的阶段1.由于接受者已经承诺不会再接受任何编号小于 n2 的新提案,所以提议者 p 在阶段2为提案 n1 发送的 accept 请求会被忽略。所以,提议者 p 又接着开始并且完成了一个新的提案 n3n3 > n2)的阶段1,导致提案 q 的阶段2的 accept 请求也被忽略了,以此类推……

为了保证(过程)可进行,一个特定的提议者必须被选为唯一一个提议提案的。如果这个特定的提议者可以成功地和大多数接受者通信,并且它使用了编号比任何已经使用的编号大的提案,那么它将会成功完成提议,也就是说,提案会被接受。通过在发现某个请求已经使用了更高的提案编号的情况下主动放弃提案然后重试(阶段1),这个特定的提议者终将能够选到一个足够高的提案编号。

如果系统有足够多的组件(提议者、接受者以及通信网络)正常工作,那么就可以通过选举一个单一的特定的提议者来实现活性。Fischer, Lynch 和 Patterson 的著名(实验)结果指出:选举一个提议者的可靠算法必须使用随机性或者实时性——举例来说,使用超时机制。无论如何,不管选举成功或者失败,安全性都是可以保证的。

2.5 实现

Paxos 算法假设了一个多进程组成的网络。在它的一致性算法里,每个进程同时扮演了提议者、接受者和学习者。这个算法也会选定一个 leader,由它同时扮演特定的提议者以及特定的学习者。Paxos 一致性算法正是上面描述的算法,其中请求和响应都作为普通消息发送。(响应的消息都会用对应的提案的编号做标记,以防混淆。)需要持久化存储器在故障时发挥作用,用于维护接受者必须记住的信息。接受者需要在真正发出响应之前在持久化存储上记录它计划的响应。

剩下的就是描述一种能够保证不同的提案不会使用相同的编号提议的机制。只要不同的提议者从不相交的编号集合中选择编号,这两个不同的提议者提议的提案就一定不会拥有相同的编号。每个提议者在稳定的存储上记录各自已经尝试提议的最高编号的提案,然后使用一个比它已经用过的编号更高的提案编号再次开始阶段1的过程。

3 实现一个状态机

实现分布式系统的一种简单方式是作为向中央服务器发出命令的客户端的集合。可以将服务器描述为一个按照时序执行客户端命令的确定状态机。这个状态机拥有一个当前的状态,它通过接受一个命令作为输入来执行一个步骤,然后产生一个相应的输出以及一个新的状态。举个例子:一个分布式的银行系统的客户端可能是出纳员,而状态机的状态则由所有用户的账号余额组成。一个取款操作可以通过运行一个状态机命令来完成:这个命令当且仅当余额大于取款数量的时候才可以扣减账户余额,并生成新旧余额作为输出。

使用单个中央服务器的实现方案会随着服务器的崩溃而失效。于是,我们想到了可以使用一组服务器,每个服务器彼此独立地实现同样的状态机。由于状态机是确定的,所以如果所有服务器都执行了相同的一系列命令,那么所有服务器都将会产生同样的一系列状态以及输出。一个发出命令的客户端则可以任意采用一台服务器生成的输出。

为了保证所有服务器运行相同的一系列状态机命令,我们(需要)实现 Paxos 一致性算法的一系列独立的实例,第 i 个选定的值就是序列中的第 i 个状态机命令。每一个服务器都在每一个实例中扮演这个算法的所有角色(提议者、接受者和学习者)。就现在而言,我假设服务器的集合是固定的,所以这个一致性算法的所有实例使用相同的一群代理。

在正常操作中,一个单独的服务器被选举为了 leader,由它在这个一致性算法的所有实例中扮演特定的提议者(只有它会尝试提议提案)。(多个)客户端发送命令到 leader,leader 决定每个命令的时序。假如 leader 决定某条客户端命令应该是第 135 个命令,那么它就会尝试通过这个一致性算法的第135个实例来提议选定一个提案,命令本身就是这个提案的值。这个过程通常会顺利完成。但它也可能因为故障而失败,或者因为有另一个服务器认为它自己才是 leader 并且它认为第 135 个命令应该另有他值。但是这个一致性算法确保第 135 位上最多只有一个命令能够被选定。

在 Paxos 一致性算法里,这个方法的效率的关键在于,被提议的值要到阶段2才会被选定。回想一下,在完成提议者的算法的阶段1之后,要么要提议的值已经确定下来,要么提议者可以自由提议任何值。

我现在将要描述 Paxos 状态机的实现是如何在正常操作下发挥作用的。稍后的话,我也将会讨论我们可能会遇到什么问题。我考虑的是在前一个 leader 刚发生故障而新的 leader 已经被选举出来的时候,会有什么事情发生。(系统启动是一个特殊场景,这个时候还没有任何命令被提议)

这个新的 leader 也是这个一致性算法的所有实例中的学习者,它应该知道大多数已经被选定的命令。假设它知道 1-134、138 以及 139 号命令,也就是一致性算法的 1-134、138 以及 139 号实例的值。(我们稍后将会看到命令序列中的这样一个空缺是如何产生的。)假设这些执行的结果决定了实例 135 和 140 中提议的值,但是其他实例中没有提议值的约束。leader 执行实例 135 和 140 的阶段2,并因此可以选定 135 和 140 号命令。

leader 自身就像其他向 leader 学习 leader 所知道的所有命令的别的服务器一样,现在可以运行命令 1-135。因为136号和137号命令还没有选定,所以它还不能运行 138-140 号命令,尽管它知道 138-140 号命令。于是,我们让它通过提议将一个特殊的不会导致状态机状态切换的“no-op”命令作为第136号和137号命令(它可以通过执行一致性算法的 136 号和 137 号实例的阶段 2 来完成),以此快速填补空缺。一旦这些 no-op 命令被选定,那 138-140 号命令就可以被执行了。

现在从 1 到 140 的命令都被选定了。 leader 完成了一致性算法中大于 140 的所有实例的阶段 1,它可以在这些(完成阶段1的)实例的阶段2中自由地提议任意的值。它给某个客户端请求的下一个命令分配了 141 号命令,把它作为这个一致性算法的 141 号实例的阶段2提议的值。它接着将它收到的下一个客户端命令提议为第 142 号命令,以此类推。

leader 可以在它获知它提议的 141 号命令已被选定之前提议 142 号命令。它在提议第 141 号命令中发送的所有消息有可能丢失,而第 142 号命令会在其他服务器获知到 leader 提议的第 141 号命令之前被选定。当 leader 在实例 141 中没有收到对它的阶段2的预期响应时,它将会重发这些消息。如果一切顺利,它提议的命令会被选定。无论如何,它还是有可能在前面有失败,在选定的命令的序列上留下一段空缺。一般来说,假设一个 leader 可以提前获得 α 个命令——也就意味着,它可以在 1 到 i 号命令被选定之后提议第 i + 1 到 i + α 号命令。一个多达 α - 1 个命令的空缺可能随之形成。

一个新的被选定的 leader 执行一致性算法中的无限多的实例的阶段1——如果是在上面的场景中,就是实例 135-137,以及所有大于139的实例。让所有实例使用一样的提案编号,它可以通过向其他的服务器发送一个合理的短消息来实现这一点。在阶段1中,接受者当且仅当它已经收到了某个提议者的阶段2的消息的时候,它才会响应不止1个简单的OK。(在这个场景里,这是仅针对实例 135 和 140 的例子。)所以,一个(扮演接受者的)服务器可以用一个单一且合理短的消息回应所有的实例。因此,执行阶段1的无穷多个实例不会带来任何问题。

由于 leader 的故障以及新 leader 的选举理应很少发生,因此执行状态机命令——对命令/值达成一致的过程的有效成本,仅为运行这个一致性算法的阶段2的成本。可以看出,Paxos 一致性算法的第2阶段在存在故障的情况下,其达成协议的可能代价是所有算法中最小的。于是,Paxos 算法本质上是最优的。

对于系统正常操作的讨论中假设了总是只有一个单独的 Leader,排除了现任 Leader 故障和新任 Leader 选举之间的一小段时间。在异常的情况下,Leader 选举可能失败。如果没有服务器担任 Leader,也就没有新的认领会被提议。如果多个服务器认为它们自己都是 Leader,则它们都可以在这个一致性算法的同一个实例上提议值,这可能导致没有值能够被选定。尽管如此,安全性还是保证的——两个不同的服务器将不会对被选定的第 i 个状态机命令持有不同意见。单个 Leader 的选举只有在确保(整个过程)可进行的时候才需要。

如果服务器的集合可以变化,那必然存在某些方式用于决定哪些服务器实现这个一致性算法的哪些实例。最简单的方式就是通过状态机自己。当前的服务器集合可以成为状态的一部分,并且可以通过普通的状态机命令修改。通过让运行第 i 个状态机命令后所指明的服务器的集合来运行这个一致性算法的第 i + α 号实例,我们可以允许 Leader 提前获得 α 个命令。这允许了一个任意复杂的支持重配置的算法的简单实现。

原论文参考文献

[1]  Michael J. Fischer, Nancy Lynch, and Michael S. Paterson. Impossibility of distributed consensus with one faulty process. Journal of the ACM, 32(2):374–382, April 1985.
[2]  Idit Keidar and Sergio Rajsbaum. On the cost of fault-tolerant consensus when there are no faults—a tutorial. TechnicalReport MIT-LCS-TR-821, Laboratory for Computer Science, Massachusetts Institute Technology, Cambridge, MA, 02139, May 2001. also published in SIGACT News 32(2) (June 2001).
[3]  Leslie Lamport. The implementation of reliable distributed multiprocess systems. Computer Networks, 2:95–114, 1978.
[4]  Leslie Lamport. Time, clocks, and the ordering of events in a distributed system. Communications of the ACM, 21(7):558–565, July 1978.
[5]  Leslie Lamport. The part-time parliament. ACM Transactions on Com- puter Systems, 16(2):133–169, May 1998.

翻译过程参考资料

辅助翻译工具

  • 有道词典

  1. 让人摸不着头脑:原文中的词是“Greek”,我个人猜想这里其实是一语双关,一个意思是指 Leslie Lamport 第一次阐述 Paxos 算法的论文《The Part-Time Parliament》里用了古希腊的故事情节来阐述算法思路,另一个意思表达令人摸不着头脑,可参考有道词典双语例句:
    Reporter: The new version will promote "The Painted" as "Eastern magic of the new", is not it a bit too exaggerated and Greek?Positioning yourself how this movie?
    记者:宣传方将新版《画皮》定位为“东方新魔幻”,是不是有点儿太夸张并且令人摸不着头脑?你自己怎么定位这部电影?。
  2. synod 算法:论文作者在《The Part-Time Parliament》中对其算法的命名为 synod。
  3. 来自维基百科的“状态机复制”词条:多个相同状态机的拷贝,从同样的“初始”状态开始,经历了相同的输入序列后,会达到同样的状态,并且输出同样的结果。
  4. 原文中为单词values,翻译过程中结合上下文理解,认为加上“可能不同的”会更贴合情境。
  5. 原文内容为“A process never learns that a value has been chosen unless it actually has been.”
  6. 关于安全性属性以及活性属性,可查阅“本文参考资料”一节的“Safety & Liveness Properties”
  7. 原文单词“corrupted”,直译应为“损坏”,但是这里结合个人理解,译作“篡改”更贴切,意为不会发生拜占庭式的问题
  8. 由于所谓的大多数等于 N/2+1,所以如果有一个接受者故障,可能导致两边提议都只能得到 N/2 票,都无法行成大多数。
  9. 一种常见且可行的方案是使用时间戳+机器ID的形式,但是实际上论文中并没有对提案编号的生成做具体的规定,只要保证编号递增且唯一即可,所以实际的实现中可以有多种多样的实现方式
  10. 论文作者采用了数学上的第二归纳法,亦称“强归纳法”
  11. 结合后面的推理结论,这里是一个闭区间的标记法,即i..j对应数学记法 [i, j]
  12. 至少接受了 m 号提案,所以这个结论是成立的
  13. 记得一个“提案”始终意味着:一个编号加上一个提案的值。
查看原文

赞 0 收藏 0 评论 0

Martin91 赞了文章 · 2019-10-17

如何做好一名技术管理者,提高技术判断力

最近团队扩充迅速,团队面临了一些新的挑战,一方面我们需要一些具备技术管理的人才能够脱颖而出,另一方面,我们也需要帮助已经成为技术管理者的同学完成转型,重新认识和看待自己的工作。近期在学习前百度最佳经理人刘建国的《技术管理实战36讲》,收货颇多,现分享一篇关于如何提升技术判断力的文章,供大家学习。


转型做管理后,你可以用在技术上的时间会越来越少,尤其是写代码的机会越来越少,手越来越生,但是要做的技术评审和技术决策却有增无减,对技术判断力的要求反而也越来越高。这是因为你的决策产生的影响比之前更大了。

无怪乎会有新经理抱怨说:“技术管理者是有违人性的,一方面自己的技术越来越差,另外一方面却还要带领整个技术团队。”技术管理者对于如何保持技术能力的焦虑,由此可见一斑。

事实上,我们的上级和前辈也时常告诫我们,“技术管理者要保持自己的技术判断力”,可见这个问题是大家都看在眼里的,但是却很少有人告诉我们,技术判断力都包含了哪些内容,也很少告诉我们该怎么样去做。

今天,我就来聊聊,在管理工作越来越繁重的情况下,技术管理者该如何保持自己在技术上的判断力。

技术管理者,和普通管理者最大的区别,就是“技术”二字,这也是技术管理者最鲜明的标签和最大的竞争力,它是如此重要,但又令人不知所措,困扰着众多的技术新经理。

从技术工程师到技术管理者的转型,有很多做事的思路和方法都需要转变。其中一个重要的转变就是你和技术的关系,也就是技术对你来说,意味着什么。

当你还是一位工程师时,你是技术的操作者和实现者,所有的技术服务都从你的手中诞生;而在成为一个越来越成熟的管理者的过程中,你越来越少地直接去实操,慢慢变成了技术的应用者,你需要的是把这些技术服务装配成更大的服务和产品。

这么说可能有点枯燥,打一个比方来说,当你是一名技术工程师时,你需要关心的是一个电子芯片该如何实现;而如果你成为一名技术管理者后,你关心的则是如何使用这些电子芯片,组装成一部手机或其他设备。做管理前后对于技术的态度,就如同对芯片的态度一样。

由此可见,当工作角色对你的要求,从一个技术实现者变为一个技术应用者时,你和技术的关系就发生了变化,“技术能力”这个词的含义也悄悄发生了转变。如果你没有意识到这一点,困惑和焦虑几乎是必然发生的。

那么,从技术实现者到技术应用者,具体发生了哪些转变呢?

对于技术实现者来说,程序设计能力、编码实现能力、技术攻坚能力和技术评估能力,都是需要具备的,主要关心的是“怎么做”,属于“how”的范畴。

而对于技术应用者来说,技术评估能力变得尤其重要,因为技术管理者主要关心的是“要不要做”“做什么”,属于“why” 和 “what” 的范畴,是要在综合评估之后,做出决策和判断的。所以,很多前辈都会告诉我们要保持“技术判断力”,而并没有要求我们保持编码能力,原因就在这里。

那么该如何保持技术判断力呢?因为所有判断,都先要评估,所以技术判断力,其实就是指对技术的评估能力。你可能会说,技术评估能力还是虚的,具体都评估什么呢?

作为一个技术管理者,即技术应用者,要评估的维度主要是以下三个方面:

第一个维度是结果评估。即,你要回答“要不要做”,希望拿到什么结果,你要从哪几个维度去衡量结果,从哪几个技术指标去验收成果。

比如,你可能因为提升服务稳定性,去完善服务架构;也可能因为要提升数据准确性,去改写数据采集程序;还可能为了提升性能指标,去重构数据读写模块,等等。无论如何,你心里都需要很清楚,用什么技术指标来衡量团队的某项技术工作,而不只是完成一个个项目。

事关每项工作的效果和业绩,对结果的评估能力最为关键。虽然结果验收都是放在项目完成后,但是在事先就要明确如何验收,这样才能让大家有的放矢,以终为始。

第二个维度是可行性评估。可行性有两层含义:一是“能不能做”,二是“值不值得”。 能不能和值不值得,是两码事。不懂技术的管理者一般问的都是“能不能做”,而有经验的技术管理者和资深工程师,考虑的是“值不值得”。

所谓“值不值得”,就是成本收益问题。收益,往往是显而易见的;而成本,就有很多方面需要考虑了,这正是体现技术判断力的地方。

首先是“人财物时”等资源投入成本,这是几乎每位工程师和管理者都能考虑到的,即需要投入多少人、多少时间,甚至是多少资金和物资在该项目上,这项成本相对容易评估。

其次是维护成本,这是评估技术方案时要重点考虑的。由于我们考虑投入的时候,往往只考虑到项目发布,而发布后的维护成本很容易被忽略。

常见的技术维护成本有如下四个方面,依次是:

  • 技术选型成本。这是指你在做技术选型的时候,选择不成熟的技术所带来的成本。越成熟的技术,其技术实现成本和人力成本都是相对要低的,但是并不是说,选择新技术就一定不划算,只要考虑到成本和风险,才能做出合理决策。
  • 技术升级成本。这是指在评估技术方案的时候,其兼容性和扩展性水平带来的后期升级的难度和成本。
  • 问题排查成本。在做技术实现的时候,要特别关注后续的问题排查。好的技术实现,分分钟可以排查出问题原因;而不好的技术实现和方案,查一个问题可能需要花上几天时间,成本差异不可同日而语。
  • 代码维护成本。在编写代码的时候,要记得代码是要有可读性的。这体现在别人升级代码要花多长时间才能看明白,修改起来是否简单、安全。

考虑维护成本是技术管理者和架构师视野宽阔、能力成熟的体现。

再次是机会成本,这是技术管理者做决策时要意识到的。即,当你把人力、时间花在这件事上,同时就等于放弃了另外一件事,而没有做另外这件事将带来什么样的影响呢?就是你要考虑的机会成本,你可能会因为这个思考而调整技术方案的选择。

最后,希望你还能意识到协作成本,即多人协作所增加的时间精力开销。一个方案的协作方越多,需要沟通协调的成本也就越高,可控度越低。如果可能的话,尽量减少不同团队和人员之间的耦合,这样会大大降低协作成本。

第三个评估维度,即风险评估。技术风险评估,也叫技术风险判断力。即,有哪些技术风险需要未雨绸缪,考虑该技术方案带来最大损失的可能性和边界,以及在什么情形下会发生。这项评估工作很考验技术管理者的技术经验和风险意识,而且需要借助全团队的技术力量来做出准确判断。

对于一个技术方案或一项技术决策,如果你能从以上三个维度去评估,就说明你拥有了很好的技术意识和判断力;另外,你还会发现,如果能做好技术评估工作,你的技术能力并不会降低,还会持续提高。

那么,如何提升自己的技术判断力呢?

判断力不是天生的,也不是一蹴而就的。新经理的技术判断力,基本都来自于之前技术上的实际操作,来自于自己的经验积累。而做管理之后,技术评估方面的要求更高了,研究技术的时间和精力却减少了,这该怎么破?

别忘了,自从你带团队的那一天起,你就已经不是一个人在战斗。所以,你可以依靠团队和更广的人脉,去拓展技术视野和技术判断力。常见的几个方式如下:

  1. 建立技术学习机制。盘点你负责的业务,需要哪些方面的技术,成立一个或几个核心的技术小组,让团队对各个方向的技术保持敏感,要求小组定期做交流和分享,这样你就可以保持技术的敏感度。
  2. 专项技术调研项目化。如果某项技术对团队的业务有重要的价值,可以专门立项做技术调研,并要求项目负责人做调研汇报。
  3. 和技术大牛交流。越是厉害的技术人,越能深入浅出地把技术讲明白,所以针对某项技术找大牛取经,也是学习的好途径。你看,虽然实际操刀的时间少了,但是你和技术大牛的交流机会多了,一方面因为你有更大的影响力了,另一方面,你和大牛有了共同的诉求,就是把技术“变现”,让技术产生价值。
  4. 听取工作汇报。因为你带的是技术团队,大部分工作都和技术相关,在读员工的周报、季度汇报时,相互探讨,也是一种切磋和学习。

总之,你会发现,技术管理人的技术水准的提升和保持,主要看能从周围人的身上汲取到多少信息和知识,而不再只是靠自学。

归根结底,从技术实现者到技术应用者的转变,不断提升的是技术的使用能力,而技术实现能力由于投入的时间越来越少,会逐渐减弱。如果说带团队做项目就像组装一部手机,你会越来越清楚如何把各个组件集成起来,但是你不见得会清楚每一个电子元器件内部的技术实现。

既然你选择了做更大的事情,就不得不适当放弃一些细节,放弃一些技术实现能力,不断提升你的技术判断力,让团队行走在正确的方向上。你说是不是这么个道理呢?


By 二月的狮子

查看原文

赞 2 收藏 1 评论 0

Martin91 收藏了文章 · 2019-10-16

Goroutine并发调度模型深度解析之手撸一个协程池

个人博客原文:Goroutine并发调度模型深度解析之手撸一个高性能协程池

并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者关注最多的话题;Go语言作为一个出道以来就自带 『高并发』光环的富二代编程语言,它的并发(并行)编程肯定是值得开发者去探究的,而Go语言中的并发(并行)编程是经由goroutine实现的,goroutine是golang最重要的特性之一,具有使用成本低、消耗资源低、能效高等特点,官方宣称原生goroutine并发成千上万不成问题,于是它也成为Gopher们经常使用的特性。

Goroutine是优秀的,但不是完美的,在极大规模的高并发场景下,也可能会暴露出问题,什么问题呢?又有什么可选的解决方案?本文将通过runtime对goroutine的调度分析,帮助大家理解它的机理和发现一些内存和调度的原理和问题,并且基于此提出一种个人的解决方案 — 一个高性能的Goroutine Pool(协程池)。

Goroutine & Scheduler

Goroutine,Go语言基于并发(并行)编程给出的自家的解决方案。goroutine是什么?通常goroutine会被当做coroutine(协程)的 golang实现,从比较粗浅的层面来看,这种认知也算是合理,但实际上,goroutine并非传统意义上的协程,现在主流的线程模型分三种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),传统的协程库属于用户级线程模型,而goroutine和它的Go Scheduler在底层实现上其实是属于两级线程模型,因此,有时候为了方便理解可以简单把goroutine类比成协程,但心里一定要有个清晰的认知 — goroutine并不等同于协程。

线程那些事儿

互联网时代以降,由于在线用户数量的爆炸,单台服务器处理的连接也水涨船高,迫使编程模式由从前的串行模式升级到并发模型,而几十年来,并发模型也是一代代地升级,有IO多路复用、多进程以及多线程,这几种模型都各有长短,现代复杂的高并发架构大多是几种模型协同使用,不同场景应用不同模型,扬长避短,发挥服务器的最大性能,而多线程,因为其轻量和易用,成为并发编程中使用频率最高的并发模型,而后衍生的协程等其他子产品,也都基于它,而我们今天要分析的 goroutine 也是基于线程,因此,我们先来聊聊线程的三大模型:

线程的实现模型主要有3种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),它们之间最大的差异就在于用户线程与内核调度实体(KSE,Kernel Scheduling Entity)之间的对应关系上。而所谓的内核调度实体 KSE 就是指可以被操作系统内核调度器调度的对象实体(这说的啥玩意儿,敢不敢通俗易懂一点?)。简单来说 KSE 就是内核级线程,是操作系统内核的最小调度单元,也就是我们写代码的时候通俗理解上的线程了(这么说不就懂了嘛!装什么13)。

用户级线程模型

用户线程与内核线程KSE是多对一(N : 1)的映射模型,多个用户线程的一般从属于单个进程并且多线程的调度是由用户自己的线程库来完成,线程的创建、销毁以及多线程之间的协调等操作都是由用户自己的线程库来负责而无须借助系统调用来实现。一个进程中所有创建的线程都只和同一个KSE在运行时动态绑定,也就是说,操作系统只知道用户进程而对其中的线程是无感知的,内核的所有调度都是基于用户进程。许多语言实现的 协程库 基本上都属于这种方式(比如python的gevent)。由于线程调度是在用户层面完成的,也就是相较于内核调度不需要让CPU在用户态和内核态之间切换,这种实现方式相比内核级线程可以做的很轻量级,对系统资源的消耗会小很多,因此可以创建的线程数量与上下文切换所花费的代价也会小得多。但该模型有个原罪:并不能做到真正意义上的并发,假设在某个用户进程上的某个用户线程因为一个阻塞调用(比如I/O阻塞)而被CPU给中断(抢占式调度)了,那么该进程内的所有线程都被阻塞(因为单个用户进程内的线程自调度是没有CPU时钟中断的,从而没有轮转调度),整个进程被挂起。即便是多CPU的机器,也无济于事,因为在用户级线程模型下,一个CPU关联运行的是整个用户进程,进程内的子线程绑定到CPU执行是由用户进程调度的,内部线程对CPU是不可见的,此时可以理解为CPU的调度单位是用户进程。所以很多的协程库会把自己一些阻塞的操作重新封装为完全的非阻塞形式,然后在以前要阻塞的点上,主动让出自己,并通过某种方式通知或唤醒其他待执行的用户线程在该KSE上运行,从而避免了内核调度器由于KSE阻塞而做上下文切换,这样整个进程也不会被阻塞了。

内核级线程模型

用户线程与内核线程KSE是一对一(1 : 1)的映射模型,也就是每一个用户线程绑定一个实际的内核线程,而线程的调度则完全交付给操作系统内核去做,应用程序对线程的创建、终止以及同步都基于内核提供的系统调用来完成,大部分编程语言的线程库(比如Java的java.lang.Thread、C++11的std::thread等等)都是对操作系统的线程(内核级线程)的一层封装,创建出来的每个线程与一个独立的KSE静态绑定,因此其调度完全由操作系统内核调度器去做。这种模型的优势和劣势同样明显:优势是实现简单,直接借助操作系统内核的线程以及调度器,所以CPU可以快速切换调度线程,于是多个线程可以同时运行,因此相较于用户级线程模型它真正做到了并行处理;但它的劣势是,由于直接借助了操作系统内核来创建、销毁和以及多个线程之间的上下文切换和调度,因此资源成本大幅上涨,且对性能影响很大。

两级线程模型

两级线程模型是博采众长之后的产物,充分吸收前两种线程模型的优点且尽量规避它们的缺点。在此模型下,用户线程与内核KSE是多对多(N : M)的映射模型:首先,区别于用户级线程模型,两级线程模型中的一个进程可以与多个内核线程KSE关联,于是进程内的多个线程可以绑定不同的KSE,这点和内核级线程模型相似;其次,又区别于内核级线程模型,它的进程里的所有线程并不与KSE一一绑定,而是可以动态绑定同一个KSE, 当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行。所以,两级线程模型既不是用户级线程模型那种完全靠自己调度的也不是内核级线程模型完全靠操作系统调度的,而是中间态(自身调度与系统调度协同工作),也就是 — 『薛定谔的模型』(误),因为这种模型的高度复杂性,操作系统内核开发者一般不会使用,所以更多时候是作为第三方库的形式出现,而Go语言中的runtime调度器就是采用的这种实现方案,实现了Goroutine与KSE之间的动态关联,不过Go语言的实现更加高级和优雅;该模型为何被称为两级?即用户调度器实现用户线程到KSE的『调度』,内核调度器实现KSE到CPU上的『调度』

G-P-M 模型概述

每一个OS线程都有一个固定大小的内存块(一般会是2MB)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。这个固定大小的栈同时很大又很小。因为2MB的栈对于一个小小的goroutine来说是很大的内存浪费,而对于一些复杂的任务(如深度嵌套的递归)来说又显得太小。因此,Go语言做了它自己的『线程』。

在Go语言中,每一个goroutine是一个独立的执行单元,相较于每个OS线程固定分配2M内存的模式,goroutine的栈采取了动态扩容方式, 初始时仅为2KB,随着任务执行按需增长,最大可达1GB(64位机器最大是1G,32位机器最大是256M),且完全由golang自己的调度器 Go Scheduler 来调度。此外,GC还会周期性地将不再使用的内存回收,收缩栈空间。 因此,Go程序可以同时并发成千上万个goroutine是得益于它强劲的调度器和高效的内存模型。Go的创造者大概对goroutine的定位就是屠龙刀,因为他们不仅让goroutine作为golang并发编程的最核心组件(开发者的程序都是基于goroutine运行的)而且golang中的许多标准库的实现也到处能见到goroutine的身影,比如net/http这个包,甚至语言本身的组件runtime运行时和GC垃圾回收器都是运行在goroutine上的,作者对goroutine的厚望可见一斑。

任何用户线程最终肯定都是要交由OS线程来执行的,goroutine(称为G)也不例外,但是G并不直接绑定OS线程运行,而是由Goroutine Scheduler中的 P - Logical Processor (逻辑处理器)来作为两者的『中介』,P可以看作是一个抽象的资源或者一个上下文,一个P绑定一个OS线程,在golang的实现里把OS线程抽象成一个数据结构:M,G实际上是由M通过P来进行调度运行的,但是在G的层面来看,P提供了G运行所需的一切资源和环境,因此在G看来P就是运行它的 “CPU”,由 G、P、M 这三种由Go抽象出来的实现,最终形成了Go调度器的基本结构:

  • G: 表示Goroutine,每个Goroutine对应一个G结构体,G存储Goroutine的运行堆栈、状态以及任务函数,可重用。G并非执行体,每个G需要绑定到P才能被调度执行。
  • P: Processor,表示逻辑处理器, 对G来说,P相当于CPU核,G只有绑定到P(在P的local runq中)才能被调度。对M来说,P提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定,但是不论GOMAXPROCS设置为多大,P的数量最大为256。
  • M: Machine,OS线程抽象,代表着真正执行计算的资源,在绑定有效的P后,进入schedule循环;而schedule循环的机制大致是从Global队列、P的Local队列以及wait队列中获取G,切换到G的执行栈上并执行G的函数,调用goexit做清理工作并回到M,如此反复。M并不保留G状态,这是G可以跨M调度的基础,M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致系统调度不过来,目前默认最大限制为10000个。

关于P,我们需要再絮叨几句,在Go 1.0发布的时候,它的调度器其实G-M模型,也就是没有P的,调度过程全由G和M完成,这个模型暴露出一些问题:

  • 单一全局互斥锁(Sched.Lock)和集中状态存储的存在导致所有goroutine相关操作,比如:创建、重新调度等都要上锁;
  • goroutine传递问题:M经常在M之间传递『可运行』的goroutine,这导致调度延迟增大以及额外的性能损耗;
  • 每个M做内存缓存,导致内存占用过高,数据局部性较差;
  • 由于syscall调用而形成的剧烈的worker thread阻塞和解除阻塞,导致额外的性能损耗。

这些问题实在太扎眼了,导致Go1.0虽然号称原生支持并发,却在并发性能上一直饱受诟病,然后,Go语言委员会中一个核心开发大佬看不下了,亲自下场重新设计和实现了Go调度器(在原有的G-M模型中引入了P)并且实现了一个叫做 work-stealing 的调度算法:

  • 每个P维护一个G的本地队列;
  • 当一个G被创建出来,或者变为可执行状态时,就把他放到P的可执行队列中;
  • 当一个G在M里执行结束后,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行, M就随机选择另外一个P,从其可执行的G队列中取走一半。

该算法避免了在goroutine调度时使用全局锁。

至此,Go调度器的基本模型确立:

G-P-M模型

G-P-M 模型调度

Go调度器工作时会维护两种用来保存G的任务队列:一种是一个Global任务队列,一种是每个P维护的Local任务队列。

当通过go关键字创建一个新的goroutine的时候,它会优先被放入P的本地队列。为了运行goroutine,M需要持有(绑定)一个P,接着M会启动一个OS线程,循环从P的本地队列里取出一个goroutine并执行。当然还有上文提及的 work-stealing调度算法:当M执行完了当前P的Local队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从Global队列寻找G来执行,如果Global队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。

如果一切正常,调度器会以上述的那种方式顺畅地运行,但这个世界没这么美好,总有意外发生,以下分析goroutine在两种例外情况下的行为。

Go runtime会在下面的goroutine被阻塞的情况下运行另外一个goroutine:

  • blocking syscall (for example opening a file)
  • network input
  • channel operations
  • primitives in the sync package

这四种场景又可归类为两种类型:

用户态阻塞/唤醒

当goroutine因为channel操作或者network I/O而阻塞时(实际上golang已经用netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞,仅阻塞G,这里仅仅是举个栗子),对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有runnable的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为runnable,尝试加入G2所在P的runnext,然后再是P的Local队列和Global队列。

系统调用阻塞

当G被阻塞在某个系统调用上时,此时G会阻塞在_Gsyscall状态,M也处于 block on syscall 状态,此时的M可被抢占调度:执行该G的M会与P解绑,而P则尝试与其它idle的M绑定,继续执行其它G。如果没有其它idle的M,但P的Local队列中仍然有G需要执行,则创建一个新的M;当系统调用完成后,G会重新尝试获取一个idle的P进入它的Local队列恢复执行,如果没有idle的P,G会被标记为runnable加入到Global队列。

以上就是从宏观的角度对Goroutine和它的调度器进行的一些概要性的介绍,当然,Go的调度中更复杂的抢占式调度、阻塞调度的更多细节,大家可以自行去找相关资料深入理解,本文只讲到Go调度器的基本调度过程,为后面自己实现一个Goroutine Pool提供理论基础,这里便不再继续深入上述说的那几个调度了,事实上如果要完全讲清楚Go调度器,一篇文章的篇幅也实在是捉襟见肘,所以想了解更多细节的同学可以去看看Go调度器 G-P-M 模型的设计者 Dmitry Vyukov 写的该模型的设计文档《Go Preemptive Scheduler Design》以及直接去看源码,G-P-M模型的定义放在src/runtime/runtime2.go里面,而调度过程则放在了src/runtime/proc.go里。

大规模Goroutine的瓶颈

既然Go调度器已经这么牛逼优秀了,我们为什么还要自己去实现一个golang的 Goroutine Pool 呢?事实上,优秀不代表完美,任何不考虑具体应用场景的编程模式都是耍流氓!有基于G-P-M的Go调度器背书,go程序的并发编程中,可以任性地起大规模的goroutine来执行任务,官方也宣称用golang写并发程序的时候随便起个成千上万的goroutine毫无压力。

然而,你起1000个goroutine没有问题,10000也没有问题,10w个可能也没问题;那,100w个呢?1000w个呢?(这里只是举个极端的例子,实际编程起这么大规模的goroutine的例子极少)这里就会出问题,什么问题呢?

  1. 首先,即便每个goroutine只分配2KB的内存,但如果是恐怖如斯的数量,聚少成多,内存暴涨,就会对GC造成极大的负担,写过java的同学应该知道jvm GC那万恶的STW(Stop The World)机制,也就是GC的时候会挂起用户程序直到垃圾回收完,虽然Go1.8之后的GC已经去掉了STW以及优化成了并行GC,性能上有了不小的提升,但是,如果太过于频繁地进行GC,依然会有性能瓶颈;
  2. 其次,还记得前面我们说的runtime和GC也都是goroutine吗?是的,如果goroutine规模太大,内存吃紧,runtime调度和垃圾回收同样会出问题,虽然G-P-M模型足够优秀,韩信点兵,多多益善,但你不能不给士兵发口粮(内存)吧?巧妇难为无米之炊,没有内存,Go调度器就会阻塞goroutine,结果就是P的Local队列积压,又导致内存溢出,这就是个死循环...,甚至极有可能程序直接Crash掉,本来是想享受golang并发带来的快感效益,结果却得不偿失。

一个http标准库引发的血案

我想,作为golang拥趸的Gopher们一定都使用过它的net/http标准库,很多人都说用golang写web server完全可以不用借助第三方的web framework,仅用net/http标准库就能写一个高性能的web server,的确,我也用过它写过web server,简洁高效,性能表现也相当不错,除非有比较特殊的需求否则一般的确不用借助第三方web framework,但是天下没有白吃的午餐,net/http为啥这么快?要搞清这个问题,从源码入手是最好的途径。孔子曾经曰过:源码面前,如同裸奔。所以,高清无码是阻碍程序猿发展大大滴绊脚石啊,源码才是我们进步阶梯,切记切记!

接下来我们就来先看看net/http内部是怎么实现的。

net/http接收请求且开始处理的源码放在src/net/http/server.go里,先从入口函数ListenAndServe进去:

func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

看到最后那个srv.Serve调用了吗?没错,这个Serve方法里面就是实际处理http请求的逻辑,我们再进入这个方法内部:

func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    ...
    // 不断循环取出TCP连接
    for {
        // 看我看我!!!
        rw, e := l.Accept()
        ...
        // 再看我再看我!!!
        go c.serve(ctx)
    }
}

首先,这个方法的参数(l net.Listener) ,是一个TCP监听的封装,负责监听网络端口,rw, e := l.Accept()则是一个阻塞操作,从网络端口取出一个新的TCP连接进行处理,最后go c.serve(ctx)就是最后真正去处理这个http请求的逻辑了,看到前面的go关键字了吗?没错,这里启动了一个新的goroutine去执行处理逻辑,而且这是在一个无限循环体里面,所以意味着,每来一个请求它就会开一个goroutine去处理,相当任性粗暴啊…,不过有Go调度器背书,一般来说也没啥压力,然而,如果,我是说如果哈,突然一大波请求涌进来了(比方说黑客搞了成千上万的肉鸡DDOS你,没错!就这么倒霉!),这时候,就很成问题了,他来10w个请求你就要开给他10w个goroutine,来100w个你就要老老实实开给他100w个,线程调度压力陡升,内存爆满,再然后,你就跪了…

釜底抽薪

有问题,就一定有解决的办法,那么,有什么方案可以减缓大规模goroutine对系统的调度和内存压力?要想解决问题,最重要的是找到造成问题的根源,这个问题根源是什么?goroutine的数量过多导致资源侵占,那要解决这个问题就要限制运行的goroutine数量,合理复用,节省资源,具体就是 — goroutine池化。

超大规模并发的场景下,不加限制的大规模的goroutine可能造成内存暴涨,给机器带来极大的压力,吞吐量下降和处理速度变慢还是其次,更危险的是可能使得程序crash。所以,goroutine池化是有其现实意义的。

首先,100w个任务,是不是真的需要100w个goroutine来处理?未必!用1w个goroutine也一样可以处理,让一个goroutine多处理几个任务就是了嘛,池化的核心优势就在于对goroutine的复用。此举首先极大减轻了runtime调度goroutine的压力,其次,便是降低了对内存的消耗。

有一个商场,来了1000个顾客买东西,那么该如何安排导购员服务这1000人呢?有两种方案:

第一,我雇1000个导购员实行一对一服务,这种当然是最高效的,但是太浪费资源了,雇1000个人的成本极高且管理困难,这些可以先按下不表,但是每个顾客到商场买东西也不是一进来就马上买,一般都得逛一逛,选一选,也就是得花时间挑,1000个导购员一对一盯着,效率极低;这就引出第二种方案:我只雇10个导购员,就在商场里待命,有顾客需要咨询的时候招呼导购员过去进行处理,导购员处理完之后就回来,等下一个顾客需要咨询的时候再去,如此往返反复...

第二种方案有没有觉得很眼熟?没错,其基本思路就是模拟一个I/O多路复用,通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。关于多路复用,不在本文的讨论范围之内,便不再赘述,详细原理可以参考 I/O多路复用

第一种方案就是net/http标准库采用的:来一个请求开一个goroutine处理;第二种方案就是Goroutine Pool(I/O多路复用)。

实现一个 Goroutine Pool

因为上述陈列的一些由于goroutine规模过大而可能引发的问题,需要有方案来解决这些问题,上文已经分析过,把goroutine池化是一种行之有效的方案,基于此,可以实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下可以极大地提高并发性能。

哎玛!前面絮絮叨叨了这么多,终于进入正题了,接下来就开始讲解如何实现一个高性能的Goroutine Pool,秒杀原生并发的goroutine,在执行速度和占用内存上提高并发程序的性能。好了,话不多说,开始装逼分析。

设计思路

Goroutine Pool 的实现思路大致如下:

启动服务之时先初始化一个 Goroutine Pool 池,这个Pool维护了一个类似栈的LIFO队列 ,里面存放负责处理任务的Worker,然后在client端提交task到Pool中之后,在Pool内部,接收task之后的核心操作是:

  1. 检查当前Worker队列中是否有空闲的Worker,如果有,取出执行当前的task;
  2. 没有空闲Worker,判断当前在运行的Worker是否已超过该Pool的容量,是 — 阻塞等待直至有Worker被放回Pool;否 — 新开一个Worker(goroutine)处理;
  3. 每个Worker执行完任务之后,放回Pool的队列中等待。

调度过程如下:

按照这个设计思路,我实现了一个高性能的Goroutine Pool,较好地解决了上述的大规模调度和资源占用的问题,在执行速度和内存占用方面相较于原生goroutine并发占有明显的优势,尤其是内存占用,因为复用,所以规避了无脑启动大规模goroutine的弊端,可以节省大量的内存。

此外,该调度系统还有一个清理过期Worker的定时任务,该任务在初始化一个Pool之时启动,每隔一定的时间间隔去检查空闲Worker队列中是否有已经过期的Worker,有则清理掉,通过定时清理过期worker,进一步节省系统资源。

完整的项目代码可以在我的github上获取:传送门,也欢迎提意见和交流。

实现细节

Goroutine Pool的设计原理前面已经讲过了,整个调度过程相信大家应该可以理解了,但是有一句老话说得好,空谈误国,实干兴邦,设计思路有了,具体实现的时候肯定会有很多细节、难点,接下来我们通过分析这个Goroutine Pool的几个核心实现以及它们的联动来引导大家过一遍Goroutine Pool的原理。

首先是Pool struct

type sig struct{}

type f func() error

// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // expiryDuration set the expired time (second) of every worker.
    expiryDuration time.Duration

    // freeSignal is used to notice pool there are available
    // workers which can be sent to work.
    freeSignal chan sig

    // workers is a slice that store the available workers.
    workers []*Worker

    // release is used to notice the pool to closed itself.
    release chan sig

    // lock for synchronous operation
    lock sync.Mutex

    once sync.Once
}

Pool是一个通用的协程池,支持不同类型的任务,亦即每一个任务绑定一个函数提交到池中,批量执行不同类型任务,是一种广义的协程池;本项目中还实现了另一种协程池 — 批量执行同类任务的协程池PoolWithFunc,每一个PoolWithFunc只会绑定一个任务函数pf,这种Pool适用于大批量相同任务的场景,因为每个Pool只绑定一个任务函数,因此PoolWithFunc相较于Pool会更加节省内存,但通用性就不如前者了,为了让大家更好地理解协程池的原理,这里我们用通用的Pool来分析。

capacity是该Pool的容量,也就是开启worker数量的上限,每一个worker绑定一个goroutine;running是当前正在执行任务的worker数量;expiryDuration是worker的过期时长,在空闲队列中的worker的最新一次运行时间与当前时间之差如果大于这个值则表示已过期,定时清理任务会清理掉这个worker;freeSignal是一个信号,因为Pool开启的worker数量有上限,因此当全部worker都在执行任务的时候,新进来的请求就需要阻塞等待,那当执行完任务的worker被放回Pool之时,如何通知阻塞的请求绑定一个空闲的worker运行呢?freeSignal就是来做这个事情的;workers是一个slice,用来存放空闲worker,请求进入Pool之后会首先检查workers中是否有空闲worker,若有则取出绑定任务执行,否则判断当前运行的worker是否已经达到容量上限,是—阻塞等待,否—新开一个worker执行任务;release是当关闭该Pool支持通知所有worker退出运行以防goroutine泄露;lock是一个锁,用以支持Pool的同步操作;once用在确保Pool关闭操作只会执行一次。

初始化Pool并启动定期清理过期worker任务

// NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, ErrPoolSizeInvalid
    }
    p := &Pool{
        capacity:       int32(size),
        freeSignal:     make(chan sig, math.MaxInt32),
        release:        make(chan sig, 1),
        expiryDuration: time.Duration(expiry) * time.Second,
    }
    // 启动定期清理过期worker任务,独立goroutine运行,
    // 进一步节省系统资源
    p.monitorAndClear()
    return p, nil
}

提交任务到Pool

p.Submit(task f)如下:

// Submit submit a task to pool
func (p *Pool) Submit(task f) error {
    if len(p.release) > 0 {
        return ErrPoolClosed
    }
    w := p.getWorker()
    w.sendTask(task)
    return nil
}

第一个if判断当前Pool是否已被关闭,若是则不再接受新任务,否则获取一个Pool中可用的worker,绑定该task执行。

获取可用worker(核心)

p.getWorker()源码:

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
    var w *Worker
    // 标志,表示当前运行的worker数量是否已达容量上限
    waiting := false
    // 涉及从workers队列取可用worker,需要加锁
    p.lock.Lock()
    workers := p.workers
    n := len(workers) - 1
    // 当前worker队列为空(无空闲worker)
    if n < 0 {
        // 运行worker数目已达到该Pool的容量上限,置等待标志
        if p.running >= p.capacity {
            waiting = true
        // 否则,运行数目加1
        } else {
            p.running++
        }
    // 有空闲worker,从队列尾部取出一个使用
    } else {
        <-p.freeSignal
        w = workers[n]
        workers[n] = nil
        p.workers = workers[:n]
    }
    // 判断是否有worker可用结束,解锁
    p.lock.Unlock()

    if waiting {
        // 阻塞等待直到有空闲worker
        <-p.freeSignal
        p.lock.Lock()
        workers = p.workers
        l := len(workers) - 1
        w = workers[l]
        workers[l] = nil
        p.workers = workers[:l]
        p.lock.Unlock()
    // 当前无空闲worker但是Pool还没有满,
    // 则可以直接新开一个worker执行任务
    } else if w == nil {
        w = &Worker{
            pool: p,
            task: make(chan f),
        }
        w.run()
    }
    return w
}

上面的源码中加了较为详细的注释,结合前面的设计思路,相信大家应该能理解获取可用worker绑定任务执行这个协程池的核心操作,主要就是实现一个LIFO队列用来存取可用worker达到资源复用的效果,之所以采用LIFO后进先出队列是因为后进先出可以保证空闲worker队列是按照每个worker的最后运行时间从远到近的顺序排列,方便在后续定期清理过期worker时排序以及清理完之后重新分配空闲worker队列,这里还要关注一个地方:达到Pool容量限制之后,额外的任务请求需要阻塞等待idle worker,这里是为了防止无节制地创建goroutine,事实上Go调度器有一个复用机制,每次使用go关键字的时候它会检查当前结构体M中的P中,是否有可用的结构体G。如果有,则直接从中取一个,否则,需要分配一个新的结构体G。如果分配了新的G,需要将它挂到runtime的相关队列中,但是调度器却没有限制goroutine的数量,这在瞬时性goroutine爆发的场景下就可能来不及复用G而依然创建了大量的goroutine,所以ants除了复用还做了限制goroutine数量。

其他部分可以依照注释理解,这里不再赘述。

任务执行

// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
    // pool who owns this worker.
    pool *Pool

    // task is a job should be done.
    task chan f

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {
    //atomic.AddInt32(&w.pool.running, 1)
    go func() {
        //监听任务列表,一旦有任务立马取出运行
        for f := range w.task {
            if f == nil {
                atomic.AddInt32(&w.pool.running, -1)
                return
            }
            f()

            //回收复用
            w.pool.putWorker(w)
        }
    }()
}

// stop this worker.
func (w *Worker) stop() {
    w.sendTask(nil)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task f) {
    w.task <- task
}

结合前面的p.Submit(task f)p.getWorker(),提交任务到Pool之后,获取一个可用worker,每新建一个worker实例之时都需要调用w.run()启动一个goroutine监听worker的任务列表task,一有任务提交进来就执行;所以,当调用worker的sendTask(task f)方法提交任务到worker的任务队列之后,马上就可以被接收并执行,当任务执行完之后,会调用w.pool.putWorker(w *Worker)方法将这个已经执行完任务的worker从当前任务解绑放回Pool中,以供下个任务可以使用,至此,一个任务从提交到完成的过程就此结束,Pool调度将进入下一个循环。

Worker回收(goroutine复用)

// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
    // 写入回收时间,亦即该worker的最后运行时间
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.workers = append(p.workers, worker)
    p.lock.Unlock()
    p.freeSignal <- sig{}
}

动态扩容或者缩小池容量

// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
    if size < p.Cap() {
        diff := p.Cap() - size
        for i := 0; i < diff; i++ {
            p.getWorker().stop()
        }
    } else if size == p.Cap() {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))
}

定期清理过期Worker

func (p *Pool) monitorAndClear() {
    heartbeat := time.NewTicker(p.expiryDuration)
    go func() {
        for range heartbeat.C {
            currentTime := time.Now()
            p.lock.Lock()
            idleWorkers := p.workers
            n := 0
            for i, w := range idleWorkers {
                if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                    break
                }
                n = i
                w.stop()
                idleWorkers[i] = nil
                p.running--
            }
            if n > 0 {
                n++
                p.workers = idleWorkers[n:]
            }
            p.lock.Unlock()
        }
    }()
}

定期检查空闲worker队列中是否有已过期的worker并清理:因为采用了LIFO后进先出队列存放空闲worker,所以该队列默认已经是按照worker的最后运行时间由远及近排序,可以方便地按顺序取出空闲队列中的每个worker并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长,若是,则清理掉该goroutine,释放该worker,并且将剩下的未过期worker重新分配到当前Pool的空闲worker队列中,进一步节省系统资源。

概括起来,ants Goroutine Pool的调度过程图示如下:

彩蛋

还记得前面我说除了通用的Pool struct之外,本项目还实现了一个PoolWithFunc struct—一个执行批量同类任务的协程池,PoolWithFunc相较于Pool,因为一个池只绑定一个任务函数,省去了每一次task都需要传送一个任务函数的代价,因此其性能优势比起Pool更明显,这里我们稍微讲一下一个协程池只绑定一个任务函数的细节:

上码!

type pf func(interface{}) error

// PoolWithFunc accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // expiryDuration set the expired time (second) of every worker.
    expiryDuration time.Duration

    // freeSignal is used to notice pool there are available
    // workers which can be sent to work.
    freeSignal chan sig

    // workers is a slice that store the available workers.
    workers []*WorkerWithFunc

    // release is used to notice the pool to closed itself.
    release chan sig

    // lock for synchronous operation
    lock sync.Mutex

    // pf is the function for processing tasks
    poolFunc pf

    once sync.Once
}

PoolWithFunc struct中的大部分字段和Pool struct基本一致,重点关注poolFunc pf,这是一个函数类型,也就是该Pool绑定的指定任务函数,而client提交到这种类型的Pool的数据就不再是一个任务函数task f了,而是poolFunc pf任务函数的形参,然后交由WorkerWithFunc处理:

// WorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type WorkerWithFunc struct {
    // pool who owns this worker.
    pool *PoolWithFunc

    // args is a job should be done.
    args chan interface{}

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *WorkerWithFunc) run() {
    go func() {
        for args := range w.args {
            if args == nil || len(w.pool.release) > 0 {
                atomic.AddInt32(&w.pool.running, -1)
                return
            }
            w.pool.poolFunc(args)
            w.pool.putWorker(w)
        }
    }()
}

// stop this worker.
func (w *WorkerWithFunc) stop() {
    w.sendTask(nil)
}

// sendTask sends a task to this worker.
func (w *WorkerWithFunc) sendTask(args interface{}) {
    w.args <- args
}

上面的源码可以看到WorkerWithFunc是一个类似Worker的结构,只不过监听的是函数的参数队列,每接收到一个参数包,就直接调用PoolWithFunc绑定好的任务函数poolFunc pf任务函数执行任务,接下来的流程就和Worker是一致的了,执行完任务后就把worker放回协程池,等待下次使用。

至于其他逻辑如提交task、获取Worker绑定任务等基本复用自Pool struct,具体细节有细微差别,但原理一致,万变不离其宗,有兴趣的同学可以看我在github上的源码:Goroutine Pool协程池 ants

Benchmarks

吹了这么久的Goroutine Pool,那都是虚的,理论上池化可以复用goroutine,提升性能节省内存,没有benchmark数据之前,好像也不能服众哈!所以,本章就来进行一次实测,验证一下再大规模goroutine并发的场景下,Goroutine Pool的表现是不是真的比原生Goroutine并发更好!

测试机器参数:

OS : macOS High Sierra
Processor : 2.7 GHz Intel Core i5
Memory : 8 GB 1867 MHz DDR3

Go1.9

Pool测试

测试代码传送门

测试结果:

这里为了模拟大规模goroutine的场景,两次测试的并发次数分别是100w和1000w,前两个测试分别是执行100w个并发任务不使用Pool和使用了ants的Goroutine Pool的性能,后两个则是1000w个任务下的表现,可以直观的看出在执行速度和内存使用上,ants的Pool都占有明显的优势。100w的任务量,使用ants,执行速度与原生goroutine相当甚至略快,但只实际使用了不到5w个goroutine完成了全部任务,且内存消耗仅为原生并发的40%;而当任务量达到1000w,优势则更加明显了:用了70w左右的goroutine完成全部任务,执行速度比原生goroutine提高了100%,且内存消耗依旧保持在不使用Pool的40%左右。

PoolWithFunc测试

测试代码传送门

测试结果:

  • Benchmarkxxx-4 格式为基准测试函数名-GOMAXPROCS,后面的-4代表测试函数运行时对应的CPU核数
  • 1 表示执行的次数
  • xx ns/op 表示每次的执行时间
  • xx B/op 表示每次执行分配的总字节数(内存消耗)
  • xx allocs/op 表示每次执行发生了多少次内存分配

因为PoolWithFunc这个Pool只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于Pool对原生goroutine在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生goroutine的300%,而内存消耗的优势已经达到了两位数的差距,原生goroutine的内存消耗达到了ants的35倍且原生goroutine的每次执行的内存分配次数也达到了ants45倍,1000w的任务量,ants的初始分配容量是5w,因此它完成了所有的任务依旧只使用了5w个goroutine!事实上,ants的Goroutine Pool的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。

吞吐量测试

上面的benchmarks出来以后,我当时的内心是这样的:

但是太顺利反而让我疑惑,因为结合我过去这20几年的坎坷人生来看,事情应该不会这么美好才对,果不其然,细细一想,虽然ants Groutine Pool能在大规模并发下执行速度和内存消耗都对原生goroutine占有明显优势,但前面的测试demo相信大家注意到了,里面使用了WaitGroup,也就是用来对goroutine同步的工具,所以上面的benchmarks中主进程会等待所有子goroutine完成任务后才算完成一次性能测试,然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情...

彼时,我内心变成了这样:

幸好,ants在同步批量任务方面有点曲高和寡,但是如果是异步批量任务的场景下,就有用武之地了,也就是说,在大批量的任务无须同步等待完成的情况下,可以再测一下ants和原生goroutine并发的性能对比,这个时候的性能对比也即是吞吐量对比了,就是在相同大规模数量的请求涌进来的时候,ants和原生goroutine谁能用更快的速度、更少的内存『吞』完这些请求。

测试代码传送门

测试结果:

10w 吞吐量

100w 吞吐量

1000W 吞吐量

因为在我的电脑上测试1000w吞吐量的时候原生goroutine已经到了极限,因此程序直接把电脑拖垮了,无法正常测试了,所以1000w吞吐的测试数据只有antsPool的。

从该demo测试吞吐性能对比可以看出,使用ants的吞吐性能相较于原生goroutine可以保持在2~6倍的性能压制,而内存消耗则可以达到10~20倍的节省优势。

总结

至此,一个高性能的 Goroutine Pool 开发就完成了,事实上,原理不难理解,总结起来就是一个『复用』,具体落实到代码细节就是锁同步、原子操作、channel通信等这些技巧的使用,ant这整个项目没有借助任何第三方的库,用golang的标准库就完成了所有功能,因为本身golang的语言原生库已经足够优秀,很多时候开发golang项目的时候是可以保持轻量且高性能的,未必事事需要借助第三方库。

关于ants的价值,其实前文也提及过了,ants在大规模的异步&同步批量任务处理都有着明显的性能优势(特别是异步批量任务),而单机上百万上千万的同步批量任务处理现实意义不大,但是在异步批量任务处理方面有很大的应用价值,所以我个人觉得,Goroutine Pool真正的价值还是在:

  1. 限制并发的goroutine数量;
  2. 复用goroutine,减轻runtime调度压力,提升程序性能;
  3. 规避过多的goroutine侵占系统资源(CPU&内存)。

后记

Go语言的三位最初的缔造者 — Rob Pike、Robert Griesemer 和 Ken Thompson 中,Robert Griesemer 参与设计了Java的HotSpot虚拟机和Chrome浏览器的JavaScript V8引擎,Rob Pike 在大名鼎鼎的bell lab侵淫多年,参与了Plan9操作系统、C编译器以及多种语言编译器的设计和实现,Ken Thompson 更是图灵奖得主、Unix之父、C语言之父。这三人在计算机史上可是元老级别的人物,特别是 Ken Thompson ,是一手缔造了Unix和C语言计算机领域的上古大神,所以Go语言的设计哲学有着深深的Unix烙印:简单、模块化、正交、组合、pipe、功能短小且聚焦等;而令许多开发者青睐于Go的简洁、高效编程模式的原因,也正在于此。

Go语言的三个爸爸

本文从三大线程模型到Go并发调度器再到自定制的 Goroutine Pool,算是较为完整的窥探了整个Go语言并发模型的前世今生,我们也可以看到,Go的设计当然不完美,比如一直被诟病的error处理模式、不支持泛型、差强人意的包管理以及面向对象模式的过度抽象化等等,实际上没有任何一门编程语言敢说自己是完美的,还是那句话,任何不考虑应用场景和语言定位的争执都毫无意义,而Go的定位从出道开始就是系统编程语言&云计算编程语言(这个有点模糊),而Go的作者们也一直坚持的是用最简单抽象的工程化设计完成最复杂的功能,所以如果从这个层面去看Go的并发模型,就可以看出其实除了G-P-M模型中引入的 P ,并没有太多革新的原创理论,两级线程模型是早已成熟的理论,抢占式调度更不是什么新鲜的调度模式,Go的伟大之处是在于它诞生之初就是依照Go在谷歌:以软件工程为目的的语言设计而设计的,Go其实就是将这些经典的理论和技术以一种优雅高效的工程化方式组合了起来,并用简单抽象的API或语法糖开放给使用者,Go一直致力于找寻一个高性能&开发效率的双赢点,目前为止,它做得远不够完美,但足够优秀。另外Go通过引入channel与goroutine协同工作,将一种区别于锁&原子操作的并发编程模式 — CSP 带入了Go语言,对开发人员在并发编程模式上的思考有很大的启发。

从本文中对Go调度器的分析以及antsGoroutine Pool 的设计与实现过程,对Go的并发模型做了一次解构和优化思考,在ants中的代码实现对锁同步、原子操作、channel通信的使用也做了一次较为全面的实践,希望对Gopher们在Go语言并发模型与并发编程的理解上能有所裨益。

感谢阅读。

参考

查看原文

Martin91 赞了文章 · 2019-10-16

Goroutine并发调度模型深度解析之手撸一个协程池

个人博客原文:Goroutine并发调度模型深度解析之手撸一个高性能协程池

并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者关注最多的话题;Go语言作为一个出道以来就自带 『高并发』光环的富二代编程语言,它的并发(并行)编程肯定是值得开发者去探究的,而Go语言中的并发(并行)编程是经由goroutine实现的,goroutine是golang最重要的特性之一,具有使用成本低、消耗资源低、能效高等特点,官方宣称原生goroutine并发成千上万不成问题,于是它也成为Gopher们经常使用的特性。

Goroutine是优秀的,但不是完美的,在极大规模的高并发场景下,也可能会暴露出问题,什么问题呢?又有什么可选的解决方案?本文将通过runtime对goroutine的调度分析,帮助大家理解它的机理和发现一些内存和调度的原理和问题,并且基于此提出一种个人的解决方案 — 一个高性能的Goroutine Pool(协程池)。

Goroutine & Scheduler

Goroutine,Go语言基于并发(并行)编程给出的自家的解决方案。goroutine是什么?通常goroutine会被当做coroutine(协程)的 golang实现,从比较粗浅的层面来看,这种认知也算是合理,但实际上,goroutine并非传统意义上的协程,现在主流的线程模型分三种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),传统的协程库属于用户级线程模型,而goroutine和它的Go Scheduler在底层实现上其实是属于两级线程模型,因此,有时候为了方便理解可以简单把goroutine类比成协程,但心里一定要有个清晰的认知 — goroutine并不等同于协程。

线程那些事儿

互联网时代以降,由于在线用户数量的爆炸,单台服务器处理的连接也水涨船高,迫使编程模式由从前的串行模式升级到并发模型,而几十年来,并发模型也是一代代地升级,有IO多路复用、多进程以及多线程,这几种模型都各有长短,现代复杂的高并发架构大多是几种模型协同使用,不同场景应用不同模型,扬长避短,发挥服务器的最大性能,而多线程,因为其轻量和易用,成为并发编程中使用频率最高的并发模型,而后衍生的协程等其他子产品,也都基于它,而我们今天要分析的 goroutine 也是基于线程,因此,我们先来聊聊线程的三大模型:

线程的实现模型主要有3种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),它们之间最大的差异就在于用户线程与内核调度实体(KSE,Kernel Scheduling Entity)之间的对应关系上。而所谓的内核调度实体 KSE 就是指可以被操作系统内核调度器调度的对象实体(这说的啥玩意儿,敢不敢通俗易懂一点?)。简单来说 KSE 就是内核级线程,是操作系统内核的最小调度单元,也就是我们写代码的时候通俗理解上的线程了(这么说不就懂了嘛!装什么13)。

用户级线程模型

用户线程与内核线程KSE是多对一(N : 1)的映射模型,多个用户线程的一般从属于单个进程并且多线程的调度是由用户自己的线程库来完成,线程的创建、销毁以及多线程之间的协调等操作都是由用户自己的线程库来负责而无须借助系统调用来实现。一个进程中所有创建的线程都只和同一个KSE在运行时动态绑定,也就是说,操作系统只知道用户进程而对其中的线程是无感知的,内核的所有调度都是基于用户进程。许多语言实现的 协程库 基本上都属于这种方式(比如python的gevent)。由于线程调度是在用户层面完成的,也就是相较于内核调度不需要让CPU在用户态和内核态之间切换,这种实现方式相比内核级线程可以做的很轻量级,对系统资源的消耗会小很多,因此可以创建的线程数量与上下文切换所花费的代价也会小得多。但该模型有个原罪:并不能做到真正意义上的并发,假设在某个用户进程上的某个用户线程因为一个阻塞调用(比如I/O阻塞)而被CPU给中断(抢占式调度)了,那么该进程内的所有线程都被阻塞(因为单个用户进程内的线程自调度是没有CPU时钟中断的,从而没有轮转调度),整个进程被挂起。即便是多CPU的机器,也无济于事,因为在用户级线程模型下,一个CPU关联运行的是整个用户进程,进程内的子线程绑定到CPU执行是由用户进程调度的,内部线程对CPU是不可见的,此时可以理解为CPU的调度单位是用户进程。所以很多的协程库会把自己一些阻塞的操作重新封装为完全的非阻塞形式,然后在以前要阻塞的点上,主动让出自己,并通过某种方式通知或唤醒其他待执行的用户线程在该KSE上运行,从而避免了内核调度器由于KSE阻塞而做上下文切换,这样整个进程也不会被阻塞了。

内核级线程模型

用户线程与内核线程KSE是一对一(1 : 1)的映射模型,也就是每一个用户线程绑定一个实际的内核线程,而线程的调度则完全交付给操作系统内核去做,应用程序对线程的创建、终止以及同步都基于内核提供的系统调用来完成,大部分编程语言的线程库(比如Java的java.lang.Thread、C++11的std::thread等等)都是对操作系统的线程(内核级线程)的一层封装,创建出来的每个线程与一个独立的KSE静态绑定,因此其调度完全由操作系统内核调度器去做。这种模型的优势和劣势同样明显:优势是实现简单,直接借助操作系统内核的线程以及调度器,所以CPU可以快速切换调度线程,于是多个线程可以同时运行,因此相较于用户级线程模型它真正做到了并行处理;但它的劣势是,由于直接借助了操作系统内核来创建、销毁和以及多个线程之间的上下文切换和调度,因此资源成本大幅上涨,且对性能影响很大。

两级线程模型

两级线程模型是博采众长之后的产物,充分吸收前两种线程模型的优点且尽量规避它们的缺点。在此模型下,用户线程与内核KSE是多对多(N : M)的映射模型:首先,区别于用户级线程模型,两级线程模型中的一个进程可以与多个内核线程KSE关联,于是进程内的多个线程可以绑定不同的KSE,这点和内核级线程模型相似;其次,又区别于内核级线程模型,它的进程里的所有线程并不与KSE一一绑定,而是可以动态绑定同一个KSE, 当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行。所以,两级线程模型既不是用户级线程模型那种完全靠自己调度的也不是内核级线程模型完全靠操作系统调度的,而是中间态(自身调度与系统调度协同工作),也就是 — 『薛定谔的模型』(误),因为这种模型的高度复杂性,操作系统内核开发者一般不会使用,所以更多时候是作为第三方库的形式出现,而Go语言中的runtime调度器就是采用的这种实现方案,实现了Goroutine与KSE之间的动态关联,不过Go语言的实现更加高级和优雅;该模型为何被称为两级?即用户调度器实现用户线程到KSE的『调度』,内核调度器实现KSE到CPU上的『调度』

G-P-M 模型概述

每一个OS线程都有一个固定大小的内存块(一般会是2MB)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。这个固定大小的栈同时很大又很小。因为2MB的栈对于一个小小的goroutine来说是很大的内存浪费,而对于一些复杂的任务(如深度嵌套的递归)来说又显得太小。因此,Go语言做了它自己的『线程』。

在Go语言中,每一个goroutine是一个独立的执行单元,相较于每个OS线程固定分配2M内存的模式,goroutine的栈采取了动态扩容方式, 初始时仅为2KB,随着任务执行按需增长,最大可达1GB(64位机器最大是1G,32位机器最大是256M),且完全由golang自己的调度器 Go Scheduler 来调度。此外,GC还会周期性地将不再使用的内存回收,收缩栈空间。 因此,Go程序可以同时并发成千上万个goroutine是得益于它强劲的调度器和高效的内存模型。Go的创造者大概对goroutine的定位就是屠龙刀,因为他们不仅让goroutine作为golang并发编程的最核心组件(开发者的程序都是基于goroutine运行的)而且golang中的许多标准库的实现也到处能见到goroutine的身影,比如net/http这个包,甚至语言本身的组件runtime运行时和GC垃圾回收器都是运行在goroutine上的,作者对goroutine的厚望可见一斑。

任何用户线程最终肯定都是要交由OS线程来执行的,goroutine(称为G)也不例外,但是G并不直接绑定OS线程运行,而是由Goroutine Scheduler中的 P - Logical Processor (逻辑处理器)来作为两者的『中介』,P可以看作是一个抽象的资源或者一个上下文,一个P绑定一个OS线程,在golang的实现里把OS线程抽象成一个数据结构:M,G实际上是由M通过P来进行调度运行的,但是在G的层面来看,P提供了G运行所需的一切资源和环境,因此在G看来P就是运行它的 “CPU”,由 G、P、M 这三种由Go抽象出来的实现,最终形成了Go调度器的基本结构:

  • G: 表示Goroutine,每个Goroutine对应一个G结构体,G存储Goroutine的运行堆栈、状态以及任务函数,可重用。G并非执行体,每个G需要绑定到P才能被调度执行。
  • P: Processor,表示逻辑处理器, 对G来说,P相当于CPU核,G只有绑定到P(在P的local runq中)才能被调度。对M来说,P提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定,但是不论GOMAXPROCS设置为多大,P的数量最大为256。
  • M: Machine,OS线程抽象,代表着真正执行计算的资源,在绑定有效的P后,进入schedule循环;而schedule循环的机制大致是从Global队列、P的Local队列以及wait队列中获取G,切换到G的执行栈上并执行G的函数,调用goexit做清理工作并回到M,如此反复。M并不保留G状态,这是G可以跨M调度的基础,M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致系统调度不过来,目前默认最大限制为10000个。

关于P,我们需要再絮叨几句,在Go 1.0发布的时候,它的调度器其实G-M模型,也就是没有P的,调度过程全由G和M完成,这个模型暴露出一些问题:

  • 单一全局互斥锁(Sched.Lock)和集中状态存储的存在导致所有goroutine相关操作,比如:创建、重新调度等都要上锁;
  • goroutine传递问题:M经常在M之间传递『可运行』的goroutine,这导致调度延迟增大以及额外的性能损耗;
  • 每个M做内存缓存,导致内存占用过高,数据局部性较差;
  • 由于syscall调用而形成的剧烈的worker thread阻塞和解除阻塞,导致额外的性能损耗。

这些问题实在太扎眼了,导致Go1.0虽然号称原生支持并发,却在并发性能上一直饱受诟病,然后,Go语言委员会中一个核心开发大佬看不下了,亲自下场重新设计和实现了Go调度器(在原有的G-M模型中引入了P)并且实现了一个叫做 work-stealing 的调度算法:

  • 每个P维护一个G的本地队列;
  • 当一个G被创建出来,或者变为可执行状态时,就把他放到P的可执行队列中;
  • 当一个G在M里执行结束后,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行, M就随机选择另外一个P,从其可执行的G队列中取走一半。

该算法避免了在goroutine调度时使用全局锁。

至此,Go调度器的基本模型确立:

G-P-M模型

G-P-M 模型调度

Go调度器工作时会维护两种用来保存G的任务队列:一种是一个Global任务队列,一种是每个P维护的Local任务队列。

当通过go关键字创建一个新的goroutine的时候,它会优先被放入P的本地队列。为了运行goroutine,M需要持有(绑定)一个P,接着M会启动一个OS线程,循环从P的本地队列里取出一个goroutine并执行。当然还有上文提及的 work-stealing调度算法:当M执行完了当前P的Local队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从Global队列寻找G来执行,如果Global队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。

如果一切正常,调度器会以上述的那种方式顺畅地运行,但这个世界没这么美好,总有意外发生,以下分析goroutine在两种例外情况下的行为。

Go runtime会在下面的goroutine被阻塞的情况下运行另外一个goroutine:

  • blocking syscall (for example opening a file)
  • network input
  • channel operations
  • primitives in the sync package

这四种场景又可归类为两种类型:

用户态阻塞/唤醒

当goroutine因为channel操作或者network I/O而阻塞时(实际上golang已经用netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞,仅阻塞G,这里仅仅是举个栗子),对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有runnable的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为runnable,尝试加入G2所在P的runnext,然后再是P的Local队列和Global队列。

系统调用阻塞

当G被阻塞在某个系统调用上时,此时G会阻塞在_Gsyscall状态,M也处于 block on syscall 状态,此时的M可被抢占调度:执行该G的M会与P解绑,而P则尝试与其它idle的M绑定,继续执行其它G。如果没有其它idle的M,但P的Local队列中仍然有G需要执行,则创建一个新的M;当系统调用完成后,G会重新尝试获取一个idle的P进入它的Local队列恢复执行,如果没有idle的P,G会被标记为runnable加入到Global队列。

以上就是从宏观的角度对Goroutine和它的调度器进行的一些概要性的介绍,当然,Go的调度中更复杂的抢占式调度、阻塞调度的更多细节,大家可以自行去找相关资料深入理解,本文只讲到Go调度器的基本调度过程,为后面自己实现一个Goroutine Pool提供理论基础,这里便不再继续深入上述说的那几个调度了,事实上如果要完全讲清楚Go调度器,一篇文章的篇幅也实在是捉襟见肘,所以想了解更多细节的同学可以去看看Go调度器 G-P-M 模型的设计者 Dmitry Vyukov 写的该模型的设计文档《Go Preemptive Scheduler Design》以及直接去看源码,G-P-M模型的定义放在src/runtime/runtime2.go里面,而调度过程则放在了src/runtime/proc.go里。

大规模Goroutine的瓶颈

既然Go调度器已经这么牛逼优秀了,我们为什么还要自己去实现一个golang的 Goroutine Pool 呢?事实上,优秀不代表完美,任何不考虑具体应用场景的编程模式都是耍流氓!有基于G-P-M的Go调度器背书,go程序的并发编程中,可以任性地起大规模的goroutine来执行任务,官方也宣称用golang写并发程序的时候随便起个成千上万的goroutine毫无压力。

然而,你起1000个goroutine没有问题,10000也没有问题,10w个可能也没问题;那,100w个呢?1000w个呢?(这里只是举个极端的例子,实际编程起这么大规模的goroutine的例子极少)这里就会出问题,什么问题呢?

  1. 首先,即便每个goroutine只分配2KB的内存,但如果是恐怖如斯的数量,聚少成多,内存暴涨,就会对GC造成极大的负担,写过java的同学应该知道jvm GC那万恶的STW(Stop The World)机制,也就是GC的时候会挂起用户程序直到垃圾回收完,虽然Go1.8之后的GC已经去掉了STW以及优化成了并行GC,性能上有了不小的提升,但是,如果太过于频繁地进行GC,依然会有性能瓶颈;
  2. 其次,还记得前面我们说的runtime和GC也都是goroutine吗?是的,如果goroutine规模太大,内存吃紧,runtime调度和垃圾回收同样会出问题,虽然G-P-M模型足够优秀,韩信点兵,多多益善,但你不能不给士兵发口粮(内存)吧?巧妇难为无米之炊,没有内存,Go调度器就会阻塞goroutine,结果就是P的Local队列积压,又导致内存溢出,这就是个死循环...,甚至极有可能程序直接Crash掉,本来是想享受golang并发带来的快感效益,结果却得不偿失。

一个http标准库引发的血案

我想,作为golang拥趸的Gopher们一定都使用过它的net/http标准库,很多人都说用golang写web server完全可以不用借助第三方的web framework,仅用net/http标准库就能写一个高性能的web server,的确,我也用过它写过web server,简洁高效,性能表现也相当不错,除非有比较特殊的需求否则一般的确不用借助第三方web framework,但是天下没有白吃的午餐,net/http为啥这么快?要搞清这个问题,从源码入手是最好的途径。孔子曾经曰过:源码面前,如同裸奔。所以,高清无码是阻碍程序猿发展大大滴绊脚石啊,源码才是我们进步阶梯,切记切记!

接下来我们就来先看看net/http内部是怎么实现的。

net/http接收请求且开始处理的源码放在src/net/http/server.go里,先从入口函数ListenAndServe进去:

func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

看到最后那个srv.Serve调用了吗?没错,这个Serve方法里面就是实际处理http请求的逻辑,我们再进入这个方法内部:

func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    ...
    // 不断循环取出TCP连接
    for {
        // 看我看我!!!
        rw, e := l.Accept()
        ...
        // 再看我再看我!!!
        go c.serve(ctx)
    }
}

首先,这个方法的参数(l net.Listener) ,是一个TCP监听的封装,负责监听网络端口,rw, e := l.Accept()则是一个阻塞操作,从网络端口取出一个新的TCP连接进行处理,最后go c.serve(ctx)就是最后真正去处理这个http请求的逻辑了,看到前面的go关键字了吗?没错,这里启动了一个新的goroutine去执行处理逻辑,而且这是在一个无限循环体里面,所以意味着,每来一个请求它就会开一个goroutine去处理,相当任性粗暴啊…,不过有Go调度器背书,一般来说也没啥压力,然而,如果,我是说如果哈,突然一大波请求涌进来了(比方说黑客搞了成千上万的肉鸡DDOS你,没错!就这么倒霉!),这时候,就很成问题了,他来10w个请求你就要开给他10w个goroutine,来100w个你就要老老实实开给他100w个,线程调度压力陡升,内存爆满,再然后,你就跪了…

釜底抽薪

有问题,就一定有解决的办法,那么,有什么方案可以减缓大规模goroutine对系统的调度和内存压力?要想解决问题,最重要的是找到造成问题的根源,这个问题根源是什么?goroutine的数量过多导致资源侵占,那要解决这个问题就要限制运行的goroutine数量,合理复用,节省资源,具体就是 — goroutine池化。

超大规模并发的场景下,不加限制的大规模的goroutine可能造成内存暴涨,给机器带来极大的压力,吞吐量下降和处理速度变慢还是其次,更危险的是可能使得程序crash。所以,goroutine池化是有其现实意义的。

首先,100w个任务,是不是真的需要100w个goroutine来处理?未必!用1w个goroutine也一样可以处理,让一个goroutine多处理几个任务就是了嘛,池化的核心优势就在于对goroutine的复用。此举首先极大减轻了runtime调度goroutine的压力,其次,便是降低了对内存的消耗。

有一个商场,来了1000个顾客买东西,那么该如何安排导购员服务这1000人呢?有两种方案:

第一,我雇1000个导购员实行一对一服务,这种当然是最高效的,但是太浪费资源了,雇1000个人的成本极高且管理困难,这些可以先按下不表,但是每个顾客到商场买东西也不是一进来就马上买,一般都得逛一逛,选一选,也就是得花时间挑,1000个导购员一对一盯着,效率极低;这就引出第二种方案:我只雇10个导购员,就在商场里待命,有顾客需要咨询的时候招呼导购员过去进行处理,导购员处理完之后就回来,等下一个顾客需要咨询的时候再去,如此往返反复...

第二种方案有没有觉得很眼熟?没错,其基本思路就是模拟一个I/O多路复用,通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。关于多路复用,不在本文的讨论范围之内,便不再赘述,详细原理可以参考 I/O多路复用

第一种方案就是net/http标准库采用的:来一个请求开一个goroutine处理;第二种方案就是Goroutine Pool(I/O多路复用)。

实现一个 Goroutine Pool

因为上述陈列的一些由于goroutine规模过大而可能引发的问题,需要有方案来解决这些问题,上文已经分析过,把goroutine池化是一种行之有效的方案,基于此,可以实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下可以极大地提高并发性能。

哎玛!前面絮絮叨叨了这么多,终于进入正题了,接下来就开始讲解如何实现一个高性能的Goroutine Pool,秒杀原生并发的goroutine,在执行速度和占用内存上提高并发程序的性能。好了,话不多说,开始装逼分析。

设计思路

Goroutine Pool 的实现思路大致如下:

启动服务之时先初始化一个 Goroutine Pool 池,这个Pool维护了一个类似栈的LIFO队列 ,里面存放负责处理任务的Worker,然后在client端提交task到Pool中之后,在Pool内部,接收task之后的核心操作是:

  1. 检查当前Worker队列中是否有空闲的Worker,如果有,取出执行当前的task;
  2. 没有空闲Worker,判断当前在运行的Worker是否已超过该Pool的容量,是 — 阻塞等待直至有Worker被放回Pool;否 — 新开一个Worker(goroutine)处理;
  3. 每个Worker执行完任务之后,放回Pool的队列中等待。

调度过程如下:

按照这个设计思路,我实现了一个高性能的Goroutine Pool,较好地解决了上述的大规模调度和资源占用的问题,在执行速度和内存占用方面相较于原生goroutine并发占有明显的优势,尤其是内存占用,因为复用,所以规避了无脑启动大规模goroutine的弊端,可以节省大量的内存。

此外,该调度系统还有一个清理过期Worker的定时任务,该任务在初始化一个Pool之时启动,每隔一定的时间间隔去检查空闲Worker队列中是否有已经过期的Worker,有则清理掉,通过定时清理过期worker,进一步节省系统资源。

完整的项目代码可以在我的github上获取:传送门,也欢迎提意见和交流。

实现细节

Goroutine Pool的设计原理前面已经讲过了,整个调度过程相信大家应该可以理解了,但是有一句老话说得好,空谈误国,实干兴邦,设计思路有了,具体实现的时候肯定会有很多细节、难点,接下来我们通过分析这个Goroutine Pool的几个核心实现以及它们的联动来引导大家过一遍Goroutine Pool的原理。

首先是Pool struct

type sig struct{}

type f func() error

// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // expiryDuration set the expired time (second) of every worker.
    expiryDuration time.Duration

    // freeSignal is used to notice pool there are available
    // workers which can be sent to work.
    freeSignal chan sig

    // workers is a slice that store the available workers.
    workers []*Worker

    // release is used to notice the pool to closed itself.
    release chan sig

    // lock for synchronous operation
    lock sync.Mutex

    once sync.Once
}

Pool是一个通用的协程池,支持不同类型的任务,亦即每一个任务绑定一个函数提交到池中,批量执行不同类型任务,是一种广义的协程池;本项目中还实现了另一种协程池 — 批量执行同类任务的协程池PoolWithFunc,每一个PoolWithFunc只会绑定一个任务函数pf,这种Pool适用于大批量相同任务的场景,因为每个Pool只绑定一个任务函数,因此PoolWithFunc相较于Pool会更加节省内存,但通用性就不如前者了,为了让大家更好地理解协程池的原理,这里我们用通用的Pool来分析。

capacity是该Pool的容量,也就是开启worker数量的上限,每一个worker绑定一个goroutine;running是当前正在执行任务的worker数量;expiryDuration是worker的过期时长,在空闲队列中的worker的最新一次运行时间与当前时间之差如果大于这个值则表示已过期,定时清理任务会清理掉这个worker;freeSignal是一个信号,因为Pool开启的worker数量有上限,因此当全部worker都在执行任务的时候,新进来的请求就需要阻塞等待,那当执行完任务的worker被放回Pool之时,如何通知阻塞的请求绑定一个空闲的worker运行呢?freeSignal就是来做这个事情的;workers是一个slice,用来存放空闲worker,请求进入Pool之后会首先检查workers中是否有空闲worker,若有则取出绑定任务执行,否则判断当前运行的worker是否已经达到容量上限,是—阻塞等待,否—新开一个worker执行任务;release是当关闭该Pool支持通知所有worker退出运行以防goroutine泄露;lock是一个锁,用以支持Pool的同步操作;once用在确保Pool关闭操作只会执行一次。

初始化Pool并启动定期清理过期worker任务

// NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, ErrPoolSizeInvalid
    }
    p := &Pool{
        capacity:       int32(size),
        freeSignal:     make(chan sig, math.MaxInt32),
        release:        make(chan sig, 1),
        expiryDuration: time.Duration(expiry) * time.Second,
    }
    // 启动定期清理过期worker任务,独立goroutine运行,
    // 进一步节省系统资源
    p.monitorAndClear()
    return p, nil
}

提交任务到Pool

p.Submit(task f)如下:

// Submit submit a task to pool
func (p *Pool) Submit(task f) error {
    if len(p.release) > 0 {
        return ErrPoolClosed
    }
    w := p.getWorker()
    w.sendTask(task)
    return nil
}

第一个if判断当前Pool是否已被关闭,若是则不再接受新任务,否则获取一个Pool中可用的worker,绑定该task执行。

获取可用worker(核心)

p.getWorker()源码:

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
    var w *Worker
    // 标志,表示当前运行的worker数量是否已达容量上限
    waiting := false
    // 涉及从workers队列取可用worker,需要加锁
    p.lock.Lock()
    workers := p.workers
    n := len(workers) - 1
    // 当前worker队列为空(无空闲worker)
    if n < 0 {
        // 运行worker数目已达到该Pool的容量上限,置等待标志
        if p.running >= p.capacity {
            waiting = true
        // 否则,运行数目加1
        } else {
            p.running++
        }
    // 有空闲worker,从队列尾部取出一个使用
    } else {
        <-p.freeSignal
        w = workers[n]
        workers[n] = nil
        p.workers = workers[:n]
    }
    // 判断是否有worker可用结束,解锁
    p.lock.Unlock()

    if waiting {
        // 阻塞等待直到有空闲worker
        <-p.freeSignal
        p.lock.Lock()
        workers = p.workers
        l := len(workers) - 1
        w = workers[l]
        workers[l] = nil
        p.workers = workers[:l]
        p.lock.Unlock()
    // 当前无空闲worker但是Pool还没有满,
    // 则可以直接新开一个worker执行任务
    } else if w == nil {
        w = &Worker{
            pool: p,
            task: make(chan f),
        }
        w.run()
    }
    return w
}

上面的源码中加了较为详细的注释,结合前面的设计思路,相信大家应该能理解获取可用worker绑定任务执行这个协程池的核心操作,主要就是实现一个LIFO队列用来存取可用worker达到资源复用的效果,之所以采用LIFO后进先出队列是因为后进先出可以保证空闲worker队列是按照每个worker的最后运行时间从远到近的顺序排列,方便在后续定期清理过期worker时排序以及清理完之后重新分配空闲worker队列,这里还要关注一个地方:达到Pool容量限制之后,额外的任务请求需要阻塞等待idle worker,这里是为了防止无节制地创建goroutine,事实上Go调度器有一个复用机制,每次使用go关键字的时候它会检查当前结构体M中的P中,是否有可用的结构体G。如果有,则直接从中取一个,否则,需要分配一个新的结构体G。如果分配了新的G,需要将它挂到runtime的相关队列中,但是调度器却没有限制goroutine的数量,这在瞬时性goroutine爆发的场景下就可能来不及复用G而依然创建了大量的goroutine,所以ants除了复用还做了限制goroutine数量。

其他部分可以依照注释理解,这里不再赘述。

任务执行

// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
    // pool who owns this worker.
    pool *Pool

    // task is a job should be done.
    task chan f

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {
    //atomic.AddInt32(&w.pool.running, 1)
    go func() {
        //监听任务列表,一旦有任务立马取出运行
        for f := range w.task {
            if f == nil {
                atomic.AddInt32(&w.pool.running, -1)
                return
            }
            f()

            //回收复用
            w.pool.putWorker(w)
        }
    }()
}

// stop this worker.
func (w *Worker) stop() {
    w.sendTask(nil)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task f) {
    w.task <- task
}

结合前面的p.Submit(task f)p.getWorker(),提交任务到Pool之后,获取一个可用worker,每新建一个worker实例之时都需要调用w.run()启动一个goroutine监听worker的任务列表task,一有任务提交进来就执行;所以,当调用worker的sendTask(task f)方法提交任务到worker的任务队列之后,马上就可以被接收并执行,当任务执行完之后,会调用w.pool.putWorker(w *Worker)方法将这个已经执行完任务的worker从当前任务解绑放回Pool中,以供下个任务可以使用,至此,一个任务从提交到完成的过程就此结束,Pool调度将进入下一个循环。

Worker回收(goroutine复用)

// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
    // 写入回收时间,亦即该worker的最后运行时间
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.workers = append(p.workers, worker)
    p.lock.Unlock()
    p.freeSignal <- sig{}
}

动态扩容或者缩小池容量

// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
    if size < p.Cap() {
        diff := p.Cap() - size
        for i := 0; i < diff; i++ {
            p.getWorker().stop()
        }
    } else if size == p.Cap() {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))
}

定期清理过期Worker

func (p *Pool) monitorAndClear() {
    heartbeat := time.NewTicker(p.expiryDuration)
    go func() {
        for range heartbeat.C {
            currentTime := time.Now()
            p.lock.Lock()
            idleWorkers := p.workers
            n := 0
            for i, w := range idleWorkers {
                if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                    break
                }
                n = i
                w.stop()
                idleWorkers[i] = nil
                p.running--
            }
            if n > 0 {
                n++
                p.workers = idleWorkers[n:]
            }
            p.lock.Unlock()
        }
    }()
}

定期检查空闲worker队列中是否有已过期的worker并清理:因为采用了LIFO后进先出队列存放空闲worker,所以该队列默认已经是按照worker的最后运行时间由远及近排序,可以方便地按顺序取出空闲队列中的每个worker并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长,若是,则清理掉该goroutine,释放该worker,并且将剩下的未过期worker重新分配到当前Pool的空闲worker队列中,进一步节省系统资源。

概括起来,ants Goroutine Pool的调度过程图示如下:

彩蛋

还记得前面我说除了通用的Pool struct之外,本项目还实现了一个PoolWithFunc struct—一个执行批量同类任务的协程池,PoolWithFunc相较于Pool,因为一个池只绑定一个任务函数,省去了每一次task都需要传送一个任务函数的代价,因此其性能优势比起Pool更明显,这里我们稍微讲一下一个协程池只绑定一个任务函数的细节:

上码!

type pf func(interface{}) error

// PoolWithFunc accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // expiryDuration set the expired time (second) of every worker.
    expiryDuration time.Duration

    // freeSignal is used to notice pool there are available
    // workers which can be sent to work.
    freeSignal chan sig

    // workers is a slice that store the available workers.
    workers []*WorkerWithFunc

    // release is used to notice the pool to closed itself.
    release chan sig

    // lock for synchronous operation
    lock sync.Mutex

    // pf is the function for processing tasks
    poolFunc pf

    once sync.Once
}

PoolWithFunc struct中的大部分字段和Pool struct基本一致,重点关注poolFunc pf,这是一个函数类型,也就是该Pool绑定的指定任务函数,而client提交到这种类型的Pool的数据就不再是一个任务函数task f了,而是poolFunc pf任务函数的形参,然后交由WorkerWithFunc处理:

// WorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type WorkerWithFunc struct {
    // pool who owns this worker.
    pool *PoolWithFunc

    // args is a job should be done.
    args chan interface{}

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *WorkerWithFunc) run() {
    go func() {
        for args := range w.args {
            if args == nil || len(w.pool.release) > 0 {
                atomic.AddInt32(&w.pool.running, -1)
                return
            }
            w.pool.poolFunc(args)
            w.pool.putWorker(w)
        }
    }()
}

// stop this worker.
func (w *WorkerWithFunc) stop() {
    w.sendTask(nil)
}

// sendTask sends a task to this worker.
func (w *WorkerWithFunc) sendTask(args interface{}) {
    w.args <- args
}

上面的源码可以看到WorkerWithFunc是一个类似Worker的结构,只不过监听的是函数的参数队列,每接收到一个参数包,就直接调用PoolWithFunc绑定好的任务函数poolFunc pf任务函数执行任务,接下来的流程就和Worker是一致的了,执行完任务后就把worker放回协程池,等待下次使用。

至于其他逻辑如提交task、获取Worker绑定任务等基本复用自Pool struct,具体细节有细微差别,但原理一致,万变不离其宗,有兴趣的同学可以看我在github上的源码:Goroutine Pool协程池 ants

Benchmarks

吹了这么久的Goroutine Pool,那都是虚的,理论上池化可以复用goroutine,提升性能节省内存,没有benchmark数据之前,好像也不能服众哈!所以,本章就来进行一次实测,验证一下再大规模goroutine并发的场景下,Goroutine Pool的表现是不是真的比原生Goroutine并发更好!

测试机器参数:

OS : macOS High Sierra
Processor : 2.7 GHz Intel Core i5
Memory : 8 GB 1867 MHz DDR3

Go1.9

Pool测试

测试代码传送门

测试结果:

这里为了模拟大规模goroutine的场景,两次测试的并发次数分别是100w和1000w,前两个测试分别是执行100w个并发任务不使用Pool和使用了ants的Goroutine Pool的性能,后两个则是1000w个任务下的表现,可以直观的看出在执行速度和内存使用上,ants的Pool都占有明显的优势。100w的任务量,使用ants,执行速度与原生goroutine相当甚至略快,但只实际使用了不到5w个goroutine完成了全部任务,且内存消耗仅为原生并发的40%;而当任务量达到1000w,优势则更加明显了:用了70w左右的goroutine完成全部任务,执行速度比原生goroutine提高了100%,且内存消耗依旧保持在不使用Pool的40%左右。

PoolWithFunc测试

测试代码传送门

测试结果:

  • Benchmarkxxx-4 格式为基准测试函数名-GOMAXPROCS,后面的-4代表测试函数运行时对应的CPU核数
  • 1 表示执行的次数
  • xx ns/op 表示每次的执行时间
  • xx B/op 表示每次执行分配的总字节数(内存消耗)
  • xx allocs/op 表示每次执行发生了多少次内存分配

因为PoolWithFunc这个Pool只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于Pool对原生goroutine在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生goroutine的300%,而内存消耗的优势已经达到了两位数的差距,原生goroutine的内存消耗达到了ants的35倍且原生goroutine的每次执行的内存分配次数也达到了ants45倍,1000w的任务量,ants的初始分配容量是5w,因此它完成了所有的任务依旧只使用了5w个goroutine!事实上,ants的Goroutine Pool的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。

吞吐量测试

上面的benchmarks出来以后,我当时的内心是这样的:

但是太顺利反而让我疑惑,因为结合我过去这20几年的坎坷人生来看,事情应该不会这么美好才对,果不其然,细细一想,虽然ants Groutine Pool能在大规模并发下执行速度和内存消耗都对原生goroutine占有明显优势,但前面的测试demo相信大家注意到了,里面使用了WaitGroup,也就是用来对goroutine同步的工具,所以上面的benchmarks中主进程会等待所有子goroutine完成任务后才算完成一次性能测试,然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情...

彼时,我内心变成了这样:

幸好,ants在同步批量任务方面有点曲高和寡,但是如果是异步批量任务的场景下,就有用武之地了,也就是说,在大批量的任务无须同步等待完成的情况下,可以再测一下ants和原生goroutine并发的性能对比,这个时候的性能对比也即是吞吐量对比了,就是在相同大规模数量的请求涌进来的时候,ants和原生goroutine谁能用更快的速度、更少的内存『吞』完这些请求。

测试代码传送门

测试结果:

10w 吞吐量

100w 吞吐量

1000W 吞吐量

因为在我的电脑上测试1000w吞吐量的时候原生goroutine已经到了极限,因此程序直接把电脑拖垮了,无法正常测试了,所以1000w吞吐的测试数据只有antsPool的。

从该demo测试吞吐性能对比可以看出,使用ants的吞吐性能相较于原生goroutine可以保持在2~6倍的性能压制,而内存消耗则可以达到10~20倍的节省优势。

总结

至此,一个高性能的 Goroutine Pool 开发就完成了,事实上,原理不难理解,总结起来就是一个『复用』,具体落实到代码细节就是锁同步、原子操作、channel通信等这些技巧的使用,ant这整个项目没有借助任何第三方的库,用golang的标准库就完成了所有功能,因为本身golang的语言原生库已经足够优秀,很多时候开发golang项目的时候是可以保持轻量且高性能的,未必事事需要借助第三方库。

关于ants的价值,其实前文也提及过了,ants在大规模的异步&同步批量任务处理都有着明显的性能优势(特别是异步批量任务),而单机上百万上千万的同步批量任务处理现实意义不大,但是在异步批量任务处理方面有很大的应用价值,所以我个人觉得,Goroutine Pool真正的价值还是在:

  1. 限制并发的goroutine数量;
  2. 复用goroutine,减轻runtime调度压力,提升程序性能;
  3. 规避过多的goroutine侵占系统资源(CPU&内存)。

后记

Go语言的三位最初的缔造者 — Rob Pike、Robert Griesemer 和 Ken Thompson 中,Robert Griesemer 参与设计了Java的HotSpot虚拟机和Chrome浏览器的JavaScript V8引擎,Rob Pike 在大名鼎鼎的bell lab侵淫多年,参与了Plan9操作系统、C编译器以及多种语言编译器的设计和实现,Ken Thompson 更是图灵奖得主、Unix之父、C语言之父。这三人在计算机史上可是元老级别的人物,特别是 Ken Thompson ,是一手缔造了Unix和C语言计算机领域的上古大神,所以Go语言的设计哲学有着深深的Unix烙印:简单、模块化、正交、组合、pipe、功能短小且聚焦等;而令许多开发者青睐于Go的简洁、高效编程模式的原因,也正在于此。

Go语言的三个爸爸

本文从三大线程模型到Go并发调度器再到自定制的 Goroutine Pool,算是较为完整的窥探了整个Go语言并发模型的前世今生,我们也可以看到,Go的设计当然不完美,比如一直被诟病的error处理模式、不支持泛型、差强人意的包管理以及面向对象模式的过度抽象化等等,实际上没有任何一门编程语言敢说自己是完美的,还是那句话,任何不考虑应用场景和语言定位的争执都毫无意义,而Go的定位从出道开始就是系统编程语言&云计算编程语言(这个有点模糊),而Go的作者们也一直坚持的是用最简单抽象的工程化设计完成最复杂的功能,所以如果从这个层面去看Go的并发模型,就可以看出其实除了G-P-M模型中引入的 P ,并没有太多革新的原创理论,两级线程模型是早已成熟的理论,抢占式调度更不是什么新鲜的调度模式,Go的伟大之处是在于它诞生之初就是依照Go在谷歌:以软件工程为目的的语言设计而设计的,Go其实就是将这些经典的理论和技术以一种优雅高效的工程化方式组合了起来,并用简单抽象的API或语法糖开放给使用者,Go一直致力于找寻一个高性能&开发效率的双赢点,目前为止,它做得远不够完美,但足够优秀。另外Go通过引入channel与goroutine协同工作,将一种区别于锁&原子操作的并发编程模式 — CSP 带入了Go语言,对开发人员在并发编程模式上的思考有很大的启发。

从本文中对Go调度器的分析以及antsGoroutine Pool 的设计与实现过程,对Go的并发模型做了一次解构和优化思考,在ants中的代码实现对锁同步、原子操作、channel通信的使用也做了一次较为全面的实践,希望对Gopher们在Go语言并发模型与并发编程的理解上能有所裨益。

感谢阅读。

参考

查看原文

赞 57 收藏 45 评论 5

Martin91 发布了文章 · 2019-09-22

谨防猴子补丁以及Python中排查技巧

背景

前两天晚上线上系统突发故障,在立马打开线上错误日志之后,却只能得到一堆毫无意义的程序调用栈(traceback)的输出,于是团队成员陷入漫长而又抓瞎的问题排查过程中。问题很幸运地得到了解决,但是我一直想不明白为什么日志里打印的调用栈毫无意义,按照经验,它应该打印的是异常产生过程中的调用栈才是。在经过后续的源码分析和排查之后,我才发现其实是因为项目中一个老旧的代码使用了猴子补丁导致,这也是这篇文章想要讨论的内容。

什么是猴子补丁

猴子补丁是一种用来在运行时修改(增加、变更、删除等)系统软件行为的编程方式。在动态语言里有广泛的猴子补丁应用的影子,比如 Ruby 的打开类的特性支持运行时扩展类的定义甚至替换方法的实现,Python 的方法或者函数由于可以在运行时进行替换而使得猴子补丁的应用非常方便,其他像 JavaScript 语言同样可以应用猴子补丁。

猴子补丁是把双刃剑

猴子补丁以其灵活性,可以实现补丁代码和应用代码的完全分离,同时使得应用代码在调用方式上保持调用方式始终不变。
从应用代码的角度来看,它调用的就是某个模块的原始定义的方法或者函数;而从被调用的方法或者函数的角度来看,猴子补丁的存在对它是透明的存在,以下展示一个 Python 语言的 Demo:

我们从一个极简例子开始,向这个美好的世界问好:

def greet():
    print("Hello World!")

if __name__ == "__main__":
    greet()

假如执行以上脚本,得到的结果是:

$ python demo.py
Hello World!

这个很简单,接下来假如打一个猴子补丁:我们扩充原来的 greet 的行为,现在除了打印信息,还要打印下当前的时间:

from datetime import datetime
def greet():
    print("Hello World!")

# monkey patch
original_greet = greet
def greet_with_time():
    original_greet()
    print(datetime.now())
greet = greet_with_time  # replace the implementation
# monkey patch

if __name__ == "__main__":
    greet() # 这里的调用和原来没有变化

运行它,得到的结果是:

$ python demo.py
Hello World!
2019-09-21 23:40:42.575782

我们得到了预期的结果!
从代码分析,我们添加了一个新的函数 greet_with_time,其会调用原来的 greet 函数,然后打印当前时间,最后将 greet 函数通过将函数赋值给变量的方式完成对 greet 函数的替换。而对于最后的 greet 函数的调用,却无需任何改动,以此达到了同样还是调用 greet 函数,行为却大相径庭的目的。
上面的 demo 只是限于篇幅简化了代码,真实项目里的猴子补丁代码总是在另外的模块或者文件里。想象在一个复杂的大型工程里,如果你的代码里猴子补丁泛滥,可想对于系统的行为分析以及问题排查,将是一种灾难性的挑战。

现在对猴子补丁有了一定的了解之后,我们再来看看我在实际项目中遇到的例子。

一堆毫无意义的堆栈信息

我在本地重现了我开头提到的我们所遇到的异常,以下是和线上环境一致的堆栈信息:

2019-09-19 17:30:11.103|CRITICAL|138:140147476383488|log.py:282|log.log|Task command.celery.crontab_task.some_task[538ddb72-89b0-45fe-811e-107202dc665b] INTERNAL ERROR: AttributeError("'long' object has no attribute 'insert'",)
Traceback (most recent call last):
  File "/usr/local/bin/celery", line 10, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 30, in main
    main()
  ...... 限于篇幅,这里省略很多无意义的内容
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 384, in on_success
    return self.on_failure(ret_value)
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 443, in on_failure
    self._log_error(exc_info, send_failed_event=send_failed_event)
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 511, in _log_error
    'internal': internal}})
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/log.py", line 282, in log
    return Logger.log(self, *args, **kwargs)
None

从这个堆栈信息看,它打印的实际上是调用了 Logger.log 函数的堆栈,其中根本没有任何代码看到有 .insert 相关字眼,其与 AttributeError("'long' object has no attribute 'insert'",) 根本毫无关系,这样的堆栈信息,有和没有基本一个样。于是乎,我接着通过编辑器通过源码进行了更多的探索。

首先还是借助上面的堆栈去分析到底哪里出了问题,所以我先看了 celery/worker/job.py:504-511 处的代码:

        context = {
            'hostname': self.hostname,
            'id': self.id,
            'name': self.name,
            'exc': exception,
            'traceback': traceback,
            'args': sargs,
            'kwargs': skwargs,
            'description': description,
        }

        logger.log(severity, format.strip(), context,
                   exc_info=exc_info,
                   extra={'data': {'id': self.id,
                                   'name': self.name,
                                   'args': sargs,
                                   'kwargs': skwargs,
                                   'hostname': self.hostname,
                                   'internal': internal}})

这里调用了 logger.log 方法(logger 的来源在 Celery 的代码里可分析,但是不是这篇文章的重点,故此不展开)并且通过 context 对象传入了两个重要的信息:exceptiontraceback。在对 logger.log 源码的进一步阅读中,我确认了这块日志打印的核心依赖于对 traceback.print_exception 函数的调用

    def formatException(self, ei):
        """
        Format and return the specified exception information as a string.

        This default implementation just uses
        traceback.print_exception()
        """
        sio = io.StringIO()
        tb = ei[2]
        traceback.print_exception(ei[0], ei[1], tb, None, sio)

于是乎,我回到了 celery/worker/job.py:504-511 处的代码,在 logger.log 前面插入了两种打印错误堆栈信息的代码:

        # context  = ...

        ################################################################
        import traceback as _traceback
        # Method 1: like what logger.log does
        _traceback.print_exception(*exc_info)

        # Method 2: use `format_exception` instead
        print(''.join(_traceback.format_exception(*exc_info)))
        ################################################################

        logger.log(....

重新启动 celery 后,执行异步任务后,得到的第一种错误堆栈和前面我贴出来的堆栈信息是完全一致的,这个倒也好理解,毕竟这里的 print_exception 函数的调用就是 logger.log 里的核心实现。而 format_exception 的调用给了我真正有意义的错误堆栈信息:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 283, in trace_task
    uuid, retval, SUCCESS, request=task_request,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 271, in store_result
    request=request, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 505, in _store_result
    self.set(self.get_key_for_task(task_id), self.encode(meta))
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/redis.py", line 161, in set
    return self.ensure(self._set, (key, value), **retry_policy)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/redis.py", line 150, in ensure
    **retry_policy
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/__init__.py", line 246, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/redis.py", line 170, in _set
    pipe.execute()
  File "/usr/local/lib/python2.7/dist-packages/redis/client.py", line 2879, in execute
    return execute(conn, stack, raise_on_error)
  File "/usr/local/lib/python2.7/dist-packages/redis/client.py", line 2785, in _execute_transaction
    response.insert(i, e)
AttributeError: 'long' object has no attribute 'insert'

好家伙,这下就清晰了,原来这个代码的异常真正出处是这里!
但是问题就来了,为什么print_exceptionformat_exception给出的堆栈信息不一样呢?我充满疑问地去查找了官方文档,但是困惑更重了:

traceback.format_exception(etype, value, tb[, limit])
Format a stack trace and the exception information. The arguments have the same meaning as the corresponding arguments to print_exception(). The return value is a list of strings, each ending in a newline and some containing internal newlines. When these lines are concatenated and printed, exactly the same text is printed as does print_exception().

重点在最后一句,Python 官方文档说了,两个函数输出的错误堆栈是一样(exactly the same text)的!

揪出猴子补丁

其实,问题的真正排查过程耗费了我好多时间,我一直没有往猴子补丁上想,最后倒是在出门赴朋友的饭约的地铁上灵机一动,用手机翻看了公司 GitLab 上的项目代码,一下找到了元凶。

def _patch_print_exception():
    import traceback

    def custom_print_exception(etype, value, tb, limit=None, file=None):
        exc_info = sys.exc_info()
        stack = traceback.extract_stack()
        # ... omit other source codes 

    traceback.print_exception = custom_print_exception

从补丁代码看,补丁直接覆盖了原版的代码,并且实现上也直接粗暴地无视了传入的几个异常信息参数!所以才会出现这么大的乌龙,出现毫无关系的异常堆栈信息!(╯‵□′)╯︵┻━┻

排查猴子补丁的技巧

猴子补丁这类编程技巧固然会利弊共存,使用上必然需要额外慎重,但也并非需要敬而远之,重点是掌握必要的排查技巧,以下我针对这次的教训又去找下一些可能有帮助的方法:

1. 通过函数或方法自身属性检查方法或者函数的信息

众所周知,Python 的所有对象都有一堆内置的属性,函数也不例外,以我项目中的例子:

# django shell
In [1]: traceback.print_exception.func_code
Out[1]: <code object custom_print_exception at 0x109e9f030, file "/Users/boy/work_area/project/project-source/lib/common/logger.py", line 295>

一看就知道,这个函数的真实代码其实就是项目中的补丁代码!

2. 借助 inspect 包来检查

Python 自身提供的工具包非常多,inspect 自然也是利器之一,其可以用来对几乎所有类型做运行时的检查,还是以我的实际例子:

# django shell
In [1]: import inspect
In [2]: inspect.getfile(traceback.print_exception)
Out[2]: '/Users/boy/work_area/project/project-source/lib/common/logger.py'

In [3]: inspect.getsource(traceback.print_exception)
Out[3]: '\tdef custom_print_exception(etype, value, tb, limit=None, file=None): ......\n'

In [4]: print inspect.getsource(traceback.print_exception)
Out[4]: def custom_print_exception(etype, value, tb, limit=None, file=None):disable=redefined-builtin
            if file is None:
                file = sys.stderr
            exc_info = sys.exc_info()
            stack = traceback.extract_stack()
            ...

总之,如果遇上代码行为与预期不符却又无法和官方文档或者官方源码对应,那么可能就是依赖的方法或者函数被打了猴子补丁,而最快速确认猴子补丁的方式,就是第一时间检查所调用的函数或者方法的实际定义,即应用上述方法即可!

题外话

做 Ruby 开发时,我也遇到过猴子补丁的陷阱,Ruby 里也有类似的方法:

file, line = A.new.method(:foo).source_location
puts "Method foo is defined in #{file}, line #{line}"
# => "Method foo is defined in temp.rb, line 2"

参考链接

  1. Wikipedia: Monkey patch
  2. Python's official document: traceback.format_exception
  3. Python's official document: inspect
  4. How can I get source code of a method dynamically and also which file is this method locate in
查看原文

赞 3 收藏 0 评论 0

Martin91 发布了文章 · 2019-09-06

Mac OS 环境 Rails 6.0 下 webpack-dev-server wrong version 问题解决方案

错误信息

昨天装上了 Ruby on Rails 6.0,满心欢喜初始化项目并且按照指引安装了 webpacker 之后,执行熟悉无比的 rails c 命令,却给了一个报错:

# 错误信息片段

yarn check v1.7.0
success Folder in sync.
Done in 0.15s.
yarn check v1.7.0
error "webpack-dev-server#yargs#cliui" is wrong version: expected "^4.0.0", got "5.0.0"
error "webpack-dev-server#yargs#yargs-parser" is wrong version: expected "^11.1.1", got "13.1.1"
error Found 2 errors.
info Visit https://yarnpkg.com/en/docs/cli/check for documentation about this command.

解决方案

目前关于 Rails 6.0 相关的资料感觉不多,所幸找到了一篇日文版的帖子,成功解决了上边的问题:

$ brew upgrade yarn
$ yarn upgrade

最终问题解决,又能愉快地前进了!

注意:我自己已经一年多没怎么开发 Rails 项目了,全栈开发那就更久远了。Rails 6.0 改动比较大,很多新的组件我自己也还没有来得及熟悉,所以这篇文章就先不做上面解决方案的原理分析了,仅为备忘,可能会有其他人遇到一样的问题。

参考资料

查看原文

赞 1 收藏 0 评论 0

Martin91 发布了文章 · 2019-08-18

pymysql 开启调试模式

今天在排查线上一个奇怪的数据库连接问题,所以打开了 pymysql 的源码在阅读,发现 pymysql 在其 connections 模块里内置了一个 DEBUG 变量用于控制是否开启调试模式,是的话,会将当前连接的操作以及报文内容都打印到控制台。

使用方法

在你的服务器初始化代码里,加上对 DEBUG 的设置,比如:

import pymysql
pymysql.install_as_MySQLdb()
pymysql.connections.DEBUG = True  # 这是我新加的一行

重启服务器后,访问相关接口,会看到标准输出里有类似下面的一些输出:

clipboard.png

查看原文

赞 0 收藏 0 评论 0

Martin91 发布了文章 · 2019-08-15

django 快速启动数据库客户端程序

        实际工作经历中,免不了有时候需要连接数据库进行问题排查分析的场景,之前一直习惯通过 mysql -uxxx -hxxxx -P1234 ... 这样的方式来启动命令行形式的 MySQL 数据库客户端程序,只是用起来比较麻烦,每次都要拷贝各个配置参数,还要记得不要在命令里显式打印密码。后来想起来在开发 Ruby on Rails 程序的时候,其提供了 rails dbconsole 的命令,可以方便直接启动对应的数据库客户端命令行程序,联想到 Django 理论上也有,所以找到了 python manage.py dbshell 这个命令,使用效果和自己手动敲 mysql 命令行是一样的,省去繁琐的参数设定步骤。

使用效果

clipboard.png

用法

其用法可以直接查询命令行帮助文档:

# python manage.py dbshell -h
Usage: manage.py dbshell [options]

Runs the command-line client for specified database, or the default database if none is provided.

Options:
  -v VERBOSITY, --verbosity=VERBOSITY
                        Verbosity level; 0=minimal output, 1=normal output,
                        2=verbose output, 3=very verbose output
  --settings=SETTINGS   The Python path to a settings module, e.g.
                        "myproject.settings.main". If this isn't provided, the
                        DJANGO_SETTINGS_MODULE environment variable will be
                        used.
  --pythonpath=PYTHONPATH
                        A directory to add to the Python path, e.g.
                        "/home/djangoprojects/myproject".
  --traceback           Raise on exception
  --database=DATABASE   Nominates a database onto which to open a shell.
                        Defaults to the "default" database.
  --version             show program's version number and exit
  -h, --help            show this help message and exit
查看原文

赞 0 收藏 0 评论 0

Martin91 发布了文章 · 2019-03-23

不严谨的不同语言下大 Excel 文件写入的性能比较

背景

去年因为线上系统需要导出大量数据(大概是 11 万行)到 Excel,代码是 Python 2.7 写的,除去数据库查询耗时,整个的 Excel 文件生成也还要耗费几十秒的时间,这听起来真是一个非常夸张的事情。后来为其更换了号称性能表现最好的 pyexcelerate 库,性能确实有提升,但是仍是差强人意的在小几十秒。

昨天突发奇想,如果是换成其他语言,这个 excel 导出是否还需要这么长时间?于是经过一番试验之后,就有了今天的这篇文章。

特别声明:试验只是为了感官上做个简单对比,测试结果采集数据只考虑了耗时,没有考虑资源消耗等情况,需要严谨的性能对比的读者,可以放弃阅读了。

测试内容

使用不同的语言及其版本,测试各自完成包含 100,000 行 x 50 列单元格的 excel 文件的生成,对比其各自耗费时间,3次重复执行取其平均值后进行横向比较。

已经测试的语言及版本

  • Ruby 2.6 + axlsx 2.0.1
  • Python 2.7 + pyexcelerate 0.7.3
  • Python 3.6 + pyexcelerate 0.7.3
  • Go 1.10.1 + gooxml 0.8

测试代码

https://github.com/Martin91/e...

结果

clipboard.png

结论

就这个测试场景来说:

  1. Go 1.10.1 + gooxml 0.8 是最快的;
  2. 同样是 pyexcelerate 0.7.3,Python 2.7 性能优于 Python 3.6;
  3. Ruby 2.6 + axlsx 2.0.1 表现最不给力,这里有个题外话,选择的 axlsx 本身并不是性能最好的 gem,只是流行度够高,Ruby 有一个专门针对性能优化后的 gem,但是因为知之甚少,没有采用。
查看原文

赞 1 收藏 1 评论 0

Martin91 发布了文章 · 2019-01-14

利用 Postman Chrome app 和 Chrome 浏览器共享网站 cookie

背景

作为一个Web工程师,最熟悉的日常工作莫过于后台接口开发与联调测试,而在接口测试上,大家最喜爱的工具清单里,必然少不了 Postman 这一利器。然而,有时接口测试需要准备好登录态,或者其他状态数据,而这些数据往往就存在浏览器 Cookie 里边。结合本文介绍的工具,便可以无缝在 Postman Chrome app (为什么强调是 Postman Chrome app,文章末尾会说明)和 Chrome 浏览器之间共享 Cookie,而这个共享过程对用户是透明的。

工具清单

以下工具请自行安装,我只贴下官方的软件界面截图。

  1. Chrome 浏览器
  2. Postman Chrome app
    clipboard.png
  3. Postman Interceptor
    clipboard.png

使用步骤

以下我们以 Github 网站为例,演示下如何实现 Cookie 共享。

一、确认 Postman Interceptor 插件安装成功(如图所示)

clipboard.png

二、启动 Postman,在右上角的卫星小图标那里开启 Chrome Interceptor

clipboard.png

三、在 Chrome 浏览器里正常登陆 GitHub 网站(此步骤没什么好演示的 ╭(╯^╰)╮)

四、在 Postman Chrome app 中直接模拟请求通知接口

接口路径:https://github.com/notificati...
clipboard.png
也就是说,这个时候,我们虽然没有对 Postman 做特殊的 Cookie 设置,但是它的请求的登录态都被服务器验证通过了,cookie 共享成功!

假如这个时候退出浏览器的登录态呢?

我们先从 GitHub 退出登录,还是刚才的请求,这个时候的响应是:
clipboard.png
是的,因为 Chrome 里已经退出登录,所以 Postman 这边也自然失去登录态了,说明两边 Cookie 是同步的。

Postman Interceptor 的 Bonus

clipboard.png
Postman Interceptor 还有一点比较爽的是,它的 Request Capture 支持捕捉 Chrome 浏览器里的请求记录,并且自动同步到 Postman Chrome app 里边,这样的话,我们就可以方便直接在 Postman 里获取到我们需要测试的网络请求,而不是一个一个自己填写参数之类的了。
clipboard.png

缺陷

遗憾的是,按照官方说明,现在 Postman Interceptor 的这个Cookie 共享还不能支持独立安装的桌面版(从官方下载而不是从 Chrome 应用市场下载)的 Postman Desktop,所以,如果你希望使用上述功能,你只能安装回 Postman Chrome app,而这个版本相对桌面版,功能自然也会少。

Note: Interceptor feature is supported only in our Postman Chrome Apps and is not available in Postman Desktop Apps at the moment.

另一方面,考虑到 Chrome 浏览器将会在不久的将来停掉 Chrome apps 的支持,可能这个方案也撑不了太久。

如果你真心希望 Postman 将上述功能加到他们的桌面版里,可以到他们的官方GitHub issues去请愿,他们正在收集大家的意见。但是……这个请愿帖已经两年多了,而就在我表达请求之前的几个小时到几天之前,都有人陆续去请愿,所以也不知道会不会真的如愿了。

总结

对于确实需要获取网站 cookie 才能完成接口测试的场景,上述方法有一定的便利性,也才有必要使用我的方法,其他场景的接口测试,你们就无视我吧。

参考链接

  1. Postman: Using the Interceptor to read and write cookies
  2. Postman Help Center: How do I access Chrome's cookies in Postman's Chrome App?
  3. Postman Learning Center: Interceptor extension
  4. Google is phasing out Chrome apps for Mac and Windows
查看原文

赞 3 收藏 2 评论 0

认证与成就

  • 获得 67 次点赞
  • 获得 3 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 3 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2014-03-25
个人主页被 1.2k 人浏览