selfboot

selfboot 查看完整档案

广州编辑中山大学  |  计算机科学 编辑  |  填写所在公司/组织 selfboot.cn 编辑
编辑

selfboot,自启动,只有自己能启动自己

个人动态

selfboot 赞了文章 · 7月30日

程序员必备神器!Shell 脚本编程最佳实践

作者:Myths
链接:https://blog.mythsman.com/201...

前言

由于工作需要,最近重新开始拾掇shell脚本。虽然绝大部分命令自己平时也经常使用,但是在写成脚本的时候总觉得写的很难看。而且当我在看其他人写的脚本的时候,总觉得难以阅读。毕竟shell脚本这个东西不算是正经的编程语言,他更像是一个工具,用来杂糅不同的程序供我们调用。

因此很多人在写的时候也是想到哪里写到哪里,基本上都像是一段超长的main函数,不忍直视。同时,由于历史原因,shell有很多不同的版本,而且也有很多有相同功能的命令需要我们进行取舍,以至于代码的规范很难统一。

考虑到上面的这些原因,我查阅了一些相关的文档,发现这些问题其实很多人都考虑过,而且  也形成了一些不错的文章,但是还是有点零散。因此我就在这里把这些文章稍微整理了一下,作为以后我自己写脚本的技术规范。

代码风格规范

开头有“蛇棒”

所谓shebang其实就是在很多脚本的第一行出现的以#!开头的注释,他指明了当我们没有指定解释器的时候默认的解释器,一般可能是下面这样:

#!/bin/bash

当然,解释器有很多种,除了bash之外,我们可以用下面的命令查看本机支持的解释器:

$ cat /etc/shells
#/etc/shells: valid login shells
/bin/sh
/bin/dash
/bin/bash
/bin/rbash
/usr/bin/screen

当我们直接使用./a.sh来执行这个脚本的时候,如果没有shebang,那么它就会默认用$SHELL指定的解释器,否则就会用shebang指定的解释器。

这种方式是我们推荐的使用方式。

代码有注释

注释,显然是一个常识,不过这里还是要再强调一下,这个在shell脚本里尤为重要。因为很多单行的shell命令不是那么浅显易懂,没有注释的话在维护起来会让人尤其的头大。

注释的意义不仅在于解释用途,而在于告诉我们注意事项,就像是一个README。

具体的来说,对于shell脚本,注释一般包括下面几个部分:

  • shebang
  • 脚本的参数
  • 脚本的用途
  • 脚本的注意事项
  • 脚本的写作时间,作者,版权等
  • 各个函数前的说明注释
  • 一些较复杂的单行命令注释

参数要规范

这一点很重要,当我们的脚本需要接受参数的时候,我们一定要先判断参数是否合乎规范,并给出合适的回显,方便使用者了解参数的使用。

最少,最少,我们至少得判断下参数的个数吧:

if [[ $# != 2 ]];then    
   echo "Parameter incorrect."    
   exit 1
fi

变量和魔数

一般情况下我们会将一些重要的环境变量定义在开头,确保这些变量的存在。

source /etc/profile
export PATH=”/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin:/apps/bin/”

这种定义方式有一个很常见的用途,最典型的应用就是,当我们本地安装了很多java版本时,我们可能需要指定一个java来用。那么这时我们就会在脚本开头重新定义JAVA_HOME以及PATH变量来进行控制。同时,一段好的代码通常是不会有很多硬编码在代码里的“魔数”的。如果一定要有,通常是用一个变量的形式定义在开头,然后调用的时候直接调用这个变量,这样方便日后的修改。

缩进有规矩

对于shell脚本,缩进是个大问题。因为很多需要缩进的地方(比如if,for语句)都不长,所有很多人都懒得去缩进,而且很多人不习惯用函数,导致缩进功能被弱化。

其实正确的缩进是很重要的,尤其是在写函数的时候,否则我们在阅读的时候很容易把函数体跟直接执行的命令搞混。

常见的缩进方法主要有”soft tab”和”hard tab”两种。

  • 所谓soft tab就是使用n个空格进行缩进(n通常是2或4)
  • 所谓hard tab当然就是指真实的\t字符
  • 这里不去撕哪种方式最好,只能说各有各的优劣。反正我习惯用hard tab。
  • 对于if和for语句之类的,我们最好不要把then,do这些关键字单独写一行,这样看上去比较丑。。。

命名有标准

所谓命名规范,基本包含下面这几点:

  • 文件名规范,以.sh结尾,方便识别
  • 变量名字要有含义,不要拼错
  • 统一命名风格,写shell一般用小写字母加下划线

编码要统一

在写脚本的时候尽量使用UTF-8编码,能够支持中文等一些奇奇怪怪的字符。不过虽然能写中文,但是在写注释以及打log的时候还是尽量英文,毕竟很多机器还是没有直接支持中文的,打出来可能会有乱码。这里还尤其需要注意一点,就是当我们是在windows下用utf-8编码来写shell脚本的时候,一定要注意这个utf-8是否是有BOM的。默认情况下windows判断utf-8格式是通过在文件开头加上三个EF BB BF字节来判断的,但是在Linux中默认是无BOM的。因此如果我们是在windows下写脚本的时候,一定要注意将编码改成Utf-8无BOM,一般用notepad++之类的编辑器都能改。否则,在Linux下运行的时候就会识别到开头的三个字符,从而报一些无法识别命令的错。当然,对于跨平台写脚本还有一个比较常见的问题就是换行符不同。windows默认是\r\n而unix下是\n。不过有两个小工具可以非常方便的解决这个问题:dos2unix,unix2dos。

权限记得加

这一点虽然很小,但是我个人却经常忘记,不加执行权限会导致无法直接执行,有点讨厌。。。

日志和回显

日志的重要性不必多说,能够方便我们回头纠错,在大型的项目里是非常重要的。

如果这个脚本是供用户直接在命令行使用的,那么我们最好还要能够在执行时实时回显执行过程,方便用户掌控。

有时候为了提高用户体验,我们会在回显中添加一些特效,比如颜色啊,闪烁啊之类的,具体可以参考ANSI/VT100 Control sequences这篇文章的介绍。

密码要移除

不要把密码硬编码在脚本里,不要把密码硬编码在脚本里,不要把密码硬编码在脚本里。

重要的事情说三遍,尤其是当脚本托管在类似Github这类平台中时。。。

太长要分行

在调用某些程序的时候,参数可能会很长,这时候为了保证较好的阅读体验,我们可以用反斜杠来分行:

./configure \
–prefix=/usr \
–sbin-path=/usr/sbin/nginx \
–conf-path=/etc/nginx/nginx.conf \

注意在反斜杠前有个空格。

编码细节规范

代码有效率

在使用命令的时候要了解命令的具体做法,尤其当数据处理量大的时候,要时刻考虑该命令是否会影响效率。

比如下面的两个sed命令:

sed -n '1p' file
sed -n '1p;1q' file

他们的作用一样,都是获取文件的第一行。但是第一条命令会读取整个文件,而第二条命令只读取第一行。当文件很大的时候,仅仅是这样一条命令不一样就会造成巨大的效率差异。

当然,这里只是为了举一个例子,这个例子真正正确的用法应该是使用head -n1 file命令。。。

勤用双引号

几乎所有的大佬都推荐在使用”$”来获取变量的时候最好加上双引号。

不加上双引号在很多情况下都会造成很大的麻烦,为什么呢?举一个例子:

#!/bin/sh
#已知当前文件夹有一个a.sh的文件
var="*.sh"
echo $var
echo "$var"

他的运行结果如下:

a.sh
*.sh

为啥会这样呢?其实可以解释为他执行了下面的命令:

echo *.sh
echo "*.sh"

在很多情况下,在将变量作为参数的时候,一定要注意上面这一点,仔细体会其中的差异。上面只是一个非常小的例子,实际应用的时候由于这个细节导致的问题实在是太多了。。。

巧用main函数

我们知道,像java,C这样的编译型语言都会有一个函数入口,这种结构使得代码可读性很强,我们知道哪些直接执行,那些是函数。但是脚本不一样,脚本属于解释性语言,从第一行直接执行到最后一行,如果在这当中命令与函数糅杂在一起,那就非常难读了。

用python的朋友都知道,一个合乎标准的python脚本大体上至少是这样的:

#!/usr/bin/env python
def func1():    
  pass
  def func2():    
  pass
  if __name__=='__main__':    
  func1()    
  func2()

他用一个很巧妙的方法实现了我们习惯的main函数,使得代码可读性更强。

在shell中,我们也有类似的小技巧:

#!/usr/bin/env bash  
  
func1(){  
 #do sth  
}  
func2(){  
 #do sth  
}  
main(){  
 func1  
 func2  
}  
main "$@"

我们可以采用这种写法,同样实现类似的main函数,使得脚本的结构化程度更好。

考虑作用域

shell中默认的变量作用域都是全局的,比如下面的脚本:

#!/usr/bin/env bash  
  
var=1  
func(){  
 var=2  
}  
func  
echo $var

他的输出结果就是2而不是1,这样显然不符合我们的编码习惯,很容易造成一些问题。

因此,相比直接使用全局变量,我们最好使用local readonly这类的命令,其次我们可以使用declare来声明变量。这些方式都比使用全局方式定义要好。

函数返回值

在使用函数的时候一定要注意,shell中函数的返回值只能是整数,估计是因为一般情况下一个函数的返回值通常表示这个函数的运行状态,所以一般都是0或者是1就够了,因此就设计成了这样。不过,如果非得想传递字符串,也可以通过下面变通的方法:

func(){  
 echo "2333"  
}  
res=$(func)  
echo "This is from $res."

这样,通过echo或者print之类的就可以做到传一些额外参数的目的。

间接引用值

什么叫间接引用?比如下面这个场景:

VAR1="2323232"
VAR2="VAR1"

我们有一个变量VAR1,又有一个变量VAR2,这个VAR2的值是VAR1的名字,那么我们现在想通过VAR2来获取VAR1的值,这时候应该怎么办呢?

比较土鳖的方法是这样:

eval echo \$$VAR2

啥意思呢?其实就是构造了一个字符串echo XXX,这个XXX就是XXX”,这个XXX就是VAR2的值VAR1,然后再用eval强制解析,这样就做到了变相取值。

这个用法的确可行,但是看起来十分的不舒服,很难直观的去理解,我们并不推荐。而且事实上我们本身就不推荐使用eval这个命令。

比较舒服的写法是下面这样:

echo ${!VAR1}

通过在变量名前加一个!就可以做到简单的间接引用了。

不过需要注意的是,用上面的方法,我们只能够做到取值,而不能做到赋值。如果想要做到赋值,还要老老实实的用eval来处理:

VAR1=VAR2
eval $VAR1=233
echo $VAR2
巧用heredocs

所谓heredocs,也可以算是一种多行输入的方法,即在”<<”后定一个标识符,接着我们可以输入多行内容,直到再次遇到标识符为止。

使用heredocs,我们可以非常方便的生成一些模板文件:

cat>>/etc/rsyncd.conf << EOF  
log file = /usr/local/logs/rsyncd.log  
transfer logging = yes  
log format = %t %a %m %f %b  
syslog facility = local3  
EOF
学会查路径

很多情况下,我们会先获取当前脚本的路径,然后一这个路径为基准,去找其他的路径。通常我们是直接用pwd以期获得脚本的路径。

不过其实这样是不严谨的,pwd获得的是当前shell的执行路径,而不是当前脚本的执行路径。

正确的做法应该是下面这两种:

script_dir=$(cd $(dirname $0) && pwd)
script_dir=$(dirname $(readlink -f $0 ))

应当先cd进当前脚本的目录然后再pwd,或者直接读取当前脚本的所在路径。

代码要简短

这里的简短不单单是指代码长度,而是只用到的命令数。原则上我们应当做到,能一条命令解决的问题绝不用两条命令解决。这不仅牵涉到代码的可读性,而且也关乎代码的执行效率。

最最经典的例子如下:

cat /etc/passwd | grep root
grep root /etc/passwd

cat命令最为人不齿的用法就是这样,用的没有任何意义,明明一条命令可以解决,他非得加根管道。。。

其实代码简短在还能某种程度上能保证效率的提升,比如下面的例子:

#method1
find . -name '*.txt' |xargs sed -i s/233/666/g
find . -name '*.txt' |xargs sed -i s/235/626/g
find . -name '*.txt' |xargs sed -i s/333/616/g
find . -name '*.txt' |xargs sed -i s/233/664/g

#method1
find . -name '*.txt' |xargs sed -i "s/233/666/g;s/235/626/g;s/333/616/g;s/233/664/g"

这两种方法做的事情都一样,就是查找所有的.txt后缀的文件并做一系列替换。前者是多次执行find,后者是执行一次find,但是增加了sed的模式串。第一种更直观一点,但是当替换的量变大的时候,第二种的速度就会比第一种快很多。这里效率提升的原因,就是第二种只要执行一次命令,而第一种要执行多次。并且,巧用xargs命令,我们还可以十分方便的进行并行化处理:

find . -name '*.txt' |xargs -P $(nproc) sed -i "s/233/666/g;s/235/626/g;s/333/616/g;s/233/664/g"

通过-P参数指定并行度,可以进一步加快执行效率。

命令并行化

当我们需要充分考虑执行效率时,我们可能需要在执行命令的时候考虑并行化。shell中最简单的并行化是通过”&”以及”wait”命令来做:

func(){  
 #do sth  
}  
for((i=0;i<10;i++))do  
 func &  
done  
wait

当然,这里并行的次数不能太多,否则机器会卡死。稍微正确的做法比较复杂,以后再讨论,如果图省事可以使用parallel命令来做,或者是用上面提到的xargs来处理。

全文本检索

我们知道,当我们想在文件夹下所有的txt文件中检索某一个字符串(比如233)的时候,我们可能会用类似这样的命令:

find . -name '*.txt' -type f | xargs grep 2333

很多情况下,这个命令会想我们所想的找到对应的匹配行,但是我们需要注意两个小问题。

find命令会符合要求的匹配文件名,但是如果文件名包含空格,这时候将文件名传给grep的时候就会有问题,这个文件就会被当成两个参数,这时候就要加一层处理,保证用空格分开的文件名不会被当成两个参数:

find . -type f|xargs -i echo '"{}"'|xargs grep 2333

有时候,文件的字符集可能跟终端的字符集不一致,这时候就会导致grep在搜索时将文件当成二进制文件从而报binary file matches之类的问题。这时候要么用iconv之类的字符集转换工具将字符集进行切换,要么就在不影响查找的情况下对grep加-a参数,将所有文件看成文本文件:

find . -type f|xargs grep -a 2333

使用新写法

这里的新写法不是指有多厉害,而是指我们可能更希望使用较新引入的一些语法,更多是偏向代码风格的,比如

尽量使用func(){}来定义函数,而不是func{}

尽量使用[[]]来代替[]

尽量使用$()将命令的结果赋给变量,而不是反引号

在复杂的场景下尽量使用printf代替echo进行回显

事实上,这些新写法很多功能都比旧的写法要强大,用的时候就知道了。

其他小tip

考虑到还有很多零碎的点,就不一一展开了,这里简单提一提。

路径尽量保持绝对路径,绝多路径不容易出错,如果非要用相对路径,最好用./修饰

优先使用bash的变量替换代替awk sed,这样更加简短

简单的if尽量使用&& ||,写成单行。

比如[[ x > 2]] && echo x

当export变量时,尽量加上子脚本的namespace,保证变量不冲突

会使用trap捕获信号,并在接受到终止信号时执行一些收尾工作

使用mktemp生成临时文件或文件夹

利用/dev/null过滤不友好的输出信息

会利用命令的返回值判断命令的执行情况

使用文件前要判断文件是否存在,否则做好异常处理

不要处理ls后的数据(比如ls -l | awk ‘{ print $8 }’),ls的结果非常不确定,并且平台有关

读取文件时不要使用for loop而要使用while read

使用cp -r命令复制文件夹的时候要注意如果目的文件夹不存在则会创建,如果存在则会复制到该文件的子文件夹下

静态检查工具shellcheck

概述

为了从制度上保证脚本的质量,我们最简单的想法大概就是搞一个静态检查工具,通过引入工具来弥补开发者可能存在的知识盲点。

市面上对于shell的静态检查工具还真不多,找来找去就找到一个叫shellcheck的工具,开源在github上,有8K多的star,看上去还是十分靠谱的。我们可以去他的主页了解具体的安装和使用信息。

安装

这个工具的对不同平台的支持力度都很大,他至少支持了Debian,Arch,Gentoo,EPEL,Fedora,OS X,openSUSE等等各种的平台的主流包管理工具。安装方便。具体可以参照安装文档

集成

既然是静态检查工具,就一定可以集成在CI框架里,shellcheck可以非常方便的集成在Travis CI中,供以shell脚本为主语言的项目进行静态检查。

样例

在文档的Gallery of bad code里,也提供了非常详细的“坏代码”的标准,具有非常不错的参考价值,可以在闲下来的时候当成”Java Puzzlers“之类的书来读读还是很惬意的。

本质

不过,其实我觉得这个项目最最精华的部分都不是上面的功能,而是他提供了一个非常非常强大的wiki。在这个wiki里,我们可以找到这个工具所有判断的依据。在这里,每一个检测到的问题都可以在wiki里找到对应的问题单号,他不仅告诉我们”这样写不好”,而且告诉我们”为什么这样写不好”,”我们应当怎么写才好”,非常适合刨根问底党进一步研究。

shell脚本写的溜,也是涨薪的必备技能哦!!如果本文对你有所帮助与借鉴,请点个在看转发分享支持一波哦!!

jishuroad.jpg

查看原文

赞 8 收藏 4 评论 0

selfboot 收藏了文章 · 7月27日

布隆过滤器(BloomFilter)原理 实现和性能测试

布隆过滤器(BloomFilter)是一种大家在学校没怎么学过,但在计算机很多领域非常常用的数据结构,它可以用来高效判断某个key是否属于一个集合,有极高的插入和查询效率(O(1)),也非常省存储空间。当然它也不是完美无缺,它也有自己的缺点,接下来跟随我一起详细了解下BloomFilter的实现原理,以及它优缺点、应用场景,最后再看下Google guava包中BloomFilter的实现,并对比下它和HashSet在不同数据量下内存空间的使用情况。

学过数据结构的人都知道,在计算机领域我们经常通过牺牲空间换时间,或者牺牲时间换空间,BloomFilter给了我们一种新的思路——牺牲准确率换空间。是的,BloomFilter不是100%准确的,它是有可能有误判,但绝对不会有漏判断,说通俗点就是,BloomFilter有可能错杀好人,但不会放过任何一个坏人。BloomFilter最大的优点就是省空间,缺点就是不是100%准确,这点当然和它的实现原理有关。

BloomFilter的原理

在讲解BloomFilter的实现前,我们先来了解下什么叫Bitmap(位图),先给你一道《编程珠玑》上的题目。

给你一个有100w个数的集合S,每个数的数据大小都是0-100w,有些数据重复出现,这就意味着有些数据可能都没出现过,让你以O(n)的时间复杂度找出0-100w之间没有出现在S中的数,尽可能减少内存的使用。

既然时间复杂度都限制是O(N)了,意味着我们不能使用排序了,我们可以开一个长为100w的int数组来标记下哪些数字出现过,在就标1,不在标0。但对于每个数来说我们只需要知道它在不在,只是0和1的区别,用int(32位)有点太浪费空间了,我们可以按二进制位来用每个int,这样一个int就可以标记32个数,空间占用率一下子减少到原来的1/32,于是我们就有了位图,Bitmap就是用n个二进制位来记录0-m之间的某个数有没有出现过。

Bitmap的局限在于它的存储数据类型有限,只能存0-m之间的数,其他的数据就存不了了。如果我们想存字符串或者其他数据怎么办?其实也简单,只需要实现一个hash函数,将你要存的数据映射到0-m之间就行了。这里假设你的hash函数产生的映射值是均匀的,我们来计算下一个m位的Bitmap到底能存多少数据?
当你在Bitmap中插入了一个数后,通过hash函数计算它在Bitmap中的位置并将其置为1,这时任意一个位置没有被标为1的概率是:

$$ 1 - \frac{1}{m} $$

当插入n个数后,这个概率会变成:

$$ (1 - \frac{1}{m})^n $$

所以任意一个位置被标记成1的概率就是:

$$ P_1 = 1 - (1 - \frac{1}{m})^n $$

这时候你判断某个key是否在这个集合S中时,只需要看下这个key在hash在Bitmap上对应的位置是否为1就行了,因为两个key对应的hash值可能是一样的,所以有可能会误判,你之前插入了a,但是hash(b)==hash(a),这时候你判断b是否在集合S中时,看到的是a的结果,实际上b没有插入过。

从上面公式中可以看出有 $P_1$ 的概率可能会误判,尤其当n比较大时这个误判概率还是挺大的。 如何减少这个误判率?我们最开始是只取了一个hash函数,如果说取k个不同的hash函数呢!我们每插入一个数据,计算k个hash值,并对k位置为1。在查找时,也是求k个hash值,然后看其是否都为1,如果都为1肯定就可以认为这个数是在集合S中的。
问题来了,用k个hash函数,每次插入都可能会写k位,更耗空间,那在同样的m下,误判率是否会更高呢?我们来推导下。

在k个hash函数的情况下,插入一个数后任意一个位置依旧是0的概率是:

$$ (1 - \frac{1}{m})^k $$

插入n个数后任意一个位置依旧是0的概率是:

$$ (1 - \frac{1}{m})^{kn} $$

所以可知,插入n个数后任意一个位置是1的概率是

$$ 1 - (1 - \frac{1}{m})^{kn} $$

因为我们用是用k个hash共同来判断是否是在集合中的,可知当用k个hash函数时其误判率如下。它一定是比上面1个hash函数时误判率要小(虽然我不会证明)

$$ \left(1-\left[1-\frac{1}{m}\right]^{k n}\right)^{k} < (1 - \left[1 - \frac{1}{m}\right]^n) $$

维基百科也给出了这个误判率的近似公式(虽然我不知道是怎么来的,所以这里就直接引用了)

$$ \left(1-\left[1-\frac{1}{m}\right]^{k n}\right)^{k} \approx\left(1-e^{-k n / m}\right)^{k} $$

到这里,我们重新发明了Bloomfilter,就是这么简单,说白了Bloomfilter就是在Bitmap之上的扩展而已。对于一个key,用k个hash函数映射到Bitmap上,查找时只需要对要查找的内容同样做k次hash映射,通过查看Bitmap上这k个位置是否都被标记了来判断是否之前被插入过,如下图。
在这里插入图片描述

通过公式推导和了解原理后,我们已经知道Bloomfilter有个很大的缺点就是不是100%准确,有误判的可能性。但是通过选取合适的bitmap大小和hash函数个数后,我们可以把误判率降到很低,在大数据盛行的时代,适当牺牲准确率来减少存储消耗还是很值得的。

除了误判之外,BloomFilter还有另外一个很大的缺点 __只支持插入,无法做删除__。如果你想在Bloomfilter中删除某个key,你不能直接将其对应的k个位全部置为0,因为这些位置有可能是被其他key共享的。基于这个缺点也有一些支持删除的BloomFilter的变种,适当牺牲了空间效率,感兴趣可以自行搜索下。

如何确定最优的m和k?

知道原理后再来了解下怎么去实现,我们在决定使用Bloomfilter之前,需要知道两个数据,一个是要存储的数量n和预期的误判率p。bitmap的大小m决定了存储空间的大小,hash函数个数k决定了计算量的大小,我们当然都希望m和k都越小越好,如何计算二者的最优值,我们大概来推导下。(备注:推导过程来自Wikipedia)

由上文可知,误判率p为

$$ p \approx \left(1-e^{-k n / m}\right)^{k} \ (1) $$

对于给定的m和n我们想让误判率p最小,就得让

$$ k=\frac{m}{n} \ln2 \ (2) $$

把(2)式代入(1)中可得

$$ p=\left(1-e^{-\left(\frac{m}{n} \ln 2\right) \frac{n}{m}}\right)^{\frac{m}{n} \ln 2} \ (3) $$

对(3)两边同时取ln并简化后,得到

$$ \ln p=-\frac{m}{n}(\ln 2)^{2} $$

最后可以计算出m的最优值为

$$ m=-\frac{n \ln p}{(\ln 2)^{2}} $$

因为误判率p和要插入的数据量n是已知的,所以我们可以直接根据上式计算出m的值,然后把m和n的值代回到(2)式中就可以得到k的值。至此我们就知道了实现一个bloomfilter所需要的所有参数了,接下来让我们看下Google guava包中是如何实现BloomFilter的。

guava中的BloomFilter

BloomFilter<T>无法通过new去创建新对象,而它提供了create静态方法来生成对象,其核心方法如下。

  static <T> BloomFilter<T> create(
      Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
    checkNotNull(funnel);
    checkArgument(
        expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
    checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
    checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
    checkNotNull(strategy);

    if (expectedInsertions == 0) {
      expectedInsertions = 1;
    }

    long numBits = optimalNumOfBits(expectedInsertions, fpp);
    int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
    try {
      return new BloomFilter<T>(new LockFreeBitArray(numBits), numHashFunctions, funnel, strategy);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
    }
  }

从代码可以看出,需要4个参数,分别是

  • funnel 用来对参数做转化,方便生成hash值
  • expectedInsertions 预期插入的数据量大小,也就是上文公式中的n
  • fpp 误判率,也就是上文公式中的误判率p
  • strategy 生成hash值的策略,guava中也提供了默认策略,一般不需要你自己重新实现

从上面代码可知,BloomFilter创建过程中先检查参数的合法性,之后使用n和p来计算bitmap的大小m(optimalNumOfBits(expectedInsertions, fpp)),通过n和m计算hash函数的个数k(optimalNumOfHashFunctions(expectedInsertions, numBits)),这俩方法的具体实现如下。

  static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }
  static long optimalNumOfBits(long n, double p) {
    if (p == 0) {
      p = Double.MIN_VALUE;
    }
    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

其实就是上文中列出的计算公式。
后面插入和查找的逻辑就比较简单了,这里不再赘述,有兴趣可以看下源码,我们这里通过BloomFilter提供的方法列表了解下它的功能就行。
在这里插入图片描述
从上图可以看出,BloomFilter除了提供创建和几个核心的功能外,还支持写入Stream或从Stream中重新生成BloomFilter,方便数据的共享和传输。

使用案例

  • HBase、BigTable、Cassandra、PostgreSQ等著名开源项目都用BloomFilter来减少对磁盘的访问次数,提升性能。
  • Chrome浏览器用BloomFilter来判别恶意网站。
  • 爬虫用BloomFilter来判断某个url是否爬取过。
  • 比特币也用到了BloomFilter来加速钱包信息的同步。

……

和HashSet对比

我们一直在说BloomFilter有巨大的存储优势,做个优势到底有多明显,我们拿jdk自带的HashSet和guava中实现的BloomFilter做下对比,数据仅供参考。

测试环境

测试平台 Mac
guava(28.1)BloomFilter,JDK11(64位) HashSet
使用om.carrotsearch.java-sizeof计算实际占用的内存空间

测试方式

BloomFilter vs HashSet

分别往BloomFilter和HashSet中插入UUID,总计插入100w个UUID,BloomFilter误判率为默认值0.03。每插入5w个统计下各自的占用空间。结果如下,横轴是数据量大小,纵轴是存储空间,单位kb。
在这里插入图片描述
可以看到BloomFilter存储空间一直都没有变,这里和它的实现有关,事实上你在告诉它总共要插入多少条数据时BloomFilter就计算并申请好了内存空间,所以BloomFilter占用内存不会随插入数据的多少而变化。相反,HashSet在插入数据越来越多时,其占用的内存空间也会越来越多,最终在插入完100w条数据后,其内存占用为BloomFilter的100多倍。

在不同fpp下的存储表现

在不同的误判率下,插入100w个UUID,计算其内存空间占用。结果如下,横轴是误判率大小,纵轴是存储空间,单位kb。
在这里插入图片描述

fpp,size
0.1,585.453125
0.01,1170.4765625
1.0E-3,1755.5
1.0E-4,2340.53125
1.0E-5,2925.5546875
1.0E-6,3510.578125
1.0E-7,4095.6015625
1.0E-8,4680.6328125
1.0E-9,5265.65625
1.0E-10,5850.6796875

可以看出,在同等数据量的情况下,BloomFilter的存储空间和ln(fpp)呈反比,所以增长速率其实不算快,即便误判率减少9个量级,其存储空间也只是增加了10倍。

参考资料

  1. wikipedia bloom filter
查看原文

selfboot 赞了文章 · 7月27日

布隆过滤器(BloomFilter)原理 实现和性能测试

布隆过滤器(BloomFilter)是一种大家在学校没怎么学过,但在计算机很多领域非常常用的数据结构,它可以用来高效判断某个key是否属于一个集合,有极高的插入和查询效率(O(1)),也非常省存储空间。当然它也不是完美无缺,它也有自己的缺点,接下来跟随我一起详细了解下BloomFilter的实现原理,以及它优缺点、应用场景,最后再看下Google guava包中BloomFilter的实现,并对比下它和HashSet在不同数据量下内存空间的使用情况。

学过数据结构的人都知道,在计算机领域我们经常通过牺牲空间换时间,或者牺牲时间换空间,BloomFilter给了我们一种新的思路——牺牲准确率换空间。是的,BloomFilter不是100%准确的,它是有可能有误判,但绝对不会有漏判断,说通俗点就是,BloomFilter有可能错杀好人,但不会放过任何一个坏人。BloomFilter最大的优点就是省空间,缺点就是不是100%准确,这点当然和它的实现原理有关。

BloomFilter的原理

在讲解BloomFilter的实现前,我们先来了解下什么叫Bitmap(位图),先给你一道《编程珠玑》上的题目。

给你一个有100w个数的集合S,每个数的数据大小都是0-100w,有些数据重复出现,这就意味着有些数据可能都没出现过,让你以O(n)的时间复杂度找出0-100w之间没有出现在S中的数,尽可能减少内存的使用。

既然时间复杂度都限制是O(N)了,意味着我们不能使用排序了,我们可以开一个长为100w的int数组来标记下哪些数字出现过,在就标1,不在标0。但对于每个数来说我们只需要知道它在不在,只是0和1的区别,用int(32位)有点太浪费空间了,我们可以按二进制位来用每个int,这样一个int就可以标记32个数,空间占用率一下子减少到原来的1/32,于是我们就有了位图,Bitmap就是用n个二进制位来记录0-m之间的某个数有没有出现过。

Bitmap的局限在于它的存储数据类型有限,只能存0-m之间的数,其他的数据就存不了了。如果我们想存字符串或者其他数据怎么办?其实也简单,只需要实现一个hash函数,将你要存的数据映射到0-m之间就行了。这里假设你的hash函数产生的映射值是均匀的,我们来计算下一个m位的Bitmap到底能存多少数据?
当你在Bitmap中插入了一个数后,通过hash函数计算它在Bitmap中的位置并将其置为1,这时任意一个位置没有被标为1的概率是:

$$ 1 - \frac{1}{m} $$

当插入n个数后,这个概率会变成:

$$ (1 - \frac{1}{m})^n $$

所以任意一个位置被标记成1的概率就是:

$$ P_1 = 1 - (1 - \frac{1}{m})^n $$

这时候你判断某个key是否在这个集合S中时,只需要看下这个key在hash在Bitmap上对应的位置是否为1就行了,因为两个key对应的hash值可能是一样的,所以有可能会误判,你之前插入了a,但是hash(b)==hash(a),这时候你判断b是否在集合S中时,看到的是a的结果,实际上b没有插入过。

从上面公式中可以看出有 $P_1$ 的概率可能会误判,尤其当n比较大时这个误判概率还是挺大的。 如何减少这个误判率?我们最开始是只取了一个hash函数,如果说取k个不同的hash函数呢!我们每插入一个数据,计算k个hash值,并对k位置为1。在查找时,也是求k个hash值,然后看其是否都为1,如果都为1肯定就可以认为这个数是在集合S中的。
问题来了,用k个hash函数,每次插入都可能会写k位,更耗空间,那在同样的m下,误判率是否会更高呢?我们来推导下。

在k个hash函数的情况下,插入一个数后任意一个位置依旧是0的概率是:

$$ (1 - \frac{1}{m})^k $$

插入n个数后任意一个位置依旧是0的概率是:

$$ (1 - \frac{1}{m})^{kn} $$

所以可知,插入n个数后任意一个位置是1的概率是

$$ 1 - (1 - \frac{1}{m})^{kn} $$

因为我们用是用k个hash共同来判断是否是在集合中的,可知当用k个hash函数时其误判率如下。它一定是比上面1个hash函数时误判率要小(虽然我不会证明)

$$ \left(1-\left[1-\frac{1}{m}\right]^{k n}\right)^{k} < (1 - \left[1 - \frac{1}{m}\right]^n) $$

维基百科也给出了这个误判率的近似公式(虽然我不知道是怎么来的,所以这里就直接引用了)

$$ \left(1-\left[1-\frac{1}{m}\right]^{k n}\right)^{k} \approx\left(1-e^{-k n / m}\right)^{k} $$

到这里,我们重新发明了Bloomfilter,就是这么简单,说白了Bloomfilter就是在Bitmap之上的扩展而已。对于一个key,用k个hash函数映射到Bitmap上,查找时只需要对要查找的内容同样做k次hash映射,通过查看Bitmap上这k个位置是否都被标记了来判断是否之前被插入过,如下图。
在这里插入图片描述

通过公式推导和了解原理后,我们已经知道Bloomfilter有个很大的缺点就是不是100%准确,有误判的可能性。但是通过选取合适的bitmap大小和hash函数个数后,我们可以把误判率降到很低,在大数据盛行的时代,适当牺牲准确率来减少存储消耗还是很值得的。

除了误判之外,BloomFilter还有另外一个很大的缺点 __只支持插入,无法做删除__。如果你想在Bloomfilter中删除某个key,你不能直接将其对应的k个位全部置为0,因为这些位置有可能是被其他key共享的。基于这个缺点也有一些支持删除的BloomFilter的变种,适当牺牲了空间效率,感兴趣可以自行搜索下。

如何确定最优的m和k?

知道原理后再来了解下怎么去实现,我们在决定使用Bloomfilter之前,需要知道两个数据,一个是要存储的数量n和预期的误判率p。bitmap的大小m决定了存储空间的大小,hash函数个数k决定了计算量的大小,我们当然都希望m和k都越小越好,如何计算二者的最优值,我们大概来推导下。(备注:推导过程来自Wikipedia)

由上文可知,误判率p为

$$ p \approx \left(1-e^{-k n / m}\right)^{k} \ (1) $$

对于给定的m和n我们想让误判率p最小,就得让

$$ k=\frac{m}{n} \ln2 \ (2) $$

把(2)式代入(1)中可得

$$ p=\left(1-e^{-\left(\frac{m}{n} \ln 2\right) \frac{n}{m}}\right)^{\frac{m}{n} \ln 2} \ (3) $$

对(3)两边同时取ln并简化后,得到

$$ \ln p=-\frac{m}{n}(\ln 2)^{2} $$

最后可以计算出m的最优值为

$$ m=-\frac{n \ln p}{(\ln 2)^{2}} $$

因为误判率p和要插入的数据量n是已知的,所以我们可以直接根据上式计算出m的值,然后把m和n的值代回到(2)式中就可以得到k的值。至此我们就知道了实现一个bloomfilter所需要的所有参数了,接下来让我们看下Google guava包中是如何实现BloomFilter的。

guava中的BloomFilter

BloomFilter<T>无法通过new去创建新对象,而它提供了create静态方法来生成对象,其核心方法如下。

  static <T> BloomFilter<T> create(
      Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
    checkNotNull(funnel);
    checkArgument(
        expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
    checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
    checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
    checkNotNull(strategy);

    if (expectedInsertions == 0) {
      expectedInsertions = 1;
    }

    long numBits = optimalNumOfBits(expectedInsertions, fpp);
    int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
    try {
      return new BloomFilter<T>(new LockFreeBitArray(numBits), numHashFunctions, funnel, strategy);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
    }
  }

从代码可以看出,需要4个参数,分别是

  • funnel 用来对参数做转化,方便生成hash值
  • expectedInsertions 预期插入的数据量大小,也就是上文公式中的n
  • fpp 误判率,也就是上文公式中的误判率p
  • strategy 生成hash值的策略,guava中也提供了默认策略,一般不需要你自己重新实现

从上面代码可知,BloomFilter创建过程中先检查参数的合法性,之后使用n和p来计算bitmap的大小m(optimalNumOfBits(expectedInsertions, fpp)),通过n和m计算hash函数的个数k(optimalNumOfHashFunctions(expectedInsertions, numBits)),这俩方法的具体实现如下。

  static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }
  static long optimalNumOfBits(long n, double p) {
    if (p == 0) {
      p = Double.MIN_VALUE;
    }
    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

其实就是上文中列出的计算公式。
后面插入和查找的逻辑就比较简单了,这里不再赘述,有兴趣可以看下源码,我们这里通过BloomFilter提供的方法列表了解下它的功能就行。
在这里插入图片描述
从上图可以看出,BloomFilter除了提供创建和几个核心的功能外,还支持写入Stream或从Stream中重新生成BloomFilter,方便数据的共享和传输。

使用案例

  • HBase、BigTable、Cassandra、PostgreSQ等著名开源项目都用BloomFilter来减少对磁盘的访问次数,提升性能。
  • Chrome浏览器用BloomFilter来判别恶意网站。
  • 爬虫用BloomFilter来判断某个url是否爬取过。
  • 比特币也用到了BloomFilter来加速钱包信息的同步。

……

和HashSet对比

我们一直在说BloomFilter有巨大的存储优势,做个优势到底有多明显,我们拿jdk自带的HashSet和guava中实现的BloomFilter做下对比,数据仅供参考。

测试环境

测试平台 Mac
guava(28.1)BloomFilter,JDK11(64位) HashSet
使用om.carrotsearch.java-sizeof计算实际占用的内存空间

测试方式

BloomFilter vs HashSet

分别往BloomFilter和HashSet中插入UUID,总计插入100w个UUID,BloomFilter误判率为默认值0.03。每插入5w个统计下各自的占用空间。结果如下,横轴是数据量大小,纵轴是存储空间,单位kb。
在这里插入图片描述
可以看到BloomFilter存储空间一直都没有变,这里和它的实现有关,事实上你在告诉它总共要插入多少条数据时BloomFilter就计算并申请好了内存空间,所以BloomFilter占用内存不会随插入数据的多少而变化。相反,HashSet在插入数据越来越多时,其占用的内存空间也会越来越多,最终在插入完100w条数据后,其内存占用为BloomFilter的100多倍。

在不同fpp下的存储表现

在不同的误判率下,插入100w个UUID,计算其内存空间占用。结果如下,横轴是误判率大小,纵轴是存储空间,单位kb。
在这里插入图片描述

fpp,size
0.1,585.453125
0.01,1170.4765625
1.0E-3,1755.5
1.0E-4,2340.53125
1.0E-5,2925.5546875
1.0E-6,3510.578125
1.0E-7,4095.6015625
1.0E-8,4680.6328125
1.0E-9,5265.65625
1.0E-10,5850.6796875

可以看出,在同等数据量的情况下,BloomFilter的存储空间和ln(fpp)呈反比,所以增长速率其实不算快,即便误判率减少9个量级,其存储空间也只是增加了10倍。

参考资料

  1. wikipedia bloom filter
查看原文

赞 18 收藏 12 评论 1

selfboot 收藏了文章 · 4月30日

通过wireshark抓包来学习TCP HTTP网络协议

很多招聘需求上都会要求熟悉TCP/IP协议、socket编程之类的,可见这一块是对于web编程是非常重要的。作为一个野生程序员对这块没什么概念,于是便找来一些书籍想来补补。很多关于协议的大部头书都是非常枯燥的,我特意挑了比较友好的《图解TCP/IP》和《图解HTTP》,但看了一遍仍是云里雾里,找不到掌握了知识后的那种自信。所以得换一种思路来学习————通过敲代码来学习,通过抓包工具来分析网络,抓包神器首推wireshark。本文是自己学习TCP过程的记录和总结。

1、使用TCP socket实现服务端和客户端,模拟http请求

写一个简单的server和client,模拟最简单的http请求,即client发送get请求,server返回hello。这里是用golang写的,最近在学习golang。

完成之后可以使用postman充当client测试你的server能不能正常返回响应,或者使用完备的http模块测试你的client。

client向指定端口发送连接请求,连接后发送一个request并收到response断开连接并退出。server可以和不同的客户端建立多个TCP连接,每来了一个新连接就开一个goruntine去处理。

TCP是全双工的,所谓全双工就是读写有两个通道,互不影响,我当时还纳闷在conn上又读又写不会出毛病吗-_-

TCP是流式传输,所以要在for中不断的去读取数据,直到断开。注意没有断开连接的时候是读不到EOF的,代码使用了bufio包中的scanner这个API来逐行读取数据,以\n为结束标志。但数据并不都是以\n结尾的,如果读不到结尾,read就会一直阻塞,所以我们需要通过header中的length判断数据的大小。

我这里偷懒了,只读了header,读到header下面的空行就返回了。加了个超时,客户端5s不理我就断线,如果有数据过来就保持连接。

server:

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "net"
    "time"
)

const rn = "\r\n"

func main() {
    l, err := net.Listen("tcp", ":8888")
    if err != nil {
        panic(err)
    }
    fmt.Println("listen to 8888")

    for {
        conn, err := l.Accept()
        if err != nil {
            fmt.Println("conn err:", err)
        }
        go handleConn(conn)
    }
}

func handleConn(conn net.Conn) {
    defer conn.Close()
    defer fmt.Println("关闭")
    fmt.Println("新连接:", conn.RemoteAddr())
    t := time.Now().Unix()

    // 超时
    go func(t *int64) {
        for {
            if time.Now().Unix() - *t >= 5 {
                fmt.Println("超时")
                conn.Close()
                return
            }
            time.Sleep(100 * time.Millisecond)
        }
    }(&t)

    for {
        data, err := readTcp(conn)
        if err != nil {
            if err == io.EOF {
                continue
            } else {
                fmt.Println("read err:", err)
                break
            }
        }
        if (data > 0) {
            writeTcp(conn)
            t = time.Now().Unix()
        } else {
            break
        }
    }
}

func readTcp(conn net.Conn) (int, error) {
    var buf bytes.Buffer
    var err error
    rd := bufio.NewScanner(conn)
    total := 0

    for rd.Scan() {
        var n int
        n, err = buf.Write(rd.Bytes())
        if err != nil {
            panic(err)
        }
        buf.Write([]byte(rn))
        total += n
        fmt.Println("读到字节:", n)
        if n == 0 {
            break
        }
    }

    err = rd.Err()

    fmt.Println("总字节数:", total)
    fmt.Println("内容:", rn, buf.String())

    return total, err
}

func writeTcp(conn net.Conn) {
    wt := bufio.NewWriter(conn)
    wt.WriteString("HTTP/1.1 200 OK" + rn)
    wt.WriteString("Date: " + time.Now().String() + rn)
    wt.WriteString("Content-Length: 5" + rn)
    wt.WriteString("Content-Type: text/plain" + rn)
    wt.WriteString(rn)
    wt.WriteString("hello")
    err := wt.Flush()
    if err != nil {
        fmt.Println("Flush err: ", err)
    }
    fmt.Println("写入完毕", conn.RemoteAddr())
}

client:

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "net"
    "time"
)

const rn = "\r\n"

func main() {
    conn, err := net.Dial("tcp", ":8888")
    defer conn.Close()
    defer fmt.Println("断开")
    if err != nil {
        panic(err)
    }
    sendReq(conn)
    for {
        total, err := readResp(conn)
        if err != nil {
            panic(err)
        }
        if total > 0 {
            break
        }
    }
}

func sendReq(conn net.Conn) {
    wt := bufio.NewWriter(conn)
    wt.WriteString("GET / HTTP/1.1" + rn)
    wt.WriteString("Date: " + time.Now().String() + rn)
    wt.WriteString(rn)
    err := wt.Flush()
    if err != nil {
        fmt.Println("Flush err: ", err)
    }
    fmt.Println("写入完毕", conn.RemoteAddr())
}

func readResp(conn net.Conn) (int, error) {
    var buf bytes.Buffer
    var err error
    rd := bufio.NewScanner(conn)
    total := 0

    for rd.Scan() {
        var n int
        n, err = buf.Write(rd.Bytes())
        if err != nil {
            panic(err)
        }
        buf.Write([]byte(rn))
        if err != nil {
            panic(err)
        }
        total += n
        fmt.Println("读到字节:", n)
        if n == 0 {
            break
        }
    }

    if err = rd.Err(); err != nil {
        fmt.Println("read err:", err)
    }

    if (total > 0) {
        fmt.Println("resp:", rn, buf.String())
    }

    return total, err
}

2、通过wireshark监听对应端口抓包分析

server和client做出来了,下面来使用wireshark抓包来看看TCP链接的真容。当然你也可以现成的http模块来收发抓包,不过还是建议自己写一个最简单的。因为现成的模块里面很多细节被隐藏,比如我开始用postman发一个请求但是会建立两个连接,疑似是先发了个HEAD请求。
图片描述
打开wireshark,默认设置就行了。选择一个网卡,输入过滤条件开始抓包,因为我们是localhost,所以选择loopback。

图片描述
抓包开始后,启动之前的server监听8888端口,再启动client发送请求,于是便抓到了一次新鲜的TCP请求。

从图中我们可以清晰的看到三次握手(1-3)和四次挥手(9-12),还有seq和ack的变化,基于TCP的HTTP请求和响应,还有什么window update(TCP的窗口控制,告诉客户端我这边很空虚,赶紧发射数据)。

这个时候再结合大部头的协议书籍,理解起来印象更深。还有各种抓包姿势,更多复杂场景,留给大家自己去调教了。

我在抓一次文件上传的过程中,看到有个包length达到了16000,一个TCP包最大的数据载荷能达到多少呢?请听下文分解。

最后给大家推荐两本书《wiresharks网络分析就是这么简单》和《wireshark网络分析的艺术》,这两本为一个系列,作者用通俗易懂的语言,介绍wireshark的奇技淫巧和网络方面的一些解决思路,非常精彩。很多人不断强调数据结构和算法这些内功,不屑于专门学习工具的使用,但好的工具在学习和工作中能带来巨大的帮助,能造出好用的工具更是了不起。

查看原文

selfboot 赞了文章 · 4月30日

通过wireshark抓包来学习TCP HTTP网络协议

很多招聘需求上都会要求熟悉TCP/IP协议、socket编程之类的,可见这一块是对于web编程是非常重要的。作为一个野生程序员对这块没什么概念,于是便找来一些书籍想来补补。很多关于协议的大部头书都是非常枯燥的,我特意挑了比较友好的《图解TCP/IP》和《图解HTTP》,但看了一遍仍是云里雾里,找不到掌握了知识后的那种自信。所以得换一种思路来学习————通过敲代码来学习,通过抓包工具来分析网络,抓包神器首推wireshark。本文是自己学习TCP过程的记录和总结。

1、使用TCP socket实现服务端和客户端,模拟http请求

写一个简单的server和client,模拟最简单的http请求,即client发送get请求,server返回hello。这里是用golang写的,最近在学习golang。

完成之后可以使用postman充当client测试你的server能不能正常返回响应,或者使用完备的http模块测试你的client。

client向指定端口发送连接请求,连接后发送一个request并收到response断开连接并退出。server可以和不同的客户端建立多个TCP连接,每来了一个新连接就开一个goruntine去处理。

TCP是全双工的,所谓全双工就是读写有两个通道,互不影响,我当时还纳闷在conn上又读又写不会出毛病吗-_-

TCP是流式传输,所以要在for中不断的去读取数据,直到断开。注意没有断开连接的时候是读不到EOF的,代码使用了bufio包中的scanner这个API来逐行读取数据,以\n为结束标志。但数据并不都是以\n结尾的,如果读不到结尾,read就会一直阻塞,所以我们需要通过header中的length判断数据的大小。

我这里偷懒了,只读了header,读到header下面的空行就返回了。加了个超时,客户端5s不理我就断线,如果有数据过来就保持连接。

server:

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "net"
    "time"
)

const rn = "\r\n"

func main() {
    l, err := net.Listen("tcp", ":8888")
    if err != nil {
        panic(err)
    }
    fmt.Println("listen to 8888")

    for {
        conn, err := l.Accept()
        if err != nil {
            fmt.Println("conn err:", err)
        }
        go handleConn(conn)
    }
}

func handleConn(conn net.Conn) {
    defer conn.Close()
    defer fmt.Println("关闭")
    fmt.Println("新连接:", conn.RemoteAddr())
    t := time.Now().Unix()

    // 超时
    go func(t *int64) {
        for {
            if time.Now().Unix() - *t >= 5 {
                fmt.Println("超时")
                conn.Close()
                return
            }
            time.Sleep(100 * time.Millisecond)
        }
    }(&t)

    for {
        data, err := readTcp(conn)
        if err != nil {
            if err == io.EOF {
                continue
            } else {
                fmt.Println("read err:", err)
                break
            }
        }
        if (data > 0) {
            writeTcp(conn)
            t = time.Now().Unix()
        } else {
            break
        }
    }
}

func readTcp(conn net.Conn) (int, error) {
    var buf bytes.Buffer
    var err error
    rd := bufio.NewScanner(conn)
    total := 0

    for rd.Scan() {
        var n int
        n, err = buf.Write(rd.Bytes())
        if err != nil {
            panic(err)
        }
        buf.Write([]byte(rn))
        total += n
        fmt.Println("读到字节:", n)
        if n == 0 {
            break
        }
    }

    err = rd.Err()

    fmt.Println("总字节数:", total)
    fmt.Println("内容:", rn, buf.String())

    return total, err
}

func writeTcp(conn net.Conn) {
    wt := bufio.NewWriter(conn)
    wt.WriteString("HTTP/1.1 200 OK" + rn)
    wt.WriteString("Date: " + time.Now().String() + rn)
    wt.WriteString("Content-Length: 5" + rn)
    wt.WriteString("Content-Type: text/plain" + rn)
    wt.WriteString(rn)
    wt.WriteString("hello")
    err := wt.Flush()
    if err != nil {
        fmt.Println("Flush err: ", err)
    }
    fmt.Println("写入完毕", conn.RemoteAddr())
}

client:

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "net"
    "time"
)

const rn = "\r\n"

func main() {
    conn, err := net.Dial("tcp", ":8888")
    defer conn.Close()
    defer fmt.Println("断开")
    if err != nil {
        panic(err)
    }
    sendReq(conn)
    for {
        total, err := readResp(conn)
        if err != nil {
            panic(err)
        }
        if total > 0 {
            break
        }
    }
}

func sendReq(conn net.Conn) {
    wt := bufio.NewWriter(conn)
    wt.WriteString("GET / HTTP/1.1" + rn)
    wt.WriteString("Date: " + time.Now().String() + rn)
    wt.WriteString(rn)
    err := wt.Flush()
    if err != nil {
        fmt.Println("Flush err: ", err)
    }
    fmt.Println("写入完毕", conn.RemoteAddr())
}

func readResp(conn net.Conn) (int, error) {
    var buf bytes.Buffer
    var err error
    rd := bufio.NewScanner(conn)
    total := 0

    for rd.Scan() {
        var n int
        n, err = buf.Write(rd.Bytes())
        if err != nil {
            panic(err)
        }
        buf.Write([]byte(rn))
        if err != nil {
            panic(err)
        }
        total += n
        fmt.Println("读到字节:", n)
        if n == 0 {
            break
        }
    }

    if err = rd.Err(); err != nil {
        fmt.Println("read err:", err)
    }

    if (total > 0) {
        fmt.Println("resp:", rn, buf.String())
    }

    return total, err
}

2、通过wireshark监听对应端口抓包分析

server和client做出来了,下面来使用wireshark抓包来看看TCP链接的真容。当然你也可以现成的http模块来收发抓包,不过还是建议自己写一个最简单的。因为现成的模块里面很多细节被隐藏,比如我开始用postman发一个请求但是会建立两个连接,疑似是先发了个HEAD请求。
图片描述
打开wireshark,默认设置就行了。选择一个网卡,输入过滤条件开始抓包,因为我们是localhost,所以选择loopback。

图片描述
抓包开始后,启动之前的server监听8888端口,再启动client发送请求,于是便抓到了一次新鲜的TCP请求。

从图中我们可以清晰的看到三次握手(1-3)和四次挥手(9-12),还有seq和ack的变化,基于TCP的HTTP请求和响应,还有什么window update(TCP的窗口控制,告诉客户端我这边很空虚,赶紧发射数据)。

这个时候再结合大部头的协议书籍,理解起来印象更深。还有各种抓包姿势,更多复杂场景,留给大家自己去调教了。

我在抓一次文件上传的过程中,看到有个包length达到了16000,一个TCP包最大的数据载荷能达到多少呢?请听下文分解。

最后给大家推荐两本书《wiresharks网络分析就是这么简单》和《wireshark网络分析的艺术》,这两本为一个系列,作者用通俗易懂的语言,介绍wireshark的奇技淫巧和网络方面的一些解决思路,非常精彩。很多人不断强调数据结构和算法这些内功,不屑于专门学习工具的使用,但好的工具在学习和工作中能带来巨大的帮助,能造出好用的工具更是了不起。

查看原文

赞 7 收藏 27 评论 0

selfboot 收藏了文章 · 4月8日

面试问我,创建多少个线程合适?我该怎么说

| 如果好看,请给个赞

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

为什么要使用多线程?

防止并发编程出错最好的办法就是不写并发程序

既然多线程编程容易出错,为什么它还经久不衰呢?

A:那还用说,肯定在某些方面有特长呗,比如你知道的【它很快,非常快】

我也很赞同这个答案,但说的不够具体

并发编程适用于什么场景?

如果问你选择多线程的原因就是一个【快】字,面试也就不会出那么多幺蛾子了。你有没有问过你自己

  1. 并发编程在所有场景下都是快的吗?
  2. 知道它很快,何为快?怎样度量?

想知道这两个问题的答案,我们需要一个从【定性】到【定量】的分析过程

使用多线程就是在正确的场景下通过设置正确个数的线程来最大化程序的运行速度(我感觉你还是啥也没说)

将这句话翻译到硬件级别就是要充分的利用 CPU 和 I/O 的利用率

两个正确得到保证,也就能达到最大化利用 CPU 和 I/O的目的了。最关键是,如何做到两个【正确】?

在聊具体场景的时候,我们必须要拿出我们的专业性来。送你两个名词 buff 加成

  • CPU 密集型程序
  • I/O 密集型程序

CPU 密集型程序

一个完整请求,I/O操作可以在很短时间内完成, CPU还有很多运算要处理,也就是说 CPU 计算的比例占很大一部分

假如我们要计算 1+2+....100亿 的总和,很明显,这就是一个 CPU 密集型程序

在【单核】CPU下,如果我们创建 4 个线程来分段计算,即:

  1. 线程1计算 [1,25亿)
  2. ...... 以此类推
  3. 线程4计算 [75亿,100亿]

我们来看下图他们会发生什么?

由于是单核 CPU,所有线程都在等待 CPU 时间片。按照理想情况来看,四个线程执行的时间总和与一个线程5独自完成是相等的,实际上我们还忽略了四个线程上下文切换的开销

所以,单核CPU处理CPU密集型程序,这种情况并不太适合使用多线程

此时如果在 4 核CPU下,同样创建四个线程来分段计算,看看会发生什么?

每个线程都有 CPU 来运行,并不会发生等待 CPU 时间片的情况,也没有线程切换的开销。理论情况来看效率提升了 4 倍

所以,如果是多核CPU 处理 CPU 密集型程序,我们完全可以最大化的利用 CPU 核心数,应用并发编程来提高效率

I/O密集型程序

与 CPU 密集型程序相对,一个完整请求,CPU运算操作完成之后还有很多 I/O 操作要做,也就是说 I/O 操作占比很大部分

我们都知道在进行 I/O 操作时,CPU是空闲状态,所以我们要最大化的利用 CPU,不能让其是空闲状态

同样在单核 CPU 的情况下:

从上图中可以看出,每个线程都执行了相同长度的 CPU 耗时和 I/O 耗时,如果你将上面的图多画几个周期,CPU操作耗时固定,将 I/O 操作耗时变为 CPU 耗时的 3 倍,你会发现,CPU又有空闲了,这时你就可以新建线程 4,来继续最大化的利用 CPU。

综上两种情况我们可以做出这样的总结:

线程等待时间所占比例越高,需要越多线程;线程CPU时间所占比例越高,需要越少线程。

到这里,相信你已经知道第一个【正确】使用多线程的场景了,那创建多少个线程是正确的呢?

创建多少个线程合适?

面试如果问到这个问题,这可是对你理论和实践的统考。想完全答对,你必须要【精通/精通/精通】小学算术

从上面知道,我们有 CPU 密集型和 I/O 密集型两个场景,不同的场景当然需要的线程数也就不一样了

CPU 密集型程序创建多少个线程合适?

有些同学早已经发现,对于 CPU 密集型来说,理论上 线程数量 = CPU 核数(逻辑) 就可以了,但是实际上,数量一般会设置为 CPU 核数(逻辑)+ 1, 为什么呢?

《Java并发编程实战》这么说:

计算密(CPU)集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作。

所以对于CPU密集型程序, CPU 核数(逻辑)+ 1 个线程数是比较好的经验值的原因了

I/O密集型程序创建多少个线程合适?

上面已经让大家按照图多画几个周期(你可以动手将I/O耗时与CPU耗时比例调大,比如6倍或7倍),这样你就会得到一个结论,对于 I/O 密集型程序:

最佳线程数 = (1/CPU利用率) = 1 + (I/O耗时/CPU耗时)

我这么体贴,当然担心有些同学不理解这个公式,我们将上图的比例手动带入到上面的公式中:

这是一个CPU核心的最佳线程数,如果多个核心,那么 I/O 密集型程序的最佳线程数就是:

最佳线程数 = CPU核心数(1/CPU利用率) = CPU核心数1 + (I/O耗时/CPU耗时)

说到这,有些同学可能有疑问了,要计算 I/O 密集型程序,是要知道 CPU 利用率的,如果我不知道这些,那要怎样给出一个初始值呢?

按照上面公式,假如几乎全是 I/O耗时,所以纯理论你就可以说是 2N(N=CPU核数),当然也有说 2N + 1的,(我猜这个 1 也是 backup),没有找到具体的推倒过程,在【并发编程实战-8.2章节】截图在此,大家有兴趣的可以自己看看

理论上来说,理论上来说,理论上来说,这样就能达到 CPU 100% 的利用率

如果理论都好用,那就用不着实践了,也就更不会有调优的事出现了。不过在初始阶段,我们确实可以按照这个理论之作为伪标准, 毕竟差也可能不会差太多,这样调优也会更好一些

谈完理论,咱们说点实际的,公式我看懂了(定性阶段结束),但是我有两个疑问:

  1. 我怎么知道具体的 I/O耗时和CPU耗时呢?
  2. 怎么查看CPU利用率?

没错,我们需要定量分析了

幸运的是,我们并不是第一个吃螃蟹的仔儿,其实有很多 APM (Application Performance Manager)工具可以帮我们得到准确的数据,学会使用这类工具,也就可以结合理论,在调优的过程得到更优的线程个数了。我这里简单列举几个,具体使用哪一个,具体应用还需要你自己去调研选择,受篇幅限制,暂不展开讨论了

  1. SkyWalking
  2. CAT
  3. zipkin

上面了解了基本的理论知识,那面试有可能问什么?又可能会以怎样的方式提问呢?

面试小问

小问一

假设要求一个系统的 TPS(Transaction Per Second 或者 Task Per Second)至少为20,然后假设每个Transaction由一个线程完成,继续假设平均每个线程处理一个Transaction的时间为4s

如何设计线程个数,使得可以在1s内处理完20个Transaction?

但是,但是,这是因为没有考虑到CPU数目。家里又没矿,一般服务器的CPU核数为16或者32,如果有80个线程,那么肯定会带来太多不必要的线程上下文切换开销(希望这句话你可以主动说出来),这就需要调优了,来做到最佳 balance

小问二

计算操作需要5ms,DB操作需要 100ms,对于一台 8个CPU的服务器,怎么设置线程数呢?

如果不知道请拿三年级期末考试题重新做(今天晚自习留下来),答案是:

线程数 = 8 * (1 + 100/5) = 168 (个)

那如果DB的 QPS(Query Per Second)上限是1000,此时这个线程数又该设置为多大呢?

同样,这是没有考虑 CPU 数目,接下来就又是细节调优的阶段了

因为一次请求不仅仅包括 CPU 和 I/O操作,具体的调优过程还要考虑内存资源,网络等具体内容

增加 CPU 核数一定能解决问题吗?

看到这,有些同学可能会认为,即便我算出了理论线程数,但实际CPU核数不够,会带来线程上下文切换的开销,所以下一步就需要增加 CPU 核数,那我们盲目的增加 CPU 核数就一定能解决问题吗?

在讲互斥锁的内容是,我故意遗留了一个知识:

怎么理解这个公式呢?

这个结论告诉我们,假如我们的串行率是 5%,那么我们无论采用什么技术,最高也就只能提高 20 倍的性能。

如何简单粗暴的理解串行百分比(其实都可以通过工具得出这个结果的)呢?来看个小 Tips:

Tips: 临界区都是串行的,非临界区都是并行的,用单线程执行临界区的时间/用单线程执行(临界区+非临界区)的时间就是串行百分比

现在你应该理解我在讲解 synchronized 关键字时所说的:

最小化临界区范围,因为临界区的大小往往就是瓶颈问题的所在,不要像乱用try catch那样一锅端

总结

多线程不一定就比但线程高效,比如大名鼎鼎的 Redis (后面会分析),因为它是基于内存操作,这种情况下,单线程可以很高效的利用CPU。而多线程的使用场景一般时存在相当比例的I/O或网络操作

另外,结合小学数学题,我们已经了解了如何从定性到定量的分析的过程,在开始没有任何数据之前,我们可以使用上文提到的经验值作为一个伪标准,其次就是结合实际来逐步的调优(综合 CPU,内存,硬盘读写速度,网络状况等)了

最后,盲目的增加 CPU 核数也不一定能解决我们的问题,这就要求我们严格的编写并发程序代码了

灵魂追问

  1. 我们已经知道创建多少个线程合适了,为什么还要搞一个线程池出来?
  2. 创建一个线程都要做哪些事情?为什么说频繁的创建线程开销很大?
  3. 多线程通常要注意共享变量问题,为什么局部变量就没有线程安全问题呢?
  4. ......

下一篇文章,我们就来说说,你熟悉又陌生的线程池问题

参考

感谢前辈们总结的精华,自己所写的并发系列好多都参考了以下资料

  • Java 并发编程实战
  • Java 并发编程之美
  • 码出高效
  • Java 并发编程的艺术
  • ......

日拱一兵 | 原创

查看原文

selfboot 赞了文章 · 4月8日

面试问我,创建多少个线程合适?我该怎么说

| 如果好看,请给个赞

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

为什么要使用多线程?

防止并发编程出错最好的办法就是不写并发程序

既然多线程编程容易出错,为什么它还经久不衰呢?

A:那还用说,肯定在某些方面有特长呗,比如你知道的【它很快,非常快】

我也很赞同这个答案,但说的不够具体

并发编程适用于什么场景?

如果问你选择多线程的原因就是一个【快】字,面试也就不会出那么多幺蛾子了。你有没有问过你自己

  1. 并发编程在所有场景下都是快的吗?
  2. 知道它很快,何为快?怎样度量?

想知道这两个问题的答案,我们需要一个从【定性】到【定量】的分析过程

使用多线程就是在正确的场景下通过设置正确个数的线程来最大化程序的运行速度(我感觉你还是啥也没说)

将这句话翻译到硬件级别就是要充分的利用 CPU 和 I/O 的利用率

两个正确得到保证,也就能达到最大化利用 CPU 和 I/O的目的了。最关键是,如何做到两个【正确】?

在聊具体场景的时候,我们必须要拿出我们的专业性来。送你两个名词 buff 加成

  • CPU 密集型程序
  • I/O 密集型程序

CPU 密集型程序

一个完整请求,I/O操作可以在很短时间内完成, CPU还有很多运算要处理,也就是说 CPU 计算的比例占很大一部分

假如我们要计算 1+2+....100亿 的总和,很明显,这就是一个 CPU 密集型程序

在【单核】CPU下,如果我们创建 4 个线程来分段计算,即:

  1. 线程1计算 [1,25亿)
  2. ...... 以此类推
  3. 线程4计算 [75亿,100亿]

我们来看下图他们会发生什么?

由于是单核 CPU,所有线程都在等待 CPU 时间片。按照理想情况来看,四个线程执行的时间总和与一个线程5独自完成是相等的,实际上我们还忽略了四个线程上下文切换的开销

所以,单核CPU处理CPU密集型程序,这种情况并不太适合使用多线程

此时如果在 4 核CPU下,同样创建四个线程来分段计算,看看会发生什么?

每个线程都有 CPU 来运行,并不会发生等待 CPU 时间片的情况,也没有线程切换的开销。理论情况来看效率提升了 4 倍

所以,如果是多核CPU 处理 CPU 密集型程序,我们完全可以最大化的利用 CPU 核心数,应用并发编程来提高效率

I/O密集型程序

与 CPU 密集型程序相对,一个完整请求,CPU运算操作完成之后还有很多 I/O 操作要做,也就是说 I/O 操作占比很大部分

我们都知道在进行 I/O 操作时,CPU是空闲状态,所以我们要最大化的利用 CPU,不能让其是空闲状态

同样在单核 CPU 的情况下:

从上图中可以看出,每个线程都执行了相同长度的 CPU 耗时和 I/O 耗时,如果你将上面的图多画几个周期,CPU操作耗时固定,将 I/O 操作耗时变为 CPU 耗时的 3 倍,你会发现,CPU又有空闲了,这时你就可以新建线程 4,来继续最大化的利用 CPU。

综上两种情况我们可以做出这样的总结:

线程等待时间所占比例越高,需要越多线程;线程CPU时间所占比例越高,需要越少线程。

到这里,相信你已经知道第一个【正确】使用多线程的场景了,那创建多少个线程是正确的呢?

创建多少个线程合适?

面试如果问到这个问题,这可是对你理论和实践的统考。想完全答对,你必须要【精通/精通/精通】小学算术

从上面知道,我们有 CPU 密集型和 I/O 密集型两个场景,不同的场景当然需要的线程数也就不一样了

CPU 密集型程序创建多少个线程合适?

有些同学早已经发现,对于 CPU 密集型来说,理论上 线程数量 = CPU 核数(逻辑) 就可以了,但是实际上,数量一般会设置为 CPU 核数(逻辑)+ 1, 为什么呢?

《Java并发编程实战》这么说:

计算密(CPU)集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作。

所以对于CPU密集型程序, CPU 核数(逻辑)+ 1 个线程数是比较好的经验值的原因了

I/O密集型程序创建多少个线程合适?

上面已经让大家按照图多画几个周期(你可以动手将I/O耗时与CPU耗时比例调大,比如6倍或7倍),这样你就会得到一个结论,对于 I/O 密集型程序:

最佳线程数 = (1/CPU利用率) = 1 + (I/O耗时/CPU耗时)

我这么体贴,当然担心有些同学不理解这个公式,我们将上图的比例手动带入到上面的公式中:

这是一个CPU核心的最佳线程数,如果多个核心,那么 I/O 密集型程序的最佳线程数就是:

最佳线程数 = CPU核心数(1/CPU利用率) = CPU核心数1 + (I/O耗时/CPU耗时)

说到这,有些同学可能有疑问了,要计算 I/O 密集型程序,是要知道 CPU 利用率的,如果我不知道这些,那要怎样给出一个初始值呢?

按照上面公式,假如几乎全是 I/O耗时,所以纯理论你就可以说是 2N(N=CPU核数),当然也有说 2N + 1的,(我猜这个 1 也是 backup),没有找到具体的推倒过程,在【并发编程实战-8.2章节】截图在此,大家有兴趣的可以自己看看

理论上来说,理论上来说,理论上来说,这样就能达到 CPU 100% 的利用率

如果理论都好用,那就用不着实践了,也就更不会有调优的事出现了。不过在初始阶段,我们确实可以按照这个理论之作为伪标准, 毕竟差也可能不会差太多,这样调优也会更好一些

谈完理论,咱们说点实际的,公式我看懂了(定性阶段结束),但是我有两个疑问:

  1. 我怎么知道具体的 I/O耗时和CPU耗时呢?
  2. 怎么查看CPU利用率?

没错,我们需要定量分析了

幸运的是,我们并不是第一个吃螃蟹的仔儿,其实有很多 APM (Application Performance Manager)工具可以帮我们得到准确的数据,学会使用这类工具,也就可以结合理论,在调优的过程得到更优的线程个数了。我这里简单列举几个,具体使用哪一个,具体应用还需要你自己去调研选择,受篇幅限制,暂不展开讨论了

  1. SkyWalking
  2. CAT
  3. zipkin

上面了解了基本的理论知识,那面试有可能问什么?又可能会以怎样的方式提问呢?

面试小问

小问一

假设要求一个系统的 TPS(Transaction Per Second 或者 Task Per Second)至少为20,然后假设每个Transaction由一个线程完成,继续假设平均每个线程处理一个Transaction的时间为4s

如何设计线程个数,使得可以在1s内处理完20个Transaction?

但是,但是,这是因为没有考虑到CPU数目。家里又没矿,一般服务器的CPU核数为16或者32,如果有80个线程,那么肯定会带来太多不必要的线程上下文切换开销(希望这句话你可以主动说出来),这就需要调优了,来做到最佳 balance

小问二

计算操作需要5ms,DB操作需要 100ms,对于一台 8个CPU的服务器,怎么设置线程数呢?

如果不知道请拿三年级期末考试题重新做(今天晚自习留下来),答案是:

线程数 = 8 * (1 + 100/5) = 168 (个)

那如果DB的 QPS(Query Per Second)上限是1000,此时这个线程数又该设置为多大呢?

同样,这是没有考虑 CPU 数目,接下来就又是细节调优的阶段了

因为一次请求不仅仅包括 CPU 和 I/O操作,具体的调优过程还要考虑内存资源,网络等具体内容

增加 CPU 核数一定能解决问题吗?

看到这,有些同学可能会认为,即便我算出了理论线程数,但实际CPU核数不够,会带来线程上下文切换的开销,所以下一步就需要增加 CPU 核数,那我们盲目的增加 CPU 核数就一定能解决问题吗?

在讲互斥锁的内容是,我故意遗留了一个知识:

怎么理解这个公式呢?

这个结论告诉我们,假如我们的串行率是 5%,那么我们无论采用什么技术,最高也就只能提高 20 倍的性能。

如何简单粗暴的理解串行百分比(其实都可以通过工具得出这个结果的)呢?来看个小 Tips:

Tips: 临界区都是串行的,非临界区都是并行的,用单线程执行临界区的时间/用单线程执行(临界区+非临界区)的时间就是串行百分比

现在你应该理解我在讲解 synchronized 关键字时所说的:

最小化临界区范围,因为临界区的大小往往就是瓶颈问题的所在,不要像乱用try catch那样一锅端

总结

多线程不一定就比但线程高效,比如大名鼎鼎的 Redis (后面会分析),因为它是基于内存操作,这种情况下,单线程可以很高效的利用CPU。而多线程的使用场景一般时存在相当比例的I/O或网络操作

另外,结合小学数学题,我们已经了解了如何从定性到定量的分析的过程,在开始没有任何数据之前,我们可以使用上文提到的经验值作为一个伪标准,其次就是结合实际来逐步的调优(综合 CPU,内存,硬盘读写速度,网络状况等)了

最后,盲目的增加 CPU 核数也不一定能解决我们的问题,这就要求我们严格的编写并发程序代码了

灵魂追问

  1. 我们已经知道创建多少个线程合适了,为什么还要搞一个线程池出来?
  2. 创建一个线程都要做哪些事情?为什么说频繁的创建线程开销很大?
  3. 多线程通常要注意共享变量问题,为什么局部变量就没有线程安全问题呢?
  4. ......

下一篇文章,我们就来说说,你熟悉又陌生的线程池问题

参考

感谢前辈们总结的精华,自己所写的并发系列好多都参考了以下资料

  • Java 并发编程实战
  • Java 并发编程之美
  • 码出高效
  • Java 并发编程的艺术
  • ......

日拱一兵 | 原创

查看原文

赞 88 收藏 59 评论 14

selfboot 关注了用户 · 4月8日

日拱一兵 @tanrigongyibing

欢迎关注,公众号「日拱一兵」,以读侦探小说思维趣味轻松学习Java技术

送你《1000G 免费精选技术学习资料》(2020 年最新)
https://mp.weixin.qq.com/s/9p...

关注 19388

selfboot 赞了文章 · 3月23日

手写一个词法分析器

前言

最近大部分时间都在撸 Python,其中也会涉及到将数据库表转换为 PythonORM 框架的 Model,但我们并没有找到一个合适的工具来做这个意义不大的”体力活“,所以每次新建表后大家都是根据自己的表结构手写一遍 Model

一两张表还好,一旦 10 几张表都要写一遍时那痛苦只有自己知道;这时程序员的 slogan 再次印证:一切毫无意义的体力劳动终将被计算机取代。

intellij plugin

既然没有现成的工具那就自己写一个吧,演示效果如下:
1-min.gif

考虑到我们主要是用 PyCharm 开发,正好 jetbrains 也提供了 SDK 用于开发插件,所以 UI 层面可以不用额外考虑了。

使用流程很简单,只需要导入 DDL 语句就可以生成 Python 所需要的 Model 代码。

例如导入以下 DDL:

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `userName` varchar(20) DEFAULT NULL COMMENT '用户名',
  `password` varchar(100) DEFAULT NULL COMMENT '密码',
  `roleId` int(11) DEFAULT NULL COMMENT '角色ID',
  PRIMARY KEY (`id`),  
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

便会生成对应的 Python 代码:

class User(db.Model):
    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    userName = db.Column(db.String)  # 用户名
    password = db.Column(db.String)  # 密码
    roleId = db.Column(db.Integer)  # 角色ID

词法解析

仔细对比源文件及目标代码会很容易找出规律,无非就是解析出表名、字段、及字段的属性(是否为主键、类型、长度),最后再转换为 Python 所需要的模板即可。

在我动手之前我认为是非常简单的,无非就是解析字符串,但实际上手后发现不是那么回事;主要是有以下几个问题:

  1. 如何识别出表名称?
  2. 同样的如何识别出字段名称,同时还得关联上该字段的类型、长度、注释。
  3. 如何识别出主键?

总结一句话,如何通过一系列规则识别出一段字符串中的关键信息,这同样也是 MySQL Server 所做的事情。

在开始真正解析 DDL 之前,先来看下一段简单的脚本如何解析:

x = 20

按照我们平时开发的经验,这条语句分为以下几部分:

  • x 表示变量
  • = 表示赋值符号
  • 20 表示赋值结果

所以我们对这段脚本的解析结果应当为:

VAR      x
GE         =
VAL      100

这个解析过程在编译原理中称为”词法解析“,可能大家听到编译原理这几个字就头大(我也是);对于刚才那段脚本我们可以编写一个非常简单的词法解析器生成这样的结果。

状态迁移

再开始之前先捋一下思路,可以看到上文的结果中通过 VAR 表示变量、GE 表示赋值符号 ”=“、VAL 表示赋值结果,现在需要重点记住这三个状态。

在依次读取字符解析时,程序就是在这几个状态中来回切换,如下图:

  1. 默认为初始状态。
  2. 当字符为字母时进入 VAR 状态。
  3. 当字符为 ”=“ 符号时进入 GE 状态。

同理,当不满足这几个状态时候又会回到初始从而再次确认新的状态。

光看图有点抽象,直接来看核心代码:

    public class Result{
        public TokenType tokenType ;
        public StringBuilder text = new StringBuilder();
    }

首先定义了一个结果类,收集最终的解析结果;其中的 TokenType 就对应了图中的三种状态,简单的用枚举值来表示。

public enum TokenType {
    INIT,
    VAR,
    GE,
    VAL
}

首先对应到第一张图:初始化状态。

需要对当前解析的字符定义一个 TokenType

和图中描述的流程一致,判断当前字符给定一个状态即可。

接着对应到第二张图:状态之间的转换。

会根据不同的状态进入不同的 case,在不同的 case 中判断是否应当跳转到其他状态(进入 INIT 状态后会重新生成状态)。

举个例子: x = 20:

首选会进入 VAR 状态,接着下一个字符为空格,自然在 38 行中重新进入初始状态,导致再次确定下一个字符 = 进入 GE 状态。

当脚本为 ab = 30:
第一个字符为 a 也是进入 VAR 状态,第二个字符为 b,依然为字母,所以进入 36 行,状态不会改变,同时将 b 这个字符追加进来;后续步骤就和上一个例子一致了。

多说无益,建议大家自己跑一下单测就会明白:
https://github.com/crossoverJie/sqlalchemy-transfer/blob/master/src/test/java/top/crossoverjie/plugin/core/lab/TestLexerTest.java

DDL 解析

简单的解析完成后来看看 DDL 这样的脚本应当如何解析:

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `userName` varchar(20) DEFAULT NULL COMMENT '用户名',
  `password` varchar(100) DEFAULT NULL COMMENT '密码',
  `roleId` int(11) DEFAULT NULL COMMENT '角色ID',
  PRIMARY KEY (`id`),  
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

原理类似,首先还是要看出规律(也就是语法):

  • 表名是第一行语句,同时以 CREATE TABLE 开头。
  • 每一个字段的信息(名称、类型、长度、备注)都是以 "`" 符号开头 "," 结尾。
  • 主键是以 PRIMART 字符串开头的字段,以 ) 结尾。

根据我们需要解析的数据种类,我这里定义了这个枚举:

然后在初始化类型时进行判断赋值:

由于需要解析的数据不少,所以这里的判断条件自然也就多了。

递归解析

针对于 DDL 的语法规则,我们这里还有需要有特殊处理的地方;比如解析具体字段信息时如何关联起来?

举个例子:

`userName` varchar(20) DEFAULT NULL COMMENT '用户名',
`password` varchar(100) DEFAULT NULL COMMENT '密码',

这里我们解析出来的数据得有一个映射关系:

所以我们只能一个字段的全部信息解析完成并且关联好之后才能解析下一个字段。

于是这里我采用了递归的方式进行解析(不一定是最好的,欢迎大家提出更优的方案)。

} else if (value == '`' && pStatus == Status.BASE_INIT) {
    result.tokenType = DDLTokenType.FI;
    result.text.append(value);
} 

当当前字符为 ”`“ 符号时,将状态置为 "FI"(FieldInfo),同时当解析到为 "," 符号时便进入递归处理。

可以理解为将这一段字符串单独提取出来处理:

`userName` varchar(20) DEFAULT NULL COMMENT '用户名',

接着再将这段字符递归调用当前方法再次进行解析,这时便按照字段名称、类型、长度、注释的规则解析即可。

同时既然存在递归,还需要将子递归的数据关联起来,所以我在返回结果中新增了一个 pid 的字段,这个也容易理解。

默认值为 0,一旦递归后便自增 +1,保证每次递归的数据都是唯一的。

用同样的方法在解析主键时也是先将整个字符串提取出来:

PRIMARY KEY (`id`)

只不过是 "P" 打头 ")" 结尾。

} else if (value == 'P' && pStatus == Status.BASE_INIT) {
    result.tokenType = DDLTokenType.P_K;
    result.text.append(value);
} 

也是将整段字符串递归解析,再递归的过程中进行状态切换 P_K ---> P_K_V 最终获取到主键。


所以通过对刚才那段 DDL 解析得到的结果如下:

这样每个字段也通过了 pid 进行了区分关联。

所以现在只需要对这个词法解析器进行封装,便可以提供一个简单的 API 来获取表中的数据了。

总结

到此整个词法解析器的全部内容都已经完成了,虽然实现的是一个小功能,但我自己花的时间可不少,其中光复习编译原理就让人头疼。

但这还只是整个编译语言知识点的冰山一角,后续还有语法、语义、中间、目标代码等一系列内容,都是一个比一个难啃。

其实我相信大多数人和我想法一样,这个东西太底层而且枯燥,真正从事这方面工作的也都是凤毛麟角,所以花这时间干啥呢?

所以我也决定这个弄完后就弃坑啦。


哈哈,开个玩笑,或许有生之年自己也能实现一门编程语言,当老了和儿子吹牛时也能有点资本。

大家看完记得点赞分享一键三连哦

查看原文

赞 15 收藏 10 评论 0

selfboot 收藏了文章 · 3月14日

一篇搞懂TCP、HTTP、Socket、Socket连接池

前言

​ 作为一名开发人员我们经常会听到HTTP协议、TCP/IP协议、UDP协议、Socket、Socket长连接、Socket连接池等字眼,然而它们之间的关系、区别及原理并不是所有人都能理解清楚,这篇文章就从网络协议基础开始到Socket连接池,一步一步解释他们之间的关系。

七层网络模型

​ 首先从网络通信的分层模型讲起:七层模型,亦称OSI(Open System Interconnection)模型。自下往上分为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。所有有关通信的都离不开它,下面这张图片介绍了各层所对应的一些协议和硬件

图片描述

通过上图,我知道IP协议对应于网络层,TCP、UDP协议对应于传输层,而HTTP协议对应于应用层,OSI并没有Socket,那什么是Socket,后面我们将结合代码具体详细介绍。

TCP和UDP连接

​ 关于传输层TCP、UDP协议可能我们平时遇见的会比较多,有人说TCP是安全的,UDP是不安全的,UDP传输比TCP快,那为什么呢,我们先从TCP的连接建立的过程开始分析,然后解释UDP和TCP的区别。

TCP的三次握手和四次分手

​ 我们知道TCP建立连接需要经过三次握手,而断开连接需要经过四次分手,那三次握手和四次分手分别做了什么和如何进行的。

图片描述

第一次握手:建立连接。客户端发送连接请求报文段,将SYN位置为1,Sequence Number为x;然后,客户端进入SYN_SEND状态,等待服务器的确认;
第二次握手:服务器收到客户端的SYN报文段,需要对这个SYN报文段进行确认,设置Acknowledgment Number为x+1(Sequence Number+1);同时,自己自己还要发送SYN请求信息,将SYN位置为1,Sequence Number为y;服务器端将上述所有信息放到一个报文段(即SYN+ACK报文段)中,一并发送给客户端,此时服务器进入SYN_RECV状态;
第三次握手:客户端收到服务器的SYN+ACK报文段。然后将Acknowledgment Number设置为y+1,向服务器发送ACK报文段,这个报文段发送完毕以后,客户端和服务器端都进入ESTABLISHED状态,完成TCP三次握手。

完成了三次握手,客户端和服务器端就可以开始传送数据。以上就是TCP三次握手的总体介绍。通信结束客户端和服务端就断开连接,需要经过四次分手确认。

第一次分手:主机1(可以使客户端,也可以是服务器端),设置Sequence Number和Acknowledgment Number,向主机2发送一个FIN报文段;此时,主机1进入FIN_WAIT_1状态;这表示主机1没有数据要发送给主机2了;
第二次分手:主机2收到了主机1发送的FIN报文段,向主机1回一个ACK报文段,Acknowledgment Number为Sequence Number加1;主机1进入FIN_WAIT_2状态;主机2告诉主机1,我“同意”你的关闭请求;
第三次分手:主机2向主机1发送FIN报文段,请求关闭连接,同时主机2进入LAST_ACK状态;
第四次分手:主机1收到主机2发送的FIN报文段,向主机2发送ACK报文段,然后主机1进入TIME_WAIT状态;主机2收到主机1的ACK报文段以后,就关闭连接;此时,主机1等待2MSL后依然没有收到回复,则证明Server端已正常关闭,那好,主机1也可以关闭连接了。

可以看到一次tcp请求的建立及关闭至少进行7次通信,这还不包过数据的通信,而UDP不需3次握手和4次分手。

TCP和UDP的区别

 1、TCP是面向链接的,虽然说网络的不安全不稳定特性决定了多少次握手都不能保证连接的可靠性,但TCP的三次握手在最低限度上(实际上也很大程度上保证了)保证了连接的可靠性;而UDP不是面向连接的,UDP传送数据前并不与对方建立连接,对接收到的数据也不发送确认信号,发送端不知道数据是否会正确接收,当然也不用重发,所以说UDP是无连接的、不可靠的一种数据传输协议。 
 2、也正由于1所说的特点,使得UDP的开销更小数据传输速率更高,因为不必进行收发数据的确认,所以UDP的实时性更好。知道了TCP和UDP的区别,就不难理解为何采用TCP传输协议的MSN比采用UDP的QQ传输文件慢了,但并不能说QQ的通信是不安全的,因为程序员可以手动对UDP的数据收发进行验证,比如发送方对每个数据包进行编号然后由接收方进行验证啊什么的,即使是这样,UDP因为在底层协议的封装上没有采用类似TCP的“三次握手”而实现了TCP所无法达到的传输效率。

问题

关于传输层我们会经常听到一些问题

1.TCP服务器最大并发连接数是多少?

关于TCP服务器最大并发连接数有一种误解就是“因为端口号上限为65535,所以TCP服务器理论上的可承载的最大并发连接数也是65535”。首先需要理解一条TCP连接的组成部分:客户端IP、客户端端口、服务端IP、服务端端口。所以对于TCP服务端进程来说,他可以同时连接的客户端数量并不受限于可用端口号,理论上一个服务器的一个端口能建立的连接数是全球的IP数*每台机器的端口数。实际并发连接数受限于linux可打开文件数,这个数是可以配置的,可以非常大,所以实际上受限于系统性能。通过#ulimit -n 查看服务的最大文件句柄数,通过ulimit -n xxx 修改 xxx是你想要能打开的数量。也可以通过修改系统参数:

#vi /etc/security/limits.conf
*  soft  nofile  65536
*  hard  nofile  65536

2.为什么TIME_WAIT状态还需要等2MSL后才能返回到CLOSED状态?

这是因为虽然双方都同意关闭连接了,而且握手的4个报文也都协调和发送完毕,按理可以直接回到CLOSED状态(就好比从SYN_SEND状态到ESTABLISH状态那样);但是因为我们必须要假想网络是不可靠的,你无法保证你最后发送的ACK报文会一定被对方收到,因此对方处于LAST_ACK状态下的Socket可能会因为超时未收到ACK报文,而重发FIN报文,所以这个TIME_WAIT状态的作用就是用来重发可能丢失的ACK报文。

3.TIME_WAIT状态还需要等2MSL后才能返回到CLOSED状态会产生什么问题

通信双方建立TCP连接后,主动关闭连接的一方就会进入TIME_WAIT状态,TIME_WAIT状态维持时间是两个MSL时间长度,也就是在1-4分钟,Windows操作系统就是4分钟。进入TIME_WAIT状态的一般情况下是客户端,一个TIME_WAIT状态的连接就占用了一个本地端口。一台机器上端口号数量的上限是65536个,如果在同一台机器上进行压力测试模拟上万的客户请求,并且循环与服务端进行短连接通信,那么这台机器将产生4000个左右的TIME_WAIT Socket,后续的短连接就会产生address already in use : connect的异常,如果使用Nginx作为方向代理也需要考虑TIME_WAIT状态,发现系统存在大量TIME_WAIT状态的连接,通过调整内核参数解决。

vi /etc/sysctl.conf

编辑文件,加入以下内容:

net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30

然后执行 /sbin/sysctl -p 让参数生效。

net.ipv4.tcp_syncookies = 1 表示开启SYN Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭;
net.ipv4.tcp_tw_reuse = 1 表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭;
net.ipv4.tcp_tw_recycle = 1 表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭。
net.ipv4.tcp_fin_timeout 修改系統默认的TIMEOUT时间

HTTP协议

关于TCP/IP和HTTP协议的关系,网络有一段比较容易理解的介绍:“我们在传输数据时,可以只使用(传输层)TCP/IP协议,但是那样的话,如果没有应用层,便无法识别数据内容。如果想要使传输的数据有意义,则必须使用到应用层协议。应用层协议有很多,比如HTTP、FTP、TELNET等,也可以自己定义应用层协议。
HTTP协议即超文本传送协议(Hypertext Transfer Protocol ),是Web联网的基础,也是手机联网常用的协议之一,WEB使用HTTP协议作应用层协议,以封装HTTP文本信息,然后使用TCP/IP做传输层协议将它发到网络上。
由于HTTP在每次请求结束后都会主动释放连接,因此HTTP连接是一种“短连接”,要保持客户端程序的在线状态,需要不断地向服务器发起连接请求。通常 的做法是即时不需要获得任何数据,客户端也保持每隔一段固定的时间向服务器发送一次“保持连接”的请求,服务器在收到该请求后对客户端进行回复,表明知道 客户端“在线”。若服务器长时间无法收到客户端的请求,则认为客户端“下线”,若客户端长时间无法收到服务器的回复,则认为网络已经断开。
下面是一个简单的HTTP Post application/json数据内容的请求:

POST  HTTP/1.1
Host: 127.0.0.1:9017
Content-Type: application/json
Cache-Control: no-cache

{"a":"a"}

关于Socket(套接字)

现在我们了解到TCP/IP只是一个协议栈,就像操作系统的运行机制一样,必须要具体实现,同时还要提供对外的操作接口。就像操作系统会提供标准的编程接口,比如Win32编程接口一样,TCP/IP也必须对外提供编程接口,这就是Socket。现在我们知道,Socket跟TCP/IP并没有必然的联系。Socket编程接口在设计的时候,就希望也能适应其他的网络协议。所以,Socket的出现只是可以更方便的使用TCP/IP协议栈而已,其对TCP/IP进行了抽象,形成了几个最基本的函数接口。比如create,listen,accept,connect,read和write等等。
不同语言都有对应的建立Socket服务端和客户端的库,下面举例Nodejs如何创建服务端和客户端:
服务端:

const net = require('net');
const server = net.createServer();
server.on('connection', (client) => {
  client.write('Hi!\n'); // 服务端向客户端输出信息,使用 write() 方法
  client.write('Bye!\n');
  //client.end(); // 服务端结束该次会话
});
server.listen(9000);

服务监听9000端口
下面使用命令行发送http请求和telnet

$ curl http://127.0.0.1:9000
Bye!

$telnet 127.0.0.1 9000
Trying 192.168.1.21...
Connected to 192.168.1.21.
Escape character is '^]'.
Hi!
Bye!
Connection closed by foreign host.

注意到curl只处理了一次报文。
客户端

const client = new net.Socket();
client.connect(9000, '127.0.0.1', function () {
});
client.on('data', (chunk) => {
  console.log('data', chunk.toString())
  //data Hi!
  //Bye!
});

Socket长连接

所谓长连接,指在一个TCP连接上可以连续发送多个数据包,在TCP连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接(心跳包),一般需要自己做在线维持。 短连接是指通信双方有数据交互时,就建立一个TCP连接,数据发送完成后,则断开此TCP连接。比如Http的,只是连接、请求、关闭,过程时间较短,服务器若是一段时间内没有收到请求即可关闭连接。其实长连接是相对于通常的短连接而说的,也就是长时间保持客户端与服务端的连接状态。
通常的短连接操作步骤是:
连接→数据传输→关闭连接;

而长连接通常就是:
连接→数据传输→保持连接(心跳)→数据传输→保持连接(心跳)→……→关闭连接;

什么时候用长连接,短连接?
长连接多用于操作频繁,点对点的通讯,而且连接数不能太多情况,。每个TCP连接都需要三步握手,这需要时间,如果每个操作都是先连接,再操作的话那么处理 速度会降低很多,所以每个操作完后都不断开,次处理时直接发送数据包就OK了,不用建立TCP连接。例如:数据库的连接用长连接, 如果用短连接频繁的通信会造成Socket错误,而且频繁的Socket创建也是对资源的浪费。

什么是心跳包为什么需要:
心跳包就是在客户端和服务端间定时通知对方自己状态的一个自己定义的命令字,按照一定的时间间隔发送,类似于心跳,所以叫做心跳包。网络中的接收和发送数据都是使用Socket进行实现。但是如果此套接字已经断开(比如一方断网了),那发送数据和接收数据的时候就一定会有问题。可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制。其实TCP中已经为我们实现了一个叫做心跳的机制。如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议。也可以自己定义,所谓“心跳”就是定时发送一个自定义的结构体(心跳包或心跳帧),让对方知道自己“在线”,以确保链接的有效性。
实现:
服务端:

const net = require('net');

let clientList = [];
const heartbeat = 'HEARTBEAT'; // 定义心跳包内容确保和平时发送的数据不会冲突

const server = net.createServer();
server.on('connection', (client) => {
  console.log('客户端建立连接:', client.remoteAddress + ':' + client.remotePort);
  clientList.push(client);
  client.on('data', (chunk) => {
    let content = chunk.toString();
    if (content === heartbeat) {
      console.log('收到客户端发过来的一个心跳包');
    } else {
      console.log('收到客户端发过来的数据:', content);
      client.write('服务端的数据:' + content);
    }
  });
  client.on('end', () => {
    console.log('收到客户端end');
    clientList.splice(clientList.indexOf(client), 1);
  });
  client.on('error', () => {
    clientList.splice(clientList.indexOf(client), 1);
  })
});
server.listen(9000);
setInterval(broadcast, 10000); // 定时发送心跳包
function broadcast() {
  console.log('broadcast heartbeat', clientList.length);
  let cleanup = []
  for (let i=0;i<clientList.length;i+=1) {
    if (clientList[i].writable) { // 先检查 sockets 是否可写
      clientList[i].write(heartbeat);
    } else {
      console.log('一个无效的客户端');
      cleanup.push(clientList[i]); // 如果不可写,收集起来销毁。销毁之前要 Socket.destroy() 用 API 的方法销毁。
      clientList[i].destroy();
    }
  }
  //Remove dead Nodes out of write loop to avoid trashing loop index
  for (let i=0; i<cleanup.length; i+=1) {
    console.log('删除无效的客户端:', cleanup[i].name);
    clientList.splice(clientList.indexOf(cleanup[i]), 1);
  }
}

服务端输出结果:

客户端建立连接: ::ffff:127.0.0.1:57125
broadcast heartbeat 1
收到客户端发过来的数据: Thu, 29 Mar 2018 03:45:15 GMT
收到客户端发过来的一个心跳包
收到客户端发过来的数据: Thu, 29 Mar 2018 03:45:20 GMT
broadcast heartbeat 1
收到客户端发过来的数据: Thu, 29 Mar 2018 03:45:25 GMT
收到客户端发过来的一个心跳包
客户端建立连接: ::ffff:127.0.0.1:57129
收到客户端发过来的一个心跳包
收到客户端发过来的数据: Thu, 29 Mar 2018 03:46:00 GMT
收到客户端发过来的数据: Thu, 29 Mar 2018 03:46:04 GMT
broadcast heartbeat 2
收到客户端发过来的数据: Thu, 29 Mar 2018 03:46:05 GMT
收到客户端发过来的一个心跳包

客户端代码:

const net = require('net');

const heartbeat = 'HEARTBEAT'; 
const client = new net.Socket();
client.connect(9000, '127.0.0.1', () => {});
client.on('data', (chunk) => {
  let content = chunk.toString();
  if (content === heartbeat) {
    console.log('收到心跳包:', content);
  } else {
    console.log('收到数据:', content);
  }
});

// 定时发送数据
setInterval(() => {
  console.log('发送数据', new Date().toUTCString());
  client.write(new Date().toUTCString());
}, 5000);

// 定时发送心跳包
setInterval(function () {
  client.write(heartbeat);
}, 10000);

客户端输出结果:

发送数据 Thu, 29 Mar 2018 03:46:04 GMT
收到数据: 服务端的数据:Thu, 29 Mar 2018 03:46:04 GMT
收到心跳包: HEARTBEAT
发送数据 Thu, 29 Mar 2018 03:46:09 GMT
收到数据: 服务端的数据:Thu, 29 Mar 2018 03:46:09 GMT
发送数据 Thu, 29 Mar 2018 03:46:14 GMT
收到数据: 服务端的数据:Thu, 29 Mar 2018 03:46:14 GMT
收到心跳包: HEARTBEAT
发送数据 Thu, 29 Mar 2018 03:46:19 GMT
收到数据: 服务端的数据:Thu, 29 Mar 2018 03:46:19 GMT
发送数据 Thu, 29 Mar 2018 03:46:24 GMT
收到数据: 服务端的数据:Thu, 29 Mar 2018 03:46:24 GMT
收到心跳包: HEARTBEAT

定义自己的协议

如果想要使传输的数据有意义,则必须使用到应用层协议比如Http、Mqtt、Dubbo等。基于TCP协议上自定义自己的应用层的协议需要解决的几个问题:

  1. 心跳包格式的定义及处理
  2. 报文头的定义,就是你发送数据的时候需要先发送报文头,报文里面能解析出你将要发送的数据长度
  3. 你发送数据包的格式,是json的还是其他序列化的方式

下面我们就一起来定义自己的协议,并编写服务的和客户端进行调用:
定义报文头格式: length:000000000xxxx; xxxx代表数据的长度,总长度20,举例子不严谨。
数据表的格式: Json
服务端:

const net = require('net');
const server = net.createServer();
let clientList = [];
const heartBeat = 'HeartBeat'; // 定义心跳包内容确保和平时发送的数据不会冲突
const getHeader = (num) => {
  return 'length:' + (Array(13).join(0) + num).slice(-13);
}
server.on('connection', (client) => {
  client.name = client.remoteAddress + ':' + client.remotePort
  // client.write('Hi ' + client.name + '!\n');
  console.log('客户端建立连接', client.name);

  clientList.push(client)
  let chunks = [];
  let length = 0;
  client.on('data', (chunk) => {
    let content = chunk.toString();
    console.log("content:", content, content.length);
    if (content === heartBeat) {
      console.log('收到客户端发过来的一个心跳包');
    } else {
      if (content.indexOf('length:') === 0){
        length = parseInt(content.substring(7,20));
        console.log('length', length);
        chunks =[chunk.slice(20, chunk.length)];
      } else {
        chunks.push(chunk);
      }
      let heap = Buffer.concat(chunks);
      console.log('heap.length', heap.length)
      if (heap.length >= length) {
        try {
          console.log('收到数据', JSON.parse(heap.toString()));
          let data = '服务端的数据数据:' + heap.toString();;
          let dataBuff =  Buffer.from(JSON.stringify(data));
          let header = getHeader(dataBuff.length)
          client.write(header);
          client.write(dataBuff);
        } catch (err) {
          console.log('数据解析失败');
        }
      }
    }
  })

  client.on('end', () => {
    console.log('收到客户端end');
    clientList.splice(clientList.indexOf(client), 1);
  });
  client.on('error', () => {
    clientList.splice(clientList.indexOf(client), 1);
  })
});
server.listen(9000);
setInterval(broadcast, 10000); // 定时检查客户端 并发送心跳包
function broadcast() {
  console.log('broadcast heartbeat', clientList.length);
  let cleanup = []
  for(var i=0;i<clientList.length;i+=1) {
    if(clientList[i].writable) { // 先检查 sockets 是否可写
      // clientList[i].write(heartBeat); // 发送心跳数据
    } else {
      console.log('一个无效的客户端')
      cleanup.push(clientList[i]) // 如果不可写,收集起来销毁。销毁之前要 Socket.destroy() 用 API 的方法销毁。
      clientList[i].destroy();
    }
  }
  // 删除无效的客户端
  for(i=0; i<cleanup.length; i+=1) {
    console.log('删除无效的客户端:', cleanup[i].name);
    clientList.splice(clientList.indexOf(cleanup[i]), 1)
  }
}

日志打印:

 客户端建立连接 ::ffff:127.0.0.1:50178
 content: length:0000000000031 20
 length 31
 heap.length 0
 content: "Tue, 03 Apr 2018 06:12:37 GMT" 31
 heap.length 31
 收到数据 Tue, 03 Apr 2018 06:12:37 GMT
 broadcast heartbeat 1
 content: HeartBeat 9
 收到客户端发过来的一个心跳包
 content: length:0000000000031"Tue, 03 Apr 2018 06:12:42 GMT" 51
 length 31
 heap.length 31
 收到数据 Tue, 03 Apr 2018 06:12:42 GMT

客户端

const net = require('net');
const client = new net.Socket();
const heartBeat = 'HeartBeat'; // 定义心跳包内容确保和平时发送的数据不会冲突
const getHeader = (num) => {
  return 'length:' + (Array(13).join(0) + num).slice(-13);
}
client.connect(9000, '127.0.0.1', function () {});
let chunks = [];
let length = 0;
client.on('data', (chunk) => {
  let content = chunk.toString();
  console.log("content:", content, content.length);
  if (content === heartBeat) {
    console.log('收到服务端发过来的一个心跳包');
  } else {
    if (content.indexOf('length:') === 0){
      length = parseInt(content.substring(7,20));
      console.log('length', length);
      chunks =[chunk.slice(20, chunk.length)];
    } else {
      chunks.push(chunk);
    }
    let heap = Buffer.concat(chunks);
    console.log('heap.length', heap.length)
    if (heap.length >= length) {
      try {
        console.log('收到数据', JSON.parse(heap.toString()));
      } catch (err) {
        console.log('数据解析失败');
      }
    }
  }
});
// 定时发送数据
setInterval(function () {
  let data = new Date().toUTCString();
  let dataBuff =  Buffer.from(JSON.stringify(data));
  let header =getHeader(dataBuff.length);
  client.write(header);
  client.write(dataBuff);
}, 5000);
// 定时发送心跳包
setInterval(function () {
  client.write(heartBeat);
}, 10000);

日志打印:

 content: length:0000000000060 20
 length 60
 heap.length 0
 content: "服务端的数据数据:\"Tue, 03 Apr 2018 06:12:37 GMT\"" 44
 heap.length 60
 收到数据 服务端的数据数据:"Tue, 03 Apr 2018 06:12:37 GMT"
 content: length:0000000000060"服务端的数据数据:\"Tue, 03 Apr 2018 06:12:42 GMT\"" 64
 length 60
 heap.length 60
 收到数据 服务端的数据数据:"Tue, 03 Apr 2018 06:12:42 GMT"

客户端定时发送自定义协议数据到服务端,先发送头数据,在发送内容数据,另外一个定时器发送心跳数据,服务端判断是心跳数据,再判断是不是头数据,再是内容数据,然后解析后再发送数据给客户端。从日志的打印可以看出客户端先后writeheaderdata数据,服务端可能在一个data事件里面接收到。
这里可以看到一个客户端在同一个时间内处理一个请求可以很好的工作,但是想象这么一个场景,如果同一时间内让同一个客户端去多次调用服务端请求,发送多次头数据和内容数据,服务端的data事件收到的数据就很难区别哪些数据是哪次请求的,比如两次头数据同时到达服务端,服务端就会忽略其中一次,而后面的内容数据也不一定就对应于这个头的。所以想复用长连接并能很好的高并发处理服务端请求,就需要连接池这种方式了。

Socket连接池

什么是Socket连接池,池的概念可以联想到是一种资源的集合,所以Socket连接池,就是维护着一定数量Socket长连接的集合。它能自动检测Socket长连接的有效性,剔除无效的连接,补充连接池的长连接的数量。从代码层次上其实是人为实现这种功能的类,一般一个连接池包含下面几个属性:

  1. 空闲可使用的长连接队列
  2. 正在运行的通信的长连接队列
  3. 等待去获取一个空闲长连接的请求的队列
  4. 无效长连接的剔除功能
  5. 长连接资源池的数量配置
  6. 长连接资源的新建功能

场景: 一个请求过来,首先去资源池要求获取一个长连接资源,如果空闲队列里面有长连接,就获取到这个长连接Socket,并把这个Socket移到正在运行的长连接队列。如果空闲队列里面没有,且正在运行的队列长度小于配置的连接池资源的数量,就新建一个长连接到正在运行的队列去,如果正在运行的不下于配置的资源池长度,则这个请求进入到等待队列去。当一个正在运行的Socket完成了请求,就从正在运行的队列移到空闲的队列,并触发等待请求队列去获取空闲资源,如果有等待的情况。

这里简单介绍Nodejs的Socket连接池generic-pool模块的源码。
主要文件目录结构

.
|————lib  ------------------------- 代码库
| |————DefaultEvictor.js ---------- 
| |————Deferred.js ---------------- 
| |————Deque.js ------------------- 
| |————DequeIterator.js ----------- 
| |————DoublyLinkedList.js -------- 
| |————DoublyLinkedListIterator.js- 
| |————factoryValidator.js -------- 
| |————Pool.js -------------------- 连接池主要代码
| |————PoolDefaults.js ------------ 
| |————PooledResource.js ---------- 
| |————Queue.js ------------------- 队列
| |————ResourceLoan.js ------------ 
| |————ResourceRequest.js --------- 
| |————utils.js ------------------- 工具
|————test ------------------------- 测试目录
|————README.md  ------------------- 项目描述文件
|————.eslintrc  ------------------- eslint静态检查配置文件
|————.eslintignore  --------------- eslint静态检查忽略的文件
|————package.json ----------------- npm包依赖配置

下面介绍库的使用:

初始化连接池

'use strict';
const net = require('net');
const genericPool = require('generic-pool');

function createPool(conifg) {
  let options = Object.assign({
    fifo: true,                             // 是否优先使用老的资源
    priorityRange: 1,                       // 优先级
    testOnBorrow: true,                     // 是否开启获取验证
    // acquireTimeoutMillis: 10 * 1000,     // 获取的超时时间
    autostart: true,                        // 自动初始化和释放调度启用
    min: 10,                                // 初始化连接池保持的长连接最小数量
    max: 0,                                 // 最大连接池保持的长连接数量
    evictionRunIntervalMillis: 0,           // 资源释放检验间隔检查 设置了下面几个参数才起效果
    numTestsPerEvictionRun: 3,              // 每次释放资源数量
    softIdleTimeoutMillis: -1,              // 可用的超过了最小的min 且空闲时间时间 达到释放
    idleTimeoutMillis: 30000                // 强制释放
    // maxWaitingClients: 50                // 最大等待
  }, conifg.options);
  const factory = {

    create: function () {
      return new Promise((resolve, reject) => {
        let socket = new net.Socket();
        socket.setKeepAlive(true);
        socket.connect(conifg.port, conifg.host);
        // TODO 心跳包的处理逻辑
        socket.on('connect', () => {
          console.log('socket_pool', conifg.host, conifg.port, 'connect' );
          resolve(socket);
        });
        socket.on('close', (err) => { // 先end 事件再close事件
          console.log('socket_pool', conifg.host, conifg.port, 'close', err);
        });
        socket.on('error', (err) => {
          console.log('socket_pool', conifg.host, conifg.port, 'error', err);
          reject(err);
        });
      });
    },
    //销毁连接
    destroy: function (socket) {
      return new Promise((resolve) => {
        socket.destroy(); // 不会触发end 事件 第一次会触发发close事件 如果有message会触发error事件
        resolve();
      });
    },
    validate: function (socket) { //获取资源池校验资源有效性
      return new Promise((resolve) => {
        // console.log('socket.destroyed:', socket.destroyed, 'socket.readable:', socket.readable, 'socket.writable:', socket.writable);
        if (socket.destroyed || !socket.readable || !socket.writable) {
          return resolve(false);
        } else {
          return resolve(true);
        }
      });
    }
  };
  const pool = genericPool.createPool(factory, options);
  pool.on('factoryCreateError', (err) => { // 监听新建长连接出错 让请求直接返回错误
    const clientResourceRequest = pool._waitingClientsQueue.dequeue();
    if (clientResourceRequest) {
      clientResourceRequest.reject(err);
    }
  });
  return pool;
};

let pool = createPool({
  port: 9000,
  host: '127.0.0.1',
  options: {min: 0, max: 10}
});

使用连接池

下面连接池的使用,使用的协议是我们之前自定义的协议。

let pool = createPool({
  port: 9000,
  host: '127.0.0.1',
  options: {min: 0, max: 10}
});
const getHeader = (num) => {
  return 'length:' + (Array(13).join(0) + num).slice(-13);
}
const request = async (requestDataBuff) => {
  let client;
  try {
    client = await pool.acquire();
  } catch (e) {
    console.log('acquire socket client failed: ', e);
    throw e;
  }
  let timeout = 10000;
  return new Promise((resolve, reject) => {
    let chunks = [];
    let length = 0;
    client.setTimeout(timeout);
    client.removeAllListeners('error');
    client.on('error', (err) => {
      client.removeAllListeners('error');
      client.removeAllListeners('data');
      client.removeAllListeners('timeout');
      pool.destroyed(client);
      reject(err);
    });
    client.on('timeout', () => {
      client.removeAllListeners('error');
      client.removeAllListeners('data');
      client.removeAllListeners('timeout');
      // 应该销毁以防下一个req的data事件监听才返回数据
      pool.destroy(client);
      // pool.release(client);
      reject(`socket connect timeout set ${timeout}`);
    });
    let header = getHeader(requestDataBuff.length);
    client.write(header);
    client.write(requestDataBuff);
    client.on('data', (chunk) => {
      let content = chunk.toString();
      console.log('content', content, content.length);
      // TODO 过滤心跳包
      if (content.indexOf('length:') === 0){
        length = parseInt(content.substring(7,20));
        console.log('length', length);
        chunks =[chunk.slice(20, chunk.length)];
      } else {
        chunks.push(chunk);
      }
      let heap = Buffer.concat(chunks);
      console.log('heap.length', heap.length);
      if (heap.length >= length) {
        pool.release(client);
        client.removeAllListeners('error');
        client.removeAllListeners('data');
        client.removeAllListeners('timeout');
        try {
          // console.log('收到数据', JSON.parse(heap.toString()));
          resolve(JSON.parse(heap.toString()));
        } catch (err) {
          reject(err);
          console.log('数据解析失败');
        }
      }
    });
  });
}
request(Buffer.from(JSON.stringify({a: 'a'})))
  .then((data) => {
    console.log('收到服务的数据',data)
  }).catch(err => {
    console.log(err);
  });

request(Buffer.from(JSON.stringify({b: 'b'})))
  .then((data) => {
    console.log('收到服务的数据',data)
  }).catch(err => {
    console.log(err);
  });

setTimeout(function () { //查看是否会复用Socket 有没有建立新的连接
  request(Buffer.from(JSON.stringify({c: 'c'})))
    .then((data) => {
      console.log('收到服务的数据',data)
    }).catch(err => {
    console.log(err);
  });

  request(Buffer.from(JSON.stringify({d: 'd'})))
    .then((data) => {
      console.log('收到服务的数据',data)
    }).catch(err => {
    console.log(err);
  });
}, 1000)

日志打印:

 socket_pool 127.0.0.1 9000 connect
 socket_pool 127.0.0.1 9000 connect
 content length:0000000000040"服务端的数据数据:{\"a\":\"a\"}" 44
 length 40
 heap.length 40
 收到服务的数据 服务端的数据数据:{"a":"a"}
 content length:0000000000040"服务端的数据数据:{\"b\":\"b\"}" 44
 length 40
 heap.length 40
 收到服务的数据 服务端的数据数据:{"b":"b"}
 content length:0000000000040 20
 length 40
 heap.length 0
 content "服务端的数据数据:{\"c\":\"c\"}" 24
 heap.length 40
 收到服务的数据 服务端的数据数据:{"c":"c"}
 content length:0000000000040"服务端的数据数据:{\"d\":\"d\"}" 44
 length 40
 heap.length 40
 收到服务的数据 服务端的数据数据:{"d":"d"}

这里看到前面两个请求都建立了新的Socket连接 socket_pool 127.0.0.1 9000 connect,定时器结束后重新发起两个请求就没有建立新的Socket连接了,直接从连接池里面获取Socket连接资源。

源码分析

发现主要的代码就位于lib文件夹中的Pool.js
构造函数:
lib/Pool.js

  /**
   * Generate an Object pool with a specified `factory` and `config`.
   *
   * @param {typeof DefaultEvictor} Evictor
   * @param {typeof Deque} Deque
   * @param {typeof PriorityQueue} PriorityQueue
   * @param {Object} factory
   *   Factory to be used for generating and destroying the items.
   * @param {Function} factory.create
   *   Should create the item to be acquired,
   *   and call it's first callback argument with the generated item as it's argument.
   * @param {Function} factory.destroy
   *   Should gently close any resources that the item is using.
   *   Called before the items is destroyed.
   * @param {Function} factory.validate
   *   Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false
   *   If it should be removed from pool.
   * @param {Object} options
   */
  constructor(Evictor, Deque, PriorityQueue, factory, options) {
    super();
    factoryValidator(factory); // 检验我们定义的factory的有效性包含create destroy validate
    this._config = new PoolOptions(options); // 连接池配置
    // TODO: fix up this ugly glue-ing
    this._Promise = this._config.Promise;

    this._factory = factory;
    this._draining = false;
    this._started = false;
    /**
     * Holds waiting clients
     * @type {PriorityQueue}
     */
    this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange); // 请求的对象管管理队列queue 初始化queue的size 1 { _size: 1, _slots: [ Queue { _list: [Object] } ] }
    /**
     * Collection of promises for resource creation calls made by the pool to factory.create
     * @type {Set}
     */
    this._factoryCreateOperations = new Set(); // 正在创建的长连接

    /**
     * Collection of promises for resource destruction calls made by the pool to factory.destroy
     * @type {Set}
     */
    this._factoryDestroyOperations = new Set(); // 正在销毁的长连接

    /**
     * A queue/stack of pooledResources awaiting acquisition
     * TODO: replace with LinkedList backed array
     * @type {Deque}
     */
    this._availableObjects = new Deque(); // 空闲的资源长连接

    /**
     * Collection of references for any resource that are undergoing validation before being acquired
     * @type {Set}
     */
    this._testOnBorrowResources = new Set(); // 正在检验有效性的资源

    /**
     * Collection of references for any resource that are undergoing validation before being returned
     * @type {Set}
     */
    this._testOnReturnResources = new Set();

    /**
     * Collection of promises for any validations currently in process
     * @type {Set}
     */
    this._validationOperations = new Set();// 正在校验的中间temp

    /**
     * All objects associated with this pool in any state (except destroyed)
     * @type {Set}
     */
    this._allObjects = new Set(); // 所有的链接资源 是一个 PooledResource对象

    /**
     * Loans keyed by the borrowed resource
     * @type {Map}
     */
    this._resourceLoans = new Map(); // 被借用的对象的map release的时候用到

    /**
     * Infinitely looping iterator over available object
     * @type {DequeIterator}
     */
    this._evictionIterator = this._availableObjects.iterator(); // 一个迭代器

    this._evictor = new Evictor();

    /**
     * handle for setTimeout for next eviction run
     * @type {(number|null)}
     */
    this._scheduledEviction = null;

    // create initial resources (if factory.min > 0)
    if (this._config.autostart === true) { // 初始化最小的连接数量
      this.start();
    }
  }

可以看到包含之前说的空闲的资源队列,正在请求的资源队列,正在等待的请求队列等。
下面查看 Pool.acquire 方法
lib/Pool.js

/**
   * Request a new resource. The callback will be called,
   * when a new resource is available, passing the resource to the callback.
   * TODO: should we add a seperate "acquireWithPriority" function
   *
   * @param {Number} [priority=0]
   *   Optional.  Integer between 0 and (priorityRange - 1).  Specifies the priority
   *   of the caller if there are no available resources.  Lower numbers mean higher
   *   priority.
   *
   * @returns {Promise}
   */
  acquire(priority) { // 空闲资源队列资源是有优先等级的 
    if (this._started === false && this._config.autostart === false) {
      this.start(); // 会在this._allObjects 添加min的连接对象数
    }
    if (this._draining) { // 如果是在资源释放阶段就不能再请求资源了
      return this._Promise.reject(
        new Error("pool is draining and cannot accept work")
      );
    }
    // 如果要设置了等待队列的长度且要等待 如果超过了就返回资源不可获取
    // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime
    if (
      this._config.maxWaitingClients !== undefined &&
      this._waitingClientsQueue.length >= this._config.maxWaitingClients
    ) {
      return this._Promise.reject(
        new Error("max waitingClients count exceeded")
      );
    }

    const resourceRequest = new ResourceRequest(
      this._config.acquireTimeoutMillis, // 对象里面的超时配置 表示等待时间 会启动一个定时 超时了就触发resourceRequest.promise 的reject触发
      this._Promise
    );
    // console.log(resourceRequest)
    this._waitingClientsQueue.enqueue(resourceRequest, priority); // 请求进入等待请求队列
    this._dispense(); // 进行资源分发 最终会触发resourceRequest.promise的resolve(client) 

    return resourceRequest.promise; // 返回的是一个promise对象resolve却是在其他地方触发
  }
  /**
   * Attempt to resolve an outstanding resource request using an available resource from
   * the pool, or creating new ones
   *
   * @private
   */
  _dispense() {
    /**
     * Local variables for ease of reading/writing
     * these don't (shouldn't) change across the execution of this fn
     */
    const numWaitingClients = this._waitingClientsQueue.length; // 正在等待的请求的队列长度 各个优先级的总和
    console.log('numWaitingClients', numWaitingClients)  // 1

    // If there aren't any waiting requests then there is nothing to do
    // so lets short-circuit
    if (numWaitingClients < 1) {
      return;
    }
    //  max: 10, min: 4
    console.log('_potentiallyAllocableResourceCount', this._potentiallyAllocableResourceCount) // 目前潜在空闲可用的连接数量
    const resourceShortfall =
      numWaitingClients - this._potentiallyAllocableResourceCount; // 还差几个可用的 小于零表示不需要 大于0表示需要新建长连接的数量
    console.log('spareResourceCapacity', this.spareResourceCapacity) // 距离max数量的还有几个没有创建
    const actualNumberOfResourcesToCreate = Math.min(
      this.spareResourceCapacity, // -6
      resourceShortfall // 这个是 -3
    ); // 如果resourceShortfall>0 表示需要新建但是这新建的数量不能超过spareResourceCapacity最多可创建的
    console.log('actualNumberOfResourcesToCreate', actualNumberOfResourcesToCreate) // 如果actualNumberOfResourcesToCreate >0 表示需要创建连接
    for (let i = 0; actualNumberOfResourcesToCreate > i; i++) {
      this._createResource(); // 新增新的长连接
    }

    // If we are doing test-on-borrow see how many more resources need to be moved into test
    // to help satisfy waitingClients
    if (this._config.testOnBorrow === true) { // 如果开启了使用前校验资源的有效性
      // how many available resources do we need to shift into test
      const desiredNumberOfResourcesToMoveIntoTest =
        numWaitingClients - this._testOnBorrowResources.size;// 1
      const actualNumberOfResourcesToMoveIntoTest = Math.min(
        this._availableObjects.length, // 3
        desiredNumberOfResourcesToMoveIntoTest // 1
      );
      for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) { // 需要有效性校验的数量 至少满足最小的waiting clinet
        this._testOnBorrow(); // 资源有效校验后再分发
      }
    }

    // if we aren't testing-on-borrow then lets try to allocate what we can
    if (this._config.testOnBorrow === false) { // 如果没有开启有效性校验 就开启有效资源的分发
      const actualNumberOfResourcesToDispatch = Math.min(
        this._availableObjects.length,
        numWaitingClients
      );
      for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) { // 开始分发资源
        this._dispatchResource();
      }
    }
  }
  /**
   * Attempt to move an available resource to a waiting client
   * @return {Boolean} [description]
   */
  _dispatchResource() {
    if (this._availableObjects.length < 1) {
      return false;
    }

    const pooledResource = this._availableObjects.shift(); // 从可以资源池里面取出一个
    this._dispatchPooledResourceToNextWaitingClient(pooledResource); // 分发
    return false;
  }
  /**
   * Dispatches a pooledResource to the next waiting client (if any) else
   * puts the PooledResource back on the available list
   * @param  {PooledResource} pooledResource [description]
   * @return {Boolean}                [description]
   */
  _dispatchPooledResourceToNextWaitingClient(pooledResource) {
    const clientResourceRequest = this._waitingClientsQueue.dequeue(); // 可能是undefined 取出一个等待的quene
    console.log('clientResourceRequest.state', clientResourceRequest.state);
    if (clientResourceRequest === undefined ||
      clientResourceRequest.state !== Deferred.PENDING) {
      console.log('没有等待的')
      // While we were away either all the waiting clients timed out
      // or were somehow fulfilled. put our pooledResource back.
      this._addPooledResourceToAvailableObjects(pooledResource); // 在可用的资源里面添加一个
      // TODO: do need to trigger anything before we leave?
      return false;
    }
    // TODO clientResourceRequest 的state是否需要判断 如果已经是resolve的状态 已经超时回去了 这个是否有问题
    const loan = new ResourceLoan(pooledResource, this._Promise); 
    this._resourceLoans.set(pooledResource.obj, loan); // _resourceLoans 是个map k=>value  pooledResource.obj 就是socket本身
    pooledResource.allocate(); // 标识资源的状态是正在被使用
    clientResourceRequest.resolve(pooledResource.obj); //  acquire方法返回的promise对象的resolve在这里执行的
    return true;
  }

上面的代码就按种情况一直走下到最终获取到长连接的资源,其他更多代码大家可以自己去深入了解。

查看原文

认证与成就

  • 获得 429 次点赞
  • 获得 101 枚徽章 获得 9 枚金徽章, 获得 38 枚银徽章, 获得 54 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

  • html2Dash

    html2Dash is an Documentation Set generator intended to be used with the Dash.app API browser for OS X or one of its many clones. html2Dash is just like doc2dash but generating docset from any HTML documentations.

注册于 2013-01-11
个人主页被 4.5k 人浏览