书旅

书旅 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

个人微信公众号:IT猿圈

个人动态

书旅 赞了文章 · 2020-12-14

【PHP7源码分析】PHP内存管理

作者: 顺风车运营研发团队 李乐

第一章 从操作系统内存管理说起

程序是代码和数据的集合,进程是运行着的程序;操作系统需要为进程分配内存;进程运行完毕需要释放内存;内存管理就是内存的分配和释放;

1. 分段管理

分段最早出现在8086系统中,当时只有16位地址总线,其能访问的最大地址是64k;当时的内存大小为1M;如何利用16位地址访问1M的内存空间呢?

于是提出了分段式内存管理;
将内存地址分为段地址与段偏移,段地址会存储在寄存器中,段偏移即程序实际使用的地址;当CPU需要访问内存时,会将段地址左移4位,再加上段偏移,即可得到物理内存地址;
即内存地址=段地址*16+段偏移地址。

后来的IA-32在内存中使用一张段表来记录各个段映射的物理内存地址,CPU只需要为这个段表提供一个记录其首地址的寄存器就可以了;如下图所示:

图片描述

进程包含多个段:代码段,数据段,链接库等;系统需要为每个段分配内存;
一种很自然地想法是,根据每个段实际需要的大小进行分配,并记录已经占用的空间和剩余空间:
当一个段请求内存时,如果有内存中有很多大小不一的空闲位置,那么选择哪个最合理?

a)首先适配:空闲链表中选择第一个位置(优点:查表速度快) 
b)最差适配:选择一个最大的空闲区域 
c)最佳适配:选择一个空闲位置大小和申请内存大小最接近的位置,比如申请一个40k内存,而恰巧内存中有一个50k的空闲位置;

内存分段管理具有以下优点:

a)内存共享: 对内存分段,可以很容易把其中的代码段或数据段共享给其他程序;
b)安全性: 将内存分为不同的段之后,因为不同段的内容类型不同,所以他们能进行的操作也不同,比如代码段的内容被加载后就不应该允许写的操作,因为这样会改变程序的行为
c)动态链接: 动态链接是指在作业运行之前,并不把几个目标程序段链接起来。要运行时,先将主程序所对应的目标程序装入内存并启动运行,当运行过程中又需要调用某段时,才将该段(目标程序)调入内存并进行链接。

尽管分段管理的方式解决了内存的分配与释放,但是会带来大量的内存碎片;即尽管我们内存中仍然存在很大空间,但全部都是一些零散的空间,当申请大块内存时会出现申请失败;为了不使这些零散的空间浪费,操作系统会做内存紧缩,即将内存中的段移动到另一位置。但明显移动进程是一个低效的操作。

2.分页管理

先说说虚拟内存的概念。CPU访问物理内存的速度要比磁盘快的多,物理内存可以认为是磁盘的缓存,但物理内存是有限的,于是人们想到利用磁盘空间虚拟出的一块逻辑内存
(这部分磁盘空间Windows下称之为虚拟内存,Linux下被称为交换空间(Swap Space));

虚拟内存和真实的物理内存存在着映射关系;

为了解决分段管理带来的碎片问题,操作系统将虚拟内存分割为虚拟页,相应的物理内存被分割为物理页;而虚拟页和物理页的大小默认都是4K字节;

操作系统以页为单位分配内存:假设需要3k字节的内存,操作系统会直接分配一个4K页给进程
,这就产生了内部碎片(浪费率优于分段管理)

前面说过,物理内存可以认为是磁盘的缓存;虚拟页首先需要分配给进程并创建与物理页的映射关系,然后才能将磁盘数据载入内存供CPU使用;由此可见,虚拟内存系统必须能够记录一个虚拟页是否已经分配给进程;是否已经将磁盘数据载入内存,对应哪个物理页;假如没有载入内存,这个虚拟页存放在磁盘的哪个位置;
于是虚拟页可以分为三种类型:已分配,未缓存,已缓存;

当访问没有缓存的虚拟页时,系统会在物理内存中选择一个牺牲页,并将虚拟页从磁盘赋值到物理内存,替换这个牺牲页;而如果这个牺牲页已经被修改,则还需要写回磁盘;这个过程就是所谓的缺页中断;

虚拟页的集合就称为页表(pageTable),页表就是一个页表条目(page table entry)的数组;每个页表条目都包含有效位标志,记录当前虚拟页是否分配,当前虚拟页的访问控制权限;同时包含物理页号或磁盘地址;

图片描述

进程所看到的地址都是虚拟地址;在访问虚拟地址时,操作系统需要将虚拟地址转化为实际的物理地址;而虚拟地址到物理地址的映射是存储在页表的;

将虚拟地址分为两部分:虚拟页号,记录虚拟页在页表中的偏移量(相当于数组索引);页内偏移量;而页表的首地址是存储在寄存器中;

图片描述

对于32位系统,内存为4G,页大小为4K,假设每个页表项4字节;则页表包含1M个页表项,占用4M的存储空间,页表本身就需要分配1K个物理页;
页表条目太大时,页表本身需要占用更多的物理内存,而且其内存还必须是连续的;

目前有三种优化技术:

1)多级页表
一级页表中的每个PTE负责映射虚拟地址空间中一个4M的片(chunk),每一个片由1024个连续的页面组成;二级页表的每个PTE都映射一个4K的虚拟内存页面;

优点:节约内存(假如一级页表中的PTE为null,则其指向的二级页表就不存在了,而大多数进程4G的虚拟地址空间大部分都是未分配的;只有一级页表才总是需要在主存中,系统可以在需要的时候创建、调入、调出二级页表)
缺点:虚拟地址到物理地址的翻译更复杂了

图片描述

2)TLB
多级页表可以节约内存,但是对于一次地址翻译,增加了内存访问次数,k级页表,需要访问k次内存才能完成地址的翻译;

由此出现了TLB:他是一个更小,访问速度更快的虚拟地址的缓存;当需要翻译虚拟地址时,先在TLB查找,命中的话就可以直接完成地址的翻译;没命中再页表中查找;

图片描述

3)hugePage

因为内存大小是固定的,为了减少映射表的条目,可采取的办法只有增加页的尺寸。hugePage便因此而来,使用大页面2m,4m,16m等等。如此一来映射条目则明显减少。

3.linux虚拟内存

linux为每个进程维护一个单独的虚拟地址空间,进程都以为自己独占了整个内存空间,如图所示:

图片描述
linux将内存组织为一些区域(段)的集合,如代码段,数据段,堆,共享库段,以及用户栈都是不同的区域。每个存在的虚拟页面都保存在某个区域中,不属于任何一个区域的虚拟页是不存在的,不能被进程使用;

![clipboard.png](/img/bV9623)

内核为系统中的每个进程维护一个单独的任务结构task_struct,任务中的一个字段指向mm_struct,他描述了虚拟内存的当前状态。其中包含两个字段:pgd指向第一级页表的基址(当内核运行这个进程时,就将pgd的内容存储在cr3控制寄存器中);mmap指向一个vm_area_struct区域结构的链表;区域结构主要包括以下字段:
vm_start:区域的起始地址;
vm_end:区域的结束地址;
vm_port:指向这个区域所包含页的读写许可权限;
vm_flags:描述这个区域是与其他进程共享的,还是私有的等信息;

当我们访问虚拟地址时,内核会遍历vm_area_struct链表,根据vm_start和vm_end能够判断地址合法性;根据vm_por能够判断地址访问的合法性;
遍历链表时间性能较差,内核会将vm_area_struct区域组织成一棵树;

说到这里就不得不提一下系统调用mmap,其函数声明为

void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )

函数mmap要求内核创建一个新的虚拟内存区域(注意是新的区域,和堆是平级关系,即mmap函数并不是在堆上分配内存的,);最好是从地址addr开始(一般传null),并将文件描述fd符指定的对象的一个连续的chunk(大小为len,从文件偏移offset开始)映射到这个新的区域;当fd传-1时,可用于申请分配内存;

参数port描述这个区域的访问控制权限,可以取以下值:

PROT_EXEC //页内容可以被执行
PROT_READ //页内容可以被读取
PROT_WRITE //页可以被写入
PROT_NONE //页不可访问

参数flags由描述被映射对象类型的位组成,如MAP_SHARED 表示与其它所有映射这个对象的进程共享映射空间;MAP_PRIVATE 表示建立一个写入时拷贝的私有映射,内存区域的写入不会影响到原文件。

php在分配2M以上大内存时,就是直接使用mmap申请的;

第二章 说说内存分配器

malloc是c库函数,用于在堆上分配内存;操作系统给进程分配的堆空间是若干个页,我们再调用malloc向进程请求分配若干字节大小的内存;
malloc就是一种内存分配器,负责堆内存的分配与回收;

同样我们可以使用mmap和munmap来创建和删除虚拟内存区域,以达到内存的申请与释放;

观察第一章第三小节中的虚拟地址空间描述图,每个进程都有一个称为运行时堆的虚拟内存区域,操作系统内核维护着一个变量brk,指向了堆的顶部;并提供系统调用brk(void* addr)和sbrk(incr)来修改变量brk的值,从而实现堆内存的扩张与收缩;

brk函数将brk指针直接设置为某个地址,而sbrk函数将brk从当前位置移动incr所指定的增量;(如果将incr设置为0,则可以获得当前brk指向的地址)

因此我们也可以使用brk()或sbrk()来动态分配/释放内存块;

需要注意的一点是:系统为每一个进程所分配的资源不是无限的,包括可映射的内存空间,即堆内存并不是无限大的;所以当调用malloc将堆内存都分配完时,malloc会使用mmap函数额外再申请一个虚拟内存区域(由此发现,使用malloc申请的内存也并不一定是在堆上)

1.内存分配器设计思路

内存分配器用于处理堆上的内存分配或释放请求;

要实现分配器必须考虑以下几个问题:

1.空闲块组织:如何记录空闲块;如何标记内存块是否空闲;
2.分配:如何选择一个合适的空闲块来处理分配请求;
3.分割:空闲块一般情况会大于实际的分配请求,我们如何处理这个空闲块中的剩余部分;
4.回收:如何处理一个刚刚被释放的块;

思考1:空闲块组织
内存分配与释放请求时完全随机的,最终会造成堆内存被分割为若干个内存小块,其中有些处于已分配状态,有些处于空闲状态;我们需要额外的空间来标记内存状态以及内存块大小;
下图为malloc设计思路:
clipboard.png

注:图中显示额外使用4字节记录当前内存块属性,其中3比特记录是否空闲,29比特记录内存块大小;实际malloc头部格式可能会根据版本等调整;不论我们使用malloc分配多少字节的内存,实际malloc分配的内存都会多几个字节;
注:空闲内存块可能会被组织为一个链表结构,由此可以遍历所有空闲内存块,直到查找到一个满足条件的为止;

思考2:如何选择合适的空闲块
在处理内存分配请求时,需要查找空闲内存链表,找到一个满足申请条件的空闲内存块,选择什么查找算法;而且很有可能存在多个符合条件的空闲内存块,此时如何选择?
目前有很多比较成熟的算法,如首次适配,最佳适配,最差适配等;

思考3:如何分配
在查找到满足条件的空闲内存块时,此内存一般情况会比实际请求分配的内存空间要大;全部分配给用户,浪费空间;因此一般会将此空闲内存块切割为两个小块内存,一块分配给用户,一块标记为新的空闲内存

思考4:如何回收:
当用户调用free()函数释放内存时,需要将此块内存重新标记为空闲内存,并且插入空闲链表;然而需要注意的是,此块内存可能能够与其他空闲内存拼接为更大的空闲内存;此时还需要算法来处理空闲内存的合并;

思考5:内存分配效率问题:
用户请求分配内存时,需要遍历空闲内存链表,直到查找到一个满足申请条件的空闲内存;由此可见,算法复杂度与链表长度成正比;
我们可以将空闲内存按照空间大小组织为多个空闲链表,内存大小相近的形成一个链表;此时只需要根据申请内存大小查找相应空闲链表即可;
更进一步的,空闲内存只会被切割为固定大小,如2^n字节,每种字节大小的空闲内存形成一个链表;(用户实际分配的内存是2^n字节,大于用户实际请求)

总结:任何内存分配器都需要额外的空间(数据结构)记录每个内存块大小及其分配状态;

第三章 内存池

C/C++下内存管理是让几乎每一个程序员头疼的问题,分配足够的内存、追踪内存的分配、在不需要的时候释放内存——这个任务相当复杂。而直接使用系统调用malloc/free、new/delete进行内存分配和释放,有以下弊端:

调用malloc/new,系统需要根据“最先匹配”、“最优匹配”或其他算法在内存空闲块表中查找一块空闲内存,调用free/delete,系统可能需要合并空闲内存块,这些都会产生额外的开销;
频繁使用时会产生大量内存碎片,从而降低程序运行效率;
容易造成内存泄漏;

内存池(memory pool)是代替直接调用malloc/free、new/delete进行内存管理的常用方法,当我们申请内存空间时,首先到我们的内存池中查找合适的内存块,而不是直接向操作系统申请,优势在于:

比malloc/free进行内存申请/释放的方式快
不会产生或很少产生堆碎片
可避免内存泄漏

内存池一般会组织成如下结构:
结构中主要包含block、list 和pool这三个结构体,block结构包含指向实际内存空间的指针,前向和后向指针让block能够组成双向链表;list结构中free指针指向空闲 内存块组成的链表,used指针指向程序使用中的内存块组成的链表,size值为内存块的大小,list之间组成单向链表;pool结构记录list链表的头和尾。

当用户申请内存时,只需要根据所申请内存的大小,遍历list链表,查看是否存在相匹配的size;

clipboard.png

第四章 切入主题——PHP内存管理

PHP并没有直接使用现有的malloc/free来管理内存的分配和释放,而是重新实现了一套内存管理方案;

PHP采取“预分配方案”,提前向操作系统申请一个chunk(2M,利用到hugepage特性),并且将这2M内存切割为不同规格(大小)的若干内存块,当程序申请内存时,直接查找现有的空闲内存块即可;

PHP将内存分配请求分为3种情况:

huge内存:针对大于2M-4K的分配请求,直接调用mmap分配;

large内存:针对小于2M-4K,大于3K的分配请求,在chunk上查找满足条件的若干个连续page;

small内存:针对小于3K的分配请求;PHP拿出若干个页切割为8字节大小的内存块,拿出若干个页切割为16字节大小的内存块,24字节,32字节等等,将其组织成若干个空闲链表;每当有分配请求时,只在对应的空闲链表获取一个内存块即可;

1.PHP内存管理器数据模型

1.1结构体

PHP需要记录申请的所有chunk,需要记录chunk中page的使用情况,要记录每种规格内存的空闲链表,要记录使用mmap分配的huge内存,等等…………

于是有了以下两个结构体:
_zend_mm_heap记录着内存管理器所需的所有数据:

//省略了结构体中很多字段
struct _zend_mm_heap {
    //统计
    size_t             size;                    /* current memory usage */
    size_t             peak;                    /* peak memory usage */
    //由于“预分配”方案,实际使用内存和向操作系统申请的内存大小是不一样的;
    size_t             real_size;               /* current size of allocated pages */
    size_t             real_peak;               /* peak size of allocated pages */
 
    //small内存分为30种;free_slot数组长度为30;数组索引上挂着内存空闲链表
    zend_mm_free_slot *free_slot[ZEND_MM_BINS]; /* free lists for small sizes */
 
    //内存限制
    size_t             limit;                   /* memory limit */
    int                overflow;                /* memory overflow flag */
 
    //记录已分配的huge内存
    zend_mm_huge_list *huge_list;               /* list of huge allocated blocks */
 
    //PHP会分配若干chunk,记录当前主chunk首地址
    zend_mm_chunk     *main_chunk;
     
    //统计chunk数目
    int                chunks_count;            /* number of alocated chunks */
    int                peak_chunks_count;       /* peak number of allocated chunks for current request */ 
}

_zend_mm_chunk记录着当前chunk的所有数据

struct _zend_mm_chunk {
    //指向heap
    zend_mm_heap      *heap;
    //chunk组织为双向链表
    zend_mm_chunk     *next;
    zend_mm_chunk     *prev;
    //当前chunk空闲page数目
    uint32_t           free_pages;              /* number of free pages */
    //当前chunk最后一个空闲的page位置
    uint32_t           free_tail;               /* number of free pages at the end of chunk */
    //每当申请一个新的chunk时,这个chunk的num会递增
    uint32_t           num;
    //预留
    char               reserve[64 - (sizeof(void*) * 3 + sizeof(uint32_t) * 3)];
    //指向heap,只有main_chunk使用
    zend_mm_heap       heap_slot;               /* used only in main chunk */
    //记录512个page的分配情况;0代表空闲,1代表已分配
    zend_mm_page_map   free_map;                /* 512 bits or 64 bytes */
    //记录每个page的详细信息,
    zend_mm_page_info  map[ZEND_MM_PAGES];      /* 2 KB = 512 * 4 */
};

1.2small内存

前面讲过small内存分为30种规格,每种规格的空闲内存都挂在_zend_mm_heap结构体的free_slot数组上;
30种规格内存如下:

//宏定义:第一列表示序号(称之为bin_num),第二列表示每个small内存的大小(字节数);
//第四列表示每次获取多少个page;第三列表示将page分割为多少个大小为第一列的small内存;
#define ZEND_MM_BINS_INFO(_, x, y) \
    _( 0,    8,  512, 1, x, y) \
    _( 1,   16,  256, 1, x, y) \
    _( 2,   24,  170, 1, x, y) \
    _( 3,   32,  128, 1, x, y) \
    _( 4,   40,  102, 1, x, y) \
    _( 5,   48,   85, 1, x, y) \
    _( 6,   56,   73, 1, x, y) \
    _( 7,   64,   64, 1, x, y) \
    _( 8,   80,   51, 1, x, y) \
    _( 9,   96,   42, 1, x, y) \
    _(10,  112,   36, 1, x, y) \
    _(11,  128,   32, 1, x, y) \
    _(12,  160,   25, 1, x, y) \
    _(13,  192,   21, 1, x, y) \
    _(14,  224,   18, 1, x, y) \
    _(15,  256,   16, 1, x, y) \
    _(16,  320,   64, 5, x, y) \
    _(17,  384,   32, 3, x, y) \
    _(18,  448,    9, 1, x, y) \
    _(19,  512,    8, 1, x, y) \
    _(20,  640,   32, 5, x, y) \
    _(21,  768,   16, 3, x, y) \
    _(22,  896,    9, 2, x, y) \
    _(23, 1024,    8, 2, x, y) \
    _(24, 1280,   16, 5, x, y) \
    _(25, 1536,    8, 3, x, y) \
    _(26, 1792,   16, 7, x, y) \
    _(27, 2048,    8, 4, x, y) \
    _(28, 2560,    8, 5, x, y) \
    _(29, 3072,    4, 3, x, y)
 
#endif /* ZEND_ALLOC_SIZES_H */

只有这个宏定义有些功能不好用程序实现,比如bin_num=15时,获得此种small内存的字节数?分配此种small内存时需要多少page呢?
于是有了以下3个数组的定义:

//bin_pages是一维数组,数组大小为30,数组索引为bin_num,
//数组元素为ZEND_MM_BINS_INFO宏中的第四列
#define _BIN_DATA_PAGES(num, size, elements, pages, x, y) pages,
static const uint32_t bin_pages[] = {
  ZEND_MM_BINS_INFO(_BIN_DATA_PAGES, x, y)
};
//bin_elements是一维数组,数组大小为30,数组索引为bin_num,
//数组元素为ZEND_MM_BINS_INFO宏中的第三列
#define _BIN_DATA_ELEMENTS(num, size, elements, pages, x, y) elements,
static const uint32_t bin_elements[] = {
  ZEND_MM_BINS_INFO(_BIN_DATA_ELEMENTS, x, y)
};
//bin_data_size是一维数组,数组大小为30,数组索引为bin_num,
//数组元素为ZEND_MM_BINS_INFO宏中的第二列
#define _BIN_DATA_SIZE(num, size, elements, pages, x, y) size,
static const uint32_t bin_data_size[] = {
  ZEND_MM_BINS_INFO(_BIN_DATA_SIZE, x, y)
};

2.PHP small内存分配方案

2.1设计思路

上一节提到PHP将small内存分为30种不同大小的规格;
每种大小规格的空闲内存会组织为链表,挂在数组_zend_mm_heap结构体的free_slot[bin_num]索引上;

clipboard.png

回顾下free_slot字段的定义:

zend_mm_free_slot *free_slot[ZEND_MM_BINS];
 
struct zend_mm_free_slot {
    zend_mm_free_slot *next_free_slot;
};

可以看出空闲内存链表的每个节点都是一个zend_mm_free_slot结构体,其只有一个next指针字段;

思考:对于8字节大小的内存块,其next指针就需要占8字节的空间,那用户的数据存储在哪里呢?
答案:free_slot是small内存的空闲链表,空闲指的是未分配内存,此时是不需要存储其他数据的;当分配给用户时,此节点会从空闲链表删除,也就不需要维护next指针了;用户可以在8字节里存储任何数据;

思考:假设调用 void*ptr=emalloc(8)分配了一块内存;调用efree(ptr)释放内存时,PHP如何知道这块内存的字节数呢?如何知道这块内存应该插入哪个空闲链表呢?
思考1:第二章指出,任何内存分配器都需要额外的数据结构来标志其管理的每一块内存:空闲/已分配,内存大小等;PHP也不例外;可是我们发现使用emalloc(8)分配内存时,其分配的就只是8字节的内存,并没有额外的空间来存储这块内存的任何属性;
思考2:观察small内存宏定义ZEND_MM_BINS_INFO;我们发现对于每一个page,其只可能被分配为同一种规格;不可能存在一部分分割为8字节大小,一部分分割为16字节大小;也就是说每一个page的所有small内存块属性是相同的;那么只需要记录每一个page的属性即可;
思考3:large内存是同样的思路;申请large内存时,可能需要占若干个page的空间;但是同一个page只会属于一个large内存,不可能将一个page的一部分分给某个large内存;
答案:不管page用于small内存还是large内存分配,只需要记录每一个page的属性即可,PHP将其记录在zend_mm_chunk结构体的zend_mm_page_info map[ZEND_MM_PAGES]字段;长度为512的int数组;对任一块内存,只要能计算出属于哪一个页,就能得到其属性(内存大小);

2.2入口API

//内存分配对外统一入口API为_emalloc;函数内部直接调用zend_mm_alloc_heap,
//其第一个参数就是zend_mm_heap结构体(全局只有一个),第二个参数就是请求分配内存大小
void*  _emalloc(size_t size)
{
    return zend_mm_alloc_heap(AG(mm_heap), size);
}
//可以看出其根据请求内存大小size判断分配small内存还是large内存,还是huge内存
static void *zend_mm_alloc_heap(zend_mm_heap *heap, size_t size)
{
    void *ptr;
 
    if (size <= ZEND_MM_MAX_SMALL_SIZE) {
        
        ptr = zend_mm_alloc_small(heap, size, ZEND_MM_SMALL_SIZE_TO_BIN(size));   
        //注意ZEND_MM_SMALL_SIZE_TO_BIN这个宏定义
        
        return ptr;
    } else if (size <= ZEND_MM_MAX_LARGE_SIZE) {
        ptr = zend_mm_alloc_large(heap, size);
        return ptr;
    } else {
        return zend_mm_alloc_huge(heap, size);
    }
}
 
 //使用到的宏定义如下
#define ZEND_MM_CHUNK_SIZE (2 * 1024 * 1024)               /* 2 MB  */
#define ZEND_MM_PAGE_SIZE  (4 * 1024)                      /* 4 KB  */
#define ZEND_MM_PAGES      (ZEND_MM_CHUNK_SIZE / ZEND_MM_PAGE_SIZE)  /* 512 */
#define ZEND_MM_FIRST_PAGE (1)
#define ZEND_MM_MAX_SMALL_SIZE      3072
#define ZEND_MM_MAX_LARGE_SIZE      (ZEND_MM_CHUNK_SIZE - (ZEND_MM_PAGE_SIZE * ZEND_MM_FIRST_PAGE))

2.3计算规格(bin_num)

我们发现在调用zend_mm_alloc_small时,使用到了ZEND_MM_SMALL_SIZE_TO_BIN,其定义了一个函数,用于将size转换为bin_num;即请求7字节时,实际需要分配8字节,bin_num=1;请求37字节时,实际需要分配40字节,bin_num=4;即根据请求的size计算满足条件的最小small内存规格的bin_num;

#define ZEND_MM_SMALL_SIZE_TO_BIN(size)  zend_mm_small_size_to_bin(size)
 
static zend_always_inline int zend_mm_small_size_to_bin(size_t size)
{
 
    unsigned int t1, t2;
 
    if (size <= 64) {
        /* we need to support size == 0 ... */
        return (size - !!size) >> 3;
    } else {
        t1 = size - 1;
        t2 = zend_mm_small_size_to_bit(t1) - 3;
        t1 = t1 >> t2;
        t2 = t2 - 3;
        t2 = t2 << 2;
        return (int)(t1 + t2);
        //看到这一堆t1,t2,脑子里只有一个问题:我是谁,我在哪,这是啥;
    }
}

1)先分析size小于64情况:看看small内存前8组大小定义,8,16,24,32,48,56,64;很简单,就是等差数列,递增8;所以对于每个size只要除以8就可以了(右移3位);但是对于size=8,16,24,32,40,48,56,64这些值,需要size-1然后除以8才满足;考虑到size=0的情况,于是有了(size - !!size) >> 3这个表达式;

2)当size大于64时,情况就复杂了:small内存的字节数变化为,64,80,96,112,128,160,192,224,256,320,384,448,512……;递增16,递增32,递增64……;

还是先看看二进制吧:

clipboard.png

我们将size每4个分为一组,第一组比特序列长度为7,第二组比特序列长度为8,……;(即我们可以根据比特序列长度获得sise属于哪一组;思考一下,递增16,32时,为什么只会加四次呢?)

那我们可以这么算:1)计算出size属于第几组;2)计算size在组内的偏移量;3)计算组开始位置。思路就是这样,但是计算方法并不统一,只要找规律计算出来即可。

//计算当前size属于哪一组;也就是计算比特序列长度;也就是计算最高位是1的位置;
 
//从低到高位查找也行,O(n)复杂度;使用二分查号,复杂度log(n)
 
//size最大为3072(不知道的回去看small内存宏定义);将size的二进制看成16比特的序列;
//先按照8二分,再按照4或12二分,再按照2/6/10/16二分……
 
//思路:size与255比较(0xff)比较,如果小于,说明高8位全是0,只需要在低8位查找即可;
//…………
 
/* higher set bit number (0->N/A, 1->1, 2->2, 4->3, 8->4, 127->7, 128->8 etc) */
static zend_always_inline int zend_mm_small_size_to_bit(int size)
{
    int n = 16;
    if (size <= 0x00ff) {n -= 8; size = size << 8;}
    if (size <= 0x0fff) {n -= 4; size = size << 4;}
    if (size <= 0x3fff) {n -= 2; size = size << 2;}
    if (size <= 0x7fff) {n -= 1;}
    return n;
}

2.4开始分配了

前面说过small空闲内存会形成链表,挂在zen_mm_heap字段free_slot[bin_num]上;

最初请求分配时,free_slot[bin_num]可能还没有初始化,指向null;此时需要向chunk分配若干页,将页分割为大小相同的内存块,形成链表,挂在free_slot[bin_num]

static zend_always_inline void *zend_mm_alloc_small(zend_mm_heap *heap, size_t size, int bin_num)
{
    //空闲链表不为null,直接分配
    if (EXPECTED(heap->free_slot[bin_num] != NULL)) {
        zend_mm_free_slot *p = heap->free_slot[bin_num];
        heap->free_slot[bin_num] = p->next_free_slot;
        return (void*)p;
    } else {
    //先分配页
        return zend_mm_alloc_small_slow(heap, bin_num;
    }
}
//分配页;切割;形成链表
static zend_never_inline void *zend_mm_alloc_small_slow(zend_mm_heap *heap, uint32_t bin_num)
{
    zend_mm_chunk *chunk;
    int page_num;
    zend_mm_bin *bin;
    zend_mm_free_slot *p, *end;
 
    //分配页(页数目是small内存宏定义第四列);放在下一节large内存分配讲解
    bin = (zend_mm_bin*)zend_mm_alloc_pages(heap, bin_pages[bin_num]);
 
    if (UNEXPECTED(bin == NULL)) {
        /* insufficient memory */
        return NULL;
    }
     
    //之前提过任何内存分配器都需要额外的数据结构记录每块内存的属性;分析发现PHP每个page的所有内存块属性都是相同的;且存储在zend_mm_chunk结构体的map字段(512个int)
    //bin即页的首地址;需要计算bin是当前chunk的第几页:1)得到chunk首地址;2)得到bin相对chunk首地址偏移量;3)除以页大小
    chunk = (zend_mm_chunk*)ZEND_MM_ALIGNED_BASE(bin, ZEND_MM_CHUNK_SIZE);
    page_num = ZEND_MM_ALIGNED_OFFSET(bin, ZEND_MM_CHUNK_SIZE) / ZEND_MM_PAGE_SIZE;
     
    //记录页属性;后面分析(对于分配的每个页都要记录属性)
    chunk->map[page_num] = ZEND_MM_SRUN(bin_num);
    if (bin_pages[bin_num] > 1) {
        uint32_t i = 1;
 
        do {
            chunk->map[page_num+i] = ZEND_MM_NRUN(bin_num, i);
            i++;
        } while (i < bin_pages[bin_num]);
    }
 
    //切割内存;形成链表(bin_data_size,bin_elements是上面介绍过的small内存相关数组)
    end = (zend_mm_free_slot*)((char*)bin + (bin_data_size[bin_num] * (bin_elements[bin_num] - 1)));
    heap->free_slot[bin_num] = p = (zend_mm_free_slot*)((char*)bin + bin_data_size[bin_num]);
    do {
        p->next_free_slot = (zend_mm_free_slot*)((char*)p + bin_data_size[bin_num]);
        p = (zend_mm_free_slot*)((char*)p + bin_data_size[bin_num]);
    } while (p != end);
 
    /* terminate list using NULL */
    p->next_free_slot = NULL;
 
    /* return first element */
    return (char*)bin;
}

2.5说说记录页属性的map

1)对任意地址p,如何计算页号?

地址p减去chunk首地址获得偏移量;偏移量除4K即可;问题是如何获得chunk首地址?我们看看源码:

chunk = (zend_mm_chunk*)ZEND_MM_ALIGNED_BASE(bin, ZEND_MM_CHUNK_SIZE);
page_num = ZEND_MM_ALIGNED_OFFSET(bin, ZEND_MM_CHUNK_SIZE) / ZEND_MM_PAGE_SIZE;
 
#define ZEND_MM_ALIGNED_OFFSET(size, alignment) \
    (((size_t)(size)) & ((alignment) - 1))
#define ZEND_MM_ALIGNED_BASE(size, alignment) \
    (((size_t)(size)) & ~((alignment) - 1))
#define ZEND_MM_SIZE_TO_NUM(size, alignment) \
    (((size_t)(size) + ((alignment) - 1)) / (alignment))

我们发现计算偏移量或chunk首地址时,需要两个参数:size,地址p;alignment,调用时传的是ZEND_MM_CHUNK_SIZE(2M);

其实PHP在申请chunk时,额外添加了一个条件:chunk首地址2M字节对齐;

clipboard.png

如图,2M字节对齐时,给定任意地址p,p的低21位即地址p相对于chunk首地址的偏移量;

那如何保证chunk首地址2M字节对齐呢?分析源码:

//chunk大小为size 2M;chunk首地址对齐方式 2M
static void *zend_mm_chunk_alloc_int(size_t size, size_t alignment)
{
    void *ptr = zend_mm_mmap(size);
 
    if (ptr == NULL) {
        return NULL;
    } else if (ZEND_MM_ALIGNED_OFFSET(ptr, alignment) == 0) { //2M对齐,直接返回
        return ptr;
    } else {
        size_t offset;
 
        //没有2M对齐,先释放,再重新分配2M+2M-4K空间
        //重新分配大小为2M+2M也是可以的(减4K是因为操作系统分配内存按页分配的,页大小4k)
        //此时总能定位一段2M的内存空间,且首地址2M对齐
        zend_mm_munmap(ptr, size);
        ptr = zend_mm_mmap(size + alignment - REAL_PAGE_SIZE);
 
        //分配了2M+2M-4K空间,需要释放前面、后面部分空间。只保留中间按2M字节对齐的chunk即可
        offset = ZEND_MM_ALIGNED_OFFSET(ptr, alignment);
        if (offset != 0) {
            offset = alignment - offset;
            zend_mm_munmap(ptr, offset);
            ptr = (char*)ptr + offset;
            alignment -= offset;
        }
        if (alignment > REAL_PAGE_SIZE) {
            zend_mm_munmap((char*)ptr + size, alignment - REAL_PAGE_SIZE);
        }
        return ptr;
    }
}
//理论分析,申请2M空间,能直接2M字节对齐的概率很低;但是实验发现,概率还是蛮高的,这可能与内核分配内存有关;

2)每个页都需要记录哪些属性?

chunk里的某个页,可以分配为large内存,large内存连续占多少个页;可以分配为small内存,对应的是哪种规格的small内存(bin_num)

//29-31比特表示当前页分配为small还是large
//当前页用于large内存分配
#define ZEND_MM_IS_LRUN                  0x40000000
//当前页用于small内存分配
#define ZEND_MM_IS_SRUN                  0x80000000
 
//对于large内存,0-9比特表示分配的页数目
#define ZEND_MM_LRUN_PAGES_MASK          0x000003ff
#define ZEND_MM_LRUN_PAGES_OFFSET        0
 
//对于small内存,0-4比特表示bin_num
#define ZEND_MM_SRUN_BIN_NUM_MASK        0x0000001f
#define ZEND_MM_SRUN_BIN_NUM_OFFSET      0
 
//count即large内存占了多少个页
#define ZEND_MM_LRUN(count)              (ZEND_MM_IS_LRUN | ((count) << ZEND_MM_LRUN_PAGES_OFFSET))
#define ZEND_MM_SRUN(bin_num)            (ZEND_MM_IS_SRUN | ((bin_num) << ZEND_MM_SRUN_BIN_NUM_OFFSET))

再回顾一下small内存30种规格的宏定义,bin_num=16、17、20-29时,需要分配大于1个页;此时不仅需要记录bin_num,还需要记录其对应的页数目

#define ZEND_MM_SRUN_BIN_NUM_MASK        0x0000001f
#define ZEND_MM_SRUN_BIN_NUM_OFFSET      0
 
#define ZEND_MM_SRUN_FREE_COUNTER_MASK   0x01ff0000
#define ZEND_MM_SRUN_FREE_COUNTER_OFFSET 16
 
#define ZEND_MM_NRUN_OFFSET_MASK         0x01ff0000
#define ZEND_MM_NRUN_OFFSET_OFFSET       16
 
//当前页分配为small内存;0-4比特存储bin_num;16-25存储当前规格需要分配的页数目;
#define ZEND_MM_SRUN_EX(bin_num, count)  (ZEND_MM_IS_SRUN | ((bin_num) << ZEND_MM_SRUN_BIN_NUM_OFFSET) |
        ((count) << ZEND_MM_SRUN_FREE_COUNTER_OFFSET))
 
//29-31比特表示同时属于small内存和large内存;0-4比特存储bin_num;16-25存储偏移量
//对于bin_num=29,需要分配3个页,假设为10,11,12号页
//map[10]=ZEND_MM_SRUN_EX(29,3);map[11]=ZEND_MM_NRUN(29,1);map[12]=ZEND_MM_NRUN(29,2);
#define ZEND_MM_NRUN(bin_num, offset)    (ZEND_MM_IS_SRUN | ZEND_MM_IS_LRUN | ((bin_num) << ZEND_MM_SRUN_BIN_NUM_OFFSET) |
        ((offset) << ZEND_MM_NRUN_OFFSET_OFFSET))

3.large内存分配:

需要从chunk中查找连续pages_count个空闲的页;zend_mm_chunk结构体的free_map为512个比特,记录着每个页空闲还是已分配;

以64位机器为例,free_map又被分为8组;每组64比特,看作uint32_t类型;

#define ZEND_MM_CHUNK_SIZE (2 * 1024 * 1024)               /* 2 MB  */
#define ZEND_MM_PAGE_SIZE  (4 * 1024)                      /* 4 KB  */
#define ZEND_MM_PAGES      (ZEND_MM_CHUNK_SIZE / ZEND_MM_PAGE_SIZE)  /* 512 */
 
typedef zend_ulong zend_mm_bitset;    /* 4-byte or 8-byte integer */
 
#define ZEND_MM_BITSET_LEN      (sizeof(zend_mm_bitset) * 8)       /* 32 or 64 */
#define ZEND_MM_PAGE_MAP_LEN    (ZEND_MM_PAGES / ZEND_MM_BITSET_LEN) /* 16 or 8 */
static void *zend_mm_alloc_pages(zend_mm_heap *heap, uint32_t pages_count)
{
    //获取main_chunk
    zend_mm_chunk *chunk = heap->main_chunk;
    uint32_t page_num, len;
    int steps = 0;
 
    //其实就是最佳适配算法
    while (1) {
        //free_pages记录当前chunk的空闲页数目
        if (UNEXPECTED(chunk->free_pages < pages_count)) {
            goto not_found;
        } else {
            /* Best-Fit Search */
            int best = -1;
            uint32_t best_len = ZEND_MM_PAGES;
 
            //从free_tail位置开始,后面得页都是空闲的
            uint32_t free_tail = chunk->free_tail;
            zend_mm_bitset *bitset = chunk->free_map;
            zend_mm_bitset tmp = *(bitset++);
            uint32_t i = 0;
            //从第一组开始遍历;查找若干连续空闲页;i实际每次递增64;
            //最佳适配算法;查找到满足条件的间隙,空闲页数目大于pages_count;
            //best记录间隙首位置;best_len记录间隙空闲页数目
            while (1) {
                //注意:(zend_mm_bitset)-1,表示将-1强制类型转换为64位无符号整数,即64位全1(表示当前组的页全被分配了)
                while (tmp == (zend_mm_bitset)-1) {
                    i += ZEND_MM_BITSET_LEN;
                    if (i == ZEND_MM_PAGES) {
                        if (best > 0) {
                            page_num = best;
                            goto found;
                        } else {
                            goto not_found;
                        }
                    }
                    tmp = *(bitset++); //当前组的所有页都分配了,递增到下一组
                }
                //每一个空闲间隙,肯定有若干个比特0,查找第一个比特0的位置:
                //假设当前tmp=01111111(低7位全1,高位全0);则zend_mm_bitset_nts函数返回8
                page_num = i + zend_mm_bitset_nts(tmp); 函数实现后面分析
                 
                //tmp+1->10000000;  tmp&(tmp+1)  其实就是把tmp的低8位全部置0,只保留高位
                tmp &= tmp + 1;
                 
                //如果此时tmp == 0,说明从第个页page_num到当前组最后一个页,都是未分配的;
                //否则,需要找出这个空闲间隙另外一个0的位置,相减才可以得出空闲间隙页数目
                while (tmp == 0) {
                    i += ZEND_MM_BITSET_LEN; //i+64,如果超出free_tail或者512,说明从page_num开始后面所有页都是空闲的;否则遍历下一组
                    if (i >= free_tail || i == ZEND_MM_PAGES) {
                        len = ZEND_MM_PAGES - page_num;
                        if (len >= pages_count && len < best_len) {   //从page_num处开始后面页都空闲,且剩余页数目小于已经查找到的连续空闲页数目,直接分配
                            chunk->free_tail = page_num + pages_count;
                            goto found;
                        } else {  //当前空闲间隙页不满足条件
                              
                            chunk->free_tail = page_num;
                            if (best > 0) { //之前有查找到空闲间隙符合分配条件
                                page_num = best;
                                goto found;
                            } else {  //之前没有查找到空闲页满足条件,说明失败
                                goto not_found;
                            }
                        }
                    }
                    tmp = *(bitset++); //遍历下一组
                }
                 
                //假设最初tmp=1111000001111000111111,tmp&=tmp+1后,tmp=1111000001111000 000000
                //上面while循环进不去;且page_num=7+i;
                //此时需从低到高位查找第一个1比特位置,为11,11+i-(7+i)=4,即是连续空闲页数目
                len = i + zend_ulong_ntz(tmp) - page_num;
                if (len >= pages_count) { //满足分配条件,记录
                    if (len == pages_count) {
                        goto found;
                    } else if (len < best_len) {
                        best_len = len;
                        best = page_num;
                    }
                }
                 
                //上面计算后tmp=1111000001111000 000000;发现这一组还有一个空闲间隙,拥有5个空闲页,下一个循环肯定需要查找出来;
                //而目前低10比特其实已经查找过了,那么需要将低10比特全部置1,以防再次查找到;
                //tmp-1:1111000001110111 111111; tmp |= tmp - 1:1111000001111111 111111
                tmp |= tmp - 1;
            }
        }
 
not_found:
        ………………
found:
     
    //查找到满足条件的连续页,设置从page_num开始pages_count个页为已分配
    chunk->free_pages -= pages_count;
    zend_mm_bitset_set_range(chunk->free_map, page_num, pages_count);
    //标志当前页用于large内存分配,分配数目为pages_count
    chunk->map[page_num] = ZEND_MM_LRUN(pages_count);
    //更新free_tail
    if (page_num == chunk->free_tail) {
        chunk->free_tail = page_num + pages_count;
    }
    //返回当前第一个page的首地址
    return ZEND_MM_PAGE_ADDR(chunk, page_num);
}
 
//4K大小的字节数组
struct zend_mm_page {
    char               bytes[ZEND_MM_PAGE_SIZE];
};
 
//偏移page_num*4K
#define ZEND_MM_PAGE_ADDR(chunk, page_num) \
    ((void*)(((zend_mm_page*)(chunk)) + (page_num)))

看看PHP是如何高效查找0比特位置的:依然是二分查找

static zend_always_inline int zend_mm_bitset_nts(zend_mm_bitset bitset)
{
    int n=0;
//64位机器才会执行
#if SIZEOF_ZEND_LONG == 8
    if (sizeof(zend_mm_bitset) == 8) {
        if ((bitset & 0xffffffff) == 0xffffffff) {n += 32; bitset = bitset >> Z_UL(32);}
    }
#endif
    if ((bitset & 0x0000ffff) == 0x0000ffff) {n += 16; bitset = bitset >> 16;}
    if ((bitset & 0x000000ff) == 0x000000ff) {n +=  8; bitset = bitset >>  8;}
    if ((bitset & 0x0000000f) == 0x0000000f) {n +=  4; bitset = bitset >>  4;}
    if ((bitset & 0x00000003) == 0x00000003) {n +=  2; bitset = bitset >>  2;}
    return n + (bitset & 1);
}

4.huge内存分配:

//
#define ZEND_MM_ALIGNED_SIZE_EX(size, alignment) \
    (((size) + ((alignment) - Z_L(1))) & ~((alignment) - Z_L(1)))
 
//会将size扩展为2M字节的整数倍;直接调用分配chunk的函数申请内存
//huge内存以n*2M字节对齐的
static void *zend_mm_alloc_huge(zend_mm_heap *heap, size_t size)
{
    size_t new_size = ZEND_MM_ALIGNED_SIZE_EX(size, MAX(REAL_PAGE_SIZE, ZEND_MM_CHUNK_SIZE));
     
    void *ptr = zend_mm_chunk_alloc(heap, new_size, ZEND_MM_CHUNK_SIZE);
    return ptr;
}

5.内存释放

ZEND_API void ZEND_FASTCALL _efree(void *ptr)
{
    zend_mm_free_heap(AG(mm_heap), ptr);
}
 
static zend_always_inline void zend_mm_free_heap(zend_mm_heap *heap, void *ptr)
{
    //计算当前地址ptr相对于chunk的偏移
    size_t page_offset = ZEND_MM_ALIGNED_OFFSET(ptr, ZEND_MM_CHUNK_SIZE);
 
    //偏移为0,说明是huge内存,直接释放
    if (UNEXPECTED(page_offset == 0)) {
        if (ptr != NULL) {
            zend_mm_free_huge(heap, ptr);
        }
    } else {
        //计算chunk首地址
        zend_mm_chunk *chunk = (zend_mm_chunk*)ZEND_MM_ALIGNED_BASE(ptr, ZEND_MM_CHUNK_SIZE);
        //计算页号
        int page_num = (int)(page_offset / ZEND_MM_PAGE_SIZE);
        //获得页属性信息
        zend_mm_page_info info = chunk->map[page_num];
 
        //small内存
        if (EXPECTED(info & ZEND_MM_IS_SRUN)) {
            zend_mm_free_small(heap, ptr, ZEND_MM_SRUN_BIN_NUM(info));
        }
        //large内存
        else /* if (info & ZEND_MM_IS_LRUN) */ {
            int pages_count = ZEND_MM_LRUN_PAGES(info);
             //将页标记为空闲
            zend_mm_free_large(heap, chunk, page_num, pages_count);
        }
    }
}

static zend_always_inline void zend_mm_free_small(zend_mm_heap *heap, void *ptr, int bin_num)
{
    zend_mm_free_slot *p;
    //插入空闲链表头部即可
    p = (zend_mm_free_slot*)ptr;
    p->next_free_slot = heap->free_slot[bin_num];
    heap->free_slot[bin_num] = p;
}


6.zend_mm_heap和zend_mm_chunk

PHP有一个全局唯一的zend_mm_heap,其是zend_mm_chunk一个字段;

zend_mm_chunk至少需要空间2k+;和zend_mm_chunk存储在哪里?

这两个结构体其实是存储在chunk的第一个页,即chunk的第一个页始终是分配的,且用户不能申请的;

申请的多个chunk之间是形成双向链表的;如下图所示:

clipboard.png

static zend_mm_heap *zend_mm_init(void)
{
    //将分配的2M空间,强制转换为zend_mm_chunk*;并初始化zend_mm_chunk结构体
    zend_mm_chunk *chunk = (zend_mm_chunk*)zend_mm_chunk_alloc_int(ZEND_MM_CHUNK_SIZE, ZEND_MM_CHUNK_SIZE);
    zend_mm_heap *heap;
 
    heap = &chunk->heap_slot;
    chunk->heap = heap;
    chunk->next = chunk;
    chunk->prev = chunk;
    chunk->free_pages = ZEND_MM_PAGES - ZEND_MM_FIRST_PAGE;
    chunk->free_tail = ZEND_MM_FIRST_PAGE;
    chunk->num = 0;
    chunk->free_map[0] = (Z_L(1) << ZEND_MM_FIRST_PAGE) - 1;
    chunk->map[0] = ZEND_MM_LRUN(ZEND_MM_FIRST_PAGE);
    heap->main_chunk = chunk;
    heap->cached_chunks = NULL;
    heap->chunks_count = 1;
    heap->peak_chunks_count = 1;
    heap->cached_chunks_count = 0;
    heap->avg_chunks_count = 1.0;
    heap->last_chunks_delete_boundary = 0;
    heap->last_chunks_delete_count = 0;
    heap->huge_list = NULL;
    return heap;
}

7. PHP内存管理器初始化流程:

PHP虚拟机什么时候初始化内管理器呢?heap与chunk又是什么时候初始化呢?
下图为PHP内存管理器初始化流程;
有兴趣同学可以在相关函数处加断点,跟踪内存管理器初始化流程;

clipboard.png

8. PHP内存管理总结:

1)需要明白一点:任何内存分配器都需要额外的数据结构来记录内存的分配情况;

2)内存池是代替直接调用malloc/free、new/delete进行内存管理的常用方法;内存池中空闲内存块组织为链表结果,申请内存只需要查找空闲链表即可,释放内存需要将内存块重新插入空闲链表;

3)PHP采用预分配内存策略,提前向操作系统分配2M字节大小内存,称为chunk;同时将内存分配请求根据字节大小分为small、huge、large三种;

4)small内存,采用“分离存储”思想;将空闲内存块按照字节大小组织为多个空闲链表;

5)large内存每次回分配连续若干个页,采用最佳适配算法;

6)huge内存直接使用mmap函数向操作系统申请内存(申请大小是2M字节整数倍);

7)chunk中的每个页只会被切割为相同规格的内存块;所以不需要再每个内存块添加头部,只需要记录每个页的属性即可;

8)如何方便根据地址计算当前内存块属于chunk中的哪一个页?PHP分配的chunk都是2M字节对齐的,任意地址的低21位即是相对chunk首地址,除以页大小则可获得页号;

结束语

本文首先简单介绍了计算机操作系统内存相关知识,然后描述了malloc内存分配器设计思路,以及内存池的简单理论;最后从源码层面详细分析了PHP内存管理器的实现;相信通过这篇文章,大家对内存管理页有了一定的了解;
对于PHP源码有兴趣的同学,欢迎加入我们的微信群,我们可以一起探讨与学习;

clipboard.png

同时欢迎关注微博:
图片描述

查看原文

赞 48 收藏 43 评论 2

书旅 发布了文章 · 2020-12-09

数据结构与算法系列之散列表(一)(GO)

关于散列表的代码实现及下边实践部分的代码实现均可从我的Github获取(欢迎star^_^)

散列思想

概念

散列表(Hash Table),也可以叫它哈希表或者Hash表

散列表用的是数组支持按照下标随机访问数据的特性,所以散列表其实就是数组的一种扩展,由数组演化而来。可以说,如果没有数组,就没有散列表

举例

假设全校有1000名学生,为了方便记录他们的期末成绩,会给每个学生一个编号,编号从1~1000。现在如果要实现通过编号来找到具体的学生

可以把这1000个学生的信息放在数组里。编号为1的学生,放到数组中下标为1的位置;编号为2的学生,放到数组中下标为2的位置。以此类推,编号为k的学生放到数组中下标为k的位置

因为编号跟数组下标一一对应,当我们需要查询编号为x的学生的时候,只需要将下标为x的数组元素取出来就可以了,时间复杂度就是O(1)

实际上,这个例子已经用到了散列的思想。在这个例子里,编号是自然数,并且与数组的下标形成一一映射,所以利用数组支持根据下标随机访问的特性,查找的时间复杂度是O(1) ,就可以实现快速查找编号对应的学生信息

但是,上边这个例子用到的散列思想不够明显,改一下

假设编号不能设置得这么简单,要加上年级、班级这些更详细的信息,所以这里把编号的规则稍微修改了一下,用8位数字来表示。比如05110067,其中,前两位05表示年级,中间两位11表示班级,最后四位还是原来的编号1到1000。这个时候该如何存储学生信息,才能够支持通过编号来快速查找学生信息呢?

思路还是跟前面类似。尽管不能直接把编号作为数组下标,但可以截取编号的后四位作为数组下标,来存取学生信息数据。当通过编号查询学生信息的时候,用同样的方法,取编号的后四位,作为数组下标,来读取数组中的数据

这就是典型的散列思想。其中,学生的编号叫作键(key)或者关键字。用它来标识一个学生。把学生编号转化为数组下标的映射方法就叫作散列函数(或“Hash函数”“哈希函数”),而散列函数计算得到的值就叫作散列值(或“Hash 值”“哈希值”)

总结

散列表用的就是数组支持按照下标随机访问的时候,时间复杂度是O(1)的特性。通过散列函数把元素的键值映射为下标,然后将数据存储在数组中对应下标的位置。当按照键值查询元素时,用同样的散列函数,将键值转化数组下标,从对应的数组下标的位置取数据

散列函数

概念

散列函数,顾名思义,它是一个函数。可以把它定义成hash(key) ,其中key表示元素的键值,hash(key)的值表示经过散列函数计算得到的散列值

在上边的例子中,编号就是数组下标,所以hash(key)就等于 key。改造后的例子,写成散列函数就像下边这样

func hash(key string) int {
    hashValue,_ := strconv.Atoi(key[len(key)-4:])

    return hashValue
}

散列函数基本要求

上边的的例子,散列函数比较简单,也比较容易想到。但是,如果学生的编号是随机生成的6位数字,又或者用的是a到z之间的字符串,这种情况,散列函数就会复杂一些

散列函数设计的基本要求

  • 散列函数计算得到的散列值是一个非负整数
  • 如果key1 = key2,那hash(key1) == hash(key2)
  • 如果key1 ≠ key2,那hash(key1) ≠ hash(key2)

第一点理解起来应该没有任何问题。因为数组下标是从0开始的,所以散列函数生成的散列值也要是非负整数。第二点也很好理解。相同的key,经过散列函数得到的散列值也应该是相同的

第三点理解起来可能会有问题。这个要求看起来合情合理,但是在真实的情况下,要想找到一个不同的key对应的散列值都不一样的散列函数,几乎是不可能的。即便像业界著名的MD5、SHA、CRC等哈希算法,也无法完全避免这种散列冲突。而且,因为数组的存储空间有限,也会加大散列冲突的概率

所以,几乎无法找到一个完美的无冲突的散列函数,即便能找到,付出的时间成本、计算成本也是很大的,所以针对散列冲突问题,需要通过其他途径来解决

散列冲突

开放寻址法

开放寻址法的核心思想是,如果出现了散列冲突,就重新探测一个空闲位置,将其插入
重新探测一个空闲位置的方法有好几个,这里以线性探测举例

当往散列表中插入数据时,如果某个数据经过散列函数散列之后,存储位置已经被占用了,就从当前位置开始,依次往后查找,看是否有空闲位置,直到找到为止。看下图

从图中可以看出,散列表的大小为10,在元素x插入散列表之前,已经有6个元素插入到散列表中。x经过hash算法之后,被散列到下标为7的位置,但是这个位置已经有数据了,所以就产生了冲突。于是就顺序地往后一个一个找,看有没有空闲的位置,遍历到尾部都没有找到空闲的位置,于是再从表头开始找,直到找到空闲位置2,于是将其插入到这个位置

在散列表中查找元素的过程类似插入过程。通过散列函数求出要查找元素的键值对应的散列值,然后比较数组中下标为散列值的元素和要查找的元素。如果相等,则说明就是我们要找的元素;否则就顺序往后依次查找。如果遍历到数组中的空闲位置,还没有找到,就说明要查找的元素并没有在散列表中

散列表和数组一样,也支持插入、查找、删除操作,但是对于线性探测方法解决散列冲突,在进行删除操作时比较特殊,不能单纯地把要删除的元素设置为空

上边在说散列表的查找操作时,通过线性探测的方式找到一个空闲位置,然后就认定散列表中不存在该数据。如果说这个空闲位置是后来删除的,就会导致原来的线性探测有问题,可能本来存在的数据,以为不存在了

这种情况下,就需要将删除的元素加一个删除的标记。在进行线性探测的时候,如果遇到删除标记的元素,则继续向下探测

小伙伴肯定已经看出问题了,当散列表中插入的数据越来越多时,散列冲突发生的可能性就会越来越大,空闲位置会越来越少,线性探测的时间就会越来越久。极端情况下,可能需要探测整个散列表,所以最坏情况下的时间复杂度为O(n)。同理,在删除和查找时,也有可能会线性探测整张散列表,才能找到要查找或者删除的数据

对于开放寻址冲突解决方法,除了线性探测方法之外,还有另外两种比较经典的探测方法,二次探测(Quadratic probing)和双重散列(Double hashing)

二次探测

二次探测跟线性探测很像,线性探测每次探测的步长是1,它探测的下标序列就是hash(key)+0,hash(key)+1,hash(key)+2……而二次探测探测的步长就变成了原来的“二次方”,它探测的下标序列是hash(key)+0,hash(key)+1^2,hash(key)+2^2,hash(key)+3^2……

双重散列

双重散列意思就是不仅要使用一个散列函数。使用一组散列函数 hash1(key),hash2(key),hash3(key)……先用第一个散列函数,如果计算得到的存储位置已经被占用,再用第二个散列函数,依次类推,直到找到空闲的存储位置

开放寻址法总结

不管采用哪种探测方法,当散列表中空闲位置不多的时候,散列冲突的概率就会大大提高。为了尽可能保证散列表的操作效率,一般情况下,会尽可能保证散列表中有一定比例的空闲槽位。用装载因子(load factor)来表示空位的多少

装载因子的计算公式

散列表的装载因子 = 填入表中的元素个数 / 散列表的长度

装载因子越大,说明空闲位置越少,冲突越多,散列表的性能会下降

链表法

链表法是一种更加常用的散列冲突解决办法,相比开放寻址法,它比较简单。在散列表中,每个“桶(bucket)”或者“槽(slot)”会对应一条链表,所有散列值相同的元素都放到相同槽位对应的链表中

当插入的时候,只需要通过散列函数计算出对应的散列槽位,将其插入到对应链表中即可,所以插入的时间复杂度是O(1)。当查找、删除一个元素时,同样通过散列函数计算出对应的槽,然后遍历链表查找或者删除

对于查找和删除操作,时间复杂度跟链表的长度k成正比,也就是 O(k)。对于散列比较均匀的散列函数来说,理论上讲,k=n/m,其中n表示散列中数据的个数,m表示散列表中“槽”的个数

实践

假设我们有10万条URL访问日志,如何按照访问次数给URL排序?

遍历10万条数据,以URL为key,访问次数为value,存入散列表,同时记录下访问次数的最大值K,时间复杂度O(N)

如果K不是很大,可以使用桶排序,时间复杂度O(N)。如果 K 非常大(比如大于10万),就使用快速排序,复杂度O(NlogN)

由于文章篇幅的原因,代码实现,我放在了github上,需要的可以自取(GO实现)

有两个字符串数组,每个数组大约有10万条字符串,如何快速找出两个数组中相同的字符串?

以第一个字符串数组构建散列表,key 为字符串,value 为出现次数。再遍历第二个字符串数组,以字符串为 key 在散列表中查找,如果 value 大于零,说明存在相同字符串。时间复杂度 O(N)

查看原文

赞 0 收藏 0 评论 0

书旅 发布了文章 · 2020-12-03

数据结构与算法系列之跳表(GO)

详细了解跳表

前边的一篇文章中分享了二分查找算法,里边有说到二分查找算法依赖数组的随机访问特性,只能用数组来实现。如果数据存储在链表中就没法用二分查找算法了

本篇文章分享的「跳表」,可以实现类似二分的查找算法,时间复杂度也是「O(logn)」

假设有一个有序的单链表,如果想从该链表中查找某一个数据,只能从头到尾的遍历查找,它的时间复杂度是O(n)。如何提高查找的效率?

假设我像下图这样,在原来的单链表上增加一级索引,每两个节点取一个结点到「索引层」。其中down为指向下一级节点的指针

假设现在需要查找【16】这个节点。可以先遍历索引层节点,当遍历到【13】这个节点时,发现下一个节点为【17】,那么我们要找的节点【16】肯定就在它们中间。此时,通过索引节点【13】的down指针,下降到原始链表层中继续遍历,此时只需要再遍历两个节点就找到了我们要查找的【16】。如果直接在原始链表中查找【16】,需要遍历10个结点,现在只需要遍历7个结点

从上边可以发现,加了一层索引之后,查找一个节点需要遍历的节点数减少了,也就意味着查找的效率提高了,如果我们增加更多的索引层,效率会不会提高更多?

假设现在在原来的一级索引上边按同样的方法,再增加一层索引,如果再查【16】这个节点,只需要遍历6个结点了。查找一个节点需要遍历的节点又减少了

上边的例子中,原始链表的节点数量并不多,每增加一层索引,查询效率提高的并不明显,下边假设原始链表中有64个节点,建立5级索引

从图中可以看出,原来没有索引的时候,查找【62】这个节点需要遍历62个节点,现在只需要遍历11个节点,速度提高了很多。当链表的长度n比较大时,比如1000、10000的时候,在构建索引之后,查找效率的提升就会非常明显。「这种链表加多级索引的结构,就是跳表」

跳表的时间复杂度

如果链表里有n个结点,可以有多少级索引?

按照上边例子中的那种方式,每两个结点会抽出一个结点作为上一级索引的结点,那第一级索引的结点个数大约就是n/2,第二级索引的结点个数大约就是n/4,第三级索引的结点个数大约就是n/8,依次类推,也就是说,第k级索引的结点个数是第k-1级索引的结点个数的1/2,那第k级索引结点的个数就是 n/(2^k)

假设索引有h级,最高级的索引有2个结点。通过上面的公式,可以得到n/(2^h)=2,从而求得 h=log2n-1(2为底)。如果包含原始链表这一层,整个跳表的高度就是log2n(2为底)。「在跳表中查询某个数据的时候,如果每一层都要遍历m个结点,那在跳表中查询一个数据的时间复杂度就是O(m*logn)」

这个m的值是多少?按照上边的例子中这种索引结构,我们每一级索引都最多只需要遍历3个结点,也就是说m=3,解释一下为啥是3

假设要查找的数据是x,在第k级索引中,我们遍历到y结点之后,发现x大于y,小于后面的结点z,所以我们通过y的 down 指针,从第k级索引下降到第k-1 级索引。在第k-1级索引中,y和 z之间只有3个结点(包含 y 和 z),所以,我们在k-1级索引中最多只需要遍历3个结点,依次类推,每一级索引都最多只需要遍历3个结点

通过上面的分析,得到m=3,所以「在跳表中查询任意数据的时间复杂度就是O(logn)」。这个查找的时间复杂度跟二分查找是一样的。换句话说,其实是基于单链表实现了二分查找。不过,天下没有免费的午餐,这种查询效率的提升,前提是建立了很多级索引,是一种空间换时间的思路

跳表的空间复杂度分析

假设原始链表大小为n,那第一级索引大约有n/2个结点,第二级索引大约有n/4个结点,以此类推,每上升一级就减少一半,直到剩下2个结点。如果我们把每层索引的结点数写出来,就是一个等比数列

`n/2, n/4, n/8,...,8, 4, 2
`

这几级索引的结点总和就是n/2+n/4+n/8…+8+4+2=n-2。所以,「跳表的空间复杂度是 O(n)」。也就是说,如果将包含n个结点的单链表构造成跳表,我们需要额外再用接近n个结点的存储空间

有没有办法降低索引占用的内存空间呢?

降低跳表空间复杂度的方法

前面都是每两个结点抽一个结点到上级索引,如果我们每三个结点或五个结点,抽一个结点到上级索引,就不用那么多索引结点了

从图中可以看出,第一级索引需要大约n/3个结点,第二级索引需要大约n/9个结点。每往上一级,索引结点个数都除以3。为了方便计算,假设最高一级的索引结点个数是1。我们把每级索引的结点个数都写下来,也是一个等比数列

`n/3, n/9, n/27, ... , 9, 3, 1
`

通过等比数列求和公式,总的索引结点大约就是n/3+n/9+n/27+…+9+3+1=n/2。尽管空间复杂度还是 O(n),但比上面的每两个结点抽一个结点的索引构建方法,要减少了一半的索引节点存储空间

实际上,在平时开发中,不必太在意索引占用的额外空间。在数据结构和算法中,习惯性地把要处理的数据看成「整数」,但是在实际的开发中,原始链表中存储的有可能是「很大的对象」,而索引结点只需要「存储关键值」和几个指针,并不需要存储对象,「所以当对象比索引结点大很多时,那索引占用的额外空间就可以忽略了」

跳表操作

跳表是个「动态数据结构」,不仅支持查找操作,还支持动态的插入、删除操作,而且插入、删除操作的时间复杂度也是「O(logn)」

插入

在单链表中,一旦定位好要插入的位置,插入结点的时间复杂度是很低的,就是 O(1)。但是,为了保证原始链表中数据的有序性,需要先找到要插入的位置,这个查找操作就会比较耗时

对于纯粹的单链表,需要遍历每个结点,来找到插入的位置。但是,对于跳表来说,查找某个节点的时间复杂度是 O(logn),所以这里查找某个数据应该插入的位置,方法也是类似的,时间复杂度也是O(logn)

删除

「如果要删除的结点在索引中有出现,除了要删除原始链表中的结点,还要删除索引中的」。因为单链表中的删除操作需要拿到要删除结点的前驱结点,然后通过指针操作完成删除。所以在查找要删除的结点的时候,一定要获取前驱结点。当然,如果用的是双向链表,就不需要考虑这个问题了

索引动态更新

当不停地往跳表中插入数据时,如果不更新索引,就有可能出现某2个索引结点之间数据非常多的情况。极端情况下,跳表还会退化成单链表

作为一种动态数据结构,它需要某种手段来维护索引与原始链表大小之间的平衡,也就是说,如果链表中结点多了,索引结点就相应地增加一些,避免复杂度退化,以及查找、插入、删除操作性能下降

「后边会分享到的红黑树、AVL树这样的平衡二叉树,它们是通过左右旋的方式保持左右子树的大小平衡」,而跳表是通过「随机函数」来维护前面提到的“平衡性”

当我们往跳表中插入数据的时候,可以选择同时将这个数据插入到部分索引层中。如何选择加入哪些索引层,就是随机函数要干的事情

通过一个随机函数,来决定将这个结点插入到哪几级索引中,比如随机函数生成了值K,那就将这个结点添加到「第一级到第K级」这K级索引中

随机函数的选择很有讲究,从概率上来讲,能够保证跳表的索引大小和数据大小平衡性,不至于性能过度退化

跳表的应用

「为什么Redis要用跳表来实现有序集合,而不是红黑树?」

Redis中的有序集合是通过跳表来实现的,严格点讲,其实还用到了「散列表」。后边的文章会分享到散列表,所以现在暂且忽略这部分。如果你了解Redis中的有序集合,它支持的核心操作主要有下面这几个

  • 插入一个数据;
  • 删除一个数据;
  • 查找一个数据;
  • 按照区间查找数据(比如查找值在 [100, 356] 之间的数据);
  • 迭代输出有序序列

其中,插入、删除、查找以及迭代输出有序序列这几个操作,红黑树也可以完成,时间复杂度跟跳表是一样的。但是,「按照区间来查找数据这个操作,红黑树的效率没有跳表高」

对于按照区间查找数据这个操作,跳表可以做到O(logn)的时间复杂度定位区间的起点,然后在原始链表中顺序往后遍历就可以了。这样做非常高效

Redis之所以用跳表来实现有序集合,还有其他原因,比如,跳表更容易代码实现。虽然跳表的实现也不简单,但比起红黑树来说还是好懂、好写多了,而简单就意味着可读性好,不容易出错。还有,跳表更加灵活,它可以通过改变索引构建策略,有效平衡执行效率和内存消耗

不过,跳表也不能完全替代红黑树。因为红黑树比跳表的出现要早一些,很多编程语言中的Map类型都是通过红黑树来实现的。做业务开发的时候,直接拿来用就可以了,不用费劲自己去实现一个红黑树,但是跳表并没有一个现成的实现,所以在开发中,如果你想使用跳表,必须要自己实现

查看原文

赞 0 收藏 0 评论 0

书旅 发布了文章 · 2020-11-23

LeetCode069-x的平方根-easy

标签:二分
题目:x的平方根
题号:69
题干:实现 int sqrt(int x) 函数。计算并返回 x 的平方根,其中 x 是非负整数。由于返回类型是整数,结果只保留整数的部分,小数部分将被舍去

示例1:
输入: 4
输出: 2

示例2:
输入: 8
输出: 2
解释: 8的平方根是 2.82842..., 由于返回类型是整数,小数部分将被舍去

原题比较简单,因此这里做简单的变形,求x的平方根,要求可以获取指定精度的结果

示例:
输入:8 0.000001
输出:2.828427
说明:0.000001表示精确到小数点后六位

思路:

  • 要求x的平方根,最容易想到的就是从1到x,一个一个的试,这种方法的时间复杂度是O(n),这不是我们想要的
  • 二分查找算法,显然是用来查找的算法,我们的目的也算是查找。如果你看过我的上一篇二分查找的文章,里边有说到什么情况下会想到用二分查找,比如我们要查找的一系列数是有序的等,我们要求x的平方根,那肯定在[1, x]这个区间找(x>1),它显然是有序的。这自然 会想到用二分来查找
  • 使用二分查找算法,需要两个游标low和high。需要考虑他们的初始值是什么?以及当mid不满足时,low和hight如何变化?
  • 既然是求x的平方根,x的值有两种情况。第一种是x<1,这种情况,我们要求的结果肯定在[0, 1]这区间内,所以就可以初始化low = 0,hight=1,那mid就为(low+hight)/2
  • x的第二种情况就很简单了,当x>1时,它的平方根的值肯定在[1, n]区间内,所以low = 1, high=x,mid = (low+hight)/2
  • 不难想到,当x-mid*mid < 0时,说明mid太大,那此时我们就应该在[1, mid]之间找,即,此时让hight=mid-1
  • 如果x-midmid > 0,这时要考虑x-midmid是否大于我们要求的精度,如果大于这个精度,那此时low = mid+1
  • 有了以上这些条件,基本上这道题就出来了。如果你按照上边的思路把代码写出来之后会发现是有问题的,当x<1的时候程序就不能正常运行了。原因是low和high变化的时候,如果x<1的时候,结果肯定是小于1的,如果直接让high或low为mid加1或减1肯定就不对了
  • 因此,当x-mid*mid < 0时,应该让high=(mid+high)/2

代码实现


func SolutionSqrt(num float64, precision float64) float64 {
  if num < 0 {
    fmt.Println("参数不合法")
    return -1.000
  }

  var low,high float64
  if num > 1 {
    low = 1
    high = num
  } else {
    low = num
    high = 1
  }

  return Sqrt(num, low, high, precision)
}

func Sqrt(num float64, low float64, high float64, precision float64) float64 {
  mid := (low+high) / 2

  if num - (mid * mid) < 0 {
    return Sqrt(num, low, (mid+high)/2, precision)
  } else {
    if num - (mid * mid) > precision {
      return Sqrt(num, (low + mid)/2, high, precision)
    }

    return mid
  }
}
查看原文

赞 0 收藏 0 评论 0

书旅 关注了用户 · 2020-11-23

LNMPRG源码研究 @php7internal

一群热爱代码的人 研究Nginx PHP Redis Memcache Beanstalk 等源码 以及一群热爱前端的人
希望交流的朋友请加微信 289007301 注明:思否 拉到交流群,也可关注公众号:LNMPRG源码研究

《PHP7底层设计与源码分析》勘误https://segmentfault.com/a/11...

《Redis5命令设计与源码分析》https://item.jd.com/12566383....

景罗 陈雷 李乐 黄桃 施洪宝 季伟滨 闫昌 李志 王坤 肖涛 谭淼 张仕华 方波 周生政 熊浩含 张晶晶(女) 李长林 朱栋 张晶晶(男) 陈朝飞 巨振声 杨晓伟 闫小坤 韩鹏 夏达 周睿 李仲伟 张根红 景罗 欧阳 孙伟 李德 twosee

关注 11608

书旅 赞了文章 · 2020-11-23

【PHP7底层设计与源码分析】部分勘误

1、序

clipboard.png

zal 改为 zval
2、33页
从图3-1中我们看出,虽然char a只占了1字节,int b只占了4字节,但是long c并不是紧跟着b,而是根据8字节对齐后,c和b之间空了3字节
改为
从图3-1中我们看出,虽然char a只占了1字节,int b只占了4字节,但是b并不是紧跟着a,而是根据8字节对齐后,a和b之间空了3字节

3、图4-6 动态字符串赋值后$a 与 $b 关系图
更正为:

clipboard.png

4、图4-7 常量字符串赋值后$a 与 $b 关系图
更正为:

clipboard.png

5、图4-9 引用类型$a 与 $b 关系图
更正为:

clipboard.png

6、图4-10 copy on write过程示意图
更正为:

clipboard.png

7、图4-11 整形转成字符串
更正为:

clipboard.png

8、图4-13 opcode组装中字符串处理示意图
更正为:

clipboard.png

9、图9-4替换为下图:

clipboard.png

10、58页 图3-17下面的代码修改为: 
代码更正为
for($i = 0; $i <= 10002; $i++){
$a[$i] = array($i."_string");
$a[$i][] = &$a[$i];
unset($a[$i]);
}

11、图3-4 PHP5中_zval_struct的大小
更正为:


clipboard.png

12、图3-5 PHP5中_zval_struct实际大小
更正为:


clipboard.png

13、图3-6 PHP5中变量实际占用的内存大小
更正为:


clipboard.png

14、图3-16 gc_globals的结构
更正为:


clipboard.png
15、4.2.2节 示例2代码有一处错误,更改前为:
图片描述
https://segmentfault.com/img/...
clipboard.png

更改后为:
图片描述
https://segmentfault.com/img/...
clipboard.png

16、
120页和122页代码修改为:

for($i=0;$i<4;$i++){
   $arr[$i] = 1;//packed array
}

以下是读者赵禹反馈, 感谢赵禹!

17、第4章 字符串:页码83页 php_request_shutdown方法名写成了 php_request_shotdow。

18、第6章 面向对象 : 页码138页,6.1.3接口中接口类可以通过extends继承,写成了 extend继承。

以下是读者Rai4over反馈:
19、 第108页,示例代码为:

$arr[] = 'foo';

改为

$a[] = 'foo';

感谢读者Rai4over

查看原文

赞 17 收藏 10 评论 20

书旅 赞了文章 · 2020-11-18

Nginx源码入门指南

李乐

引子

  我们为什么需要学习Nginx呢?高性能,高稳定,优雅的模块化编程等就不提了,就说一个理由:Nginx是目前最受欢迎的web服务器,据统计,全球平均每3个网站,就有一个使用Nginx。如果你不懂Nginx,日常很多工作可能都无法开展。

  比如,最近看到过这么一句话:"Nginx master进程接收到客户端请求,转发给worker进程处理"。如果你不懂Nginx,就有可能也闹出类似的笑话。

  比如,当你需要搭建一套webserver时,可能基本的一些配置都一头雾水。location匹配规则你清楚吗,正则匹配、最大前缀匹配和精确匹配的顺序以及优先级你能记得住吗?反向代理proxy_pass以及fastcgi_pass你知道怎么配置吗?限流负载均衡等基本功能又怎么配置呢?一大堆配置有时真的让人脑仁疼。

  比如,线上环境Nginx曾出现"no live upstreams"。顾名思义,Nginx认为所有上游节点都挂掉了,此时Nginx直接向客户端返回502,而不会请求上游节点。这时候你该想想,Nginx是依据什么判断上游节点都挂掉了?出现这种错误后,Nginx又是怎么恢复的呢?

  再比如,线上环境高峰期Nginx还出现过这种错误:"Resource temporarily unavailable",对应的错误码是EAGAIN。非阻塞读写socket遇到EAGAIN不是通常稍等再尝试吗?其实通过源码里的一句注释就能瞬间明白了:"Linux returns EAGAIN instead of ECONNREFUSED for unix sockets if listen queue is full"。原来如此,Nginx和FPM之间是通过域套接字建立连接的,监听队列满了,系统直接返回的是EAGAIN,而不是我们平时了解的ECONNREFUSED;而Nginx在发起连接connect时,如果返回EAGAIN直接结束请求返回502。

  本篇文章旨在让你对Nginx能够有个系统的认识,了解其核心功能的实现思路,以及如何切入Nginx源码的学习。在遇到问题时,至少让你直到可以去哪寻找你想要的答案。主要涉及以下几个方面:

  • 如何开始Nginx源码的学习;
  • 模块化编程;
  • master与worker进程模型;
  • Nginx事件驱动模型;
  • HTTP处理流程之11个阶段;
  • location匹配规则;
  • upstream与负载均衡;
  • proxy_pass;
  • fastcgi_pass与FPM;
  • 限流;
  • 案例分析:502问题分析。

如何开始Nginx源码的学习

  作为一名初学者,如何去上手阅读Nginx源码呢?这还不简单,从main方法入手,一行一行看呗。如何你这么做了,也坚持了一段时间,我给你点个赞,至少我是做不到的。不过,我相信大部分人是坚持不了几天的。数十万行Nginx源码,岂是短时间就能研究透的?如果长时间都没有取得明显成效,大多数人都会选择放弃吧。

  我一般是怎么阅读源码的呢?

  1)动手GDB,动手GDB,动手GDB;重要的话说三遍;

  逻辑比较晦涩,各种判断分支太复杂,回调handler不知道是什么。GDB调试其实真的很简单,b打断点,p命令看变量名称,bt命令看调用栈,c继续执行至下一个断点,n执行到下一行。笔者一般常用的也就这几个命令。

  2)带着问题去阅读,最好能带着答案去阅读。

  带着问题去阅读,就有了一条主线,只需要关注你需要关注的。比如Nginx是一个事件驱动程序,那么第一个问题就是去探索他的事件循环,事件循环中无非就是通过epoll_wait()等待事件的发生,然后执行事件回调handler。其余逻辑都可不必过多关注。

  有了答案,你就能更容易的切入到源码中,去探索他的实现思路。去哪里寻找答案呢?官网的开发者指南就比较详细的介绍了Nginx诸多功能的实现细节 ,包括代码布局介绍,基本数据结构介绍,事件驱动模型介绍,
HTTP处理流程基本概念介绍等等。通过这些介绍我们就能得到一些问题的答案。比如,事件循环的切入点是ngx_process_events_and_timers()函数,那你是不是就能在这个函数打断点,跟踪事件循环执行链路(http://nginx.org/en/docs/dev/...)。

  配置不明白,也可以查看官方文档 ,注意配置是按照模块分类的。我们以配置"keepalive_timeout"为例,官方文档有很清楚的介绍,http://nginx.org/en/docs/http...

  英文难以阅读怎么办?还是尽量阅读官方文档吧,更新及时,准确性高。或者,Nginx作为目前使用最广泛的web服务器,网络上相关博客文档也是非常多的,搜索"Nginx 事件循环",立刻就能得到你想要的答案。不过需要注意的是,网络上找到的答案,无法保证正确性,最好自己验证下。

  3)从点,到线,再到面,再到点

  同样的以事件循环为例,你从官方文档或者博客得到切入点是函数ngx_process_events_and_timers(),只有这一点信息怎么办?全局搜索代码,查看该函数的调用链路,比如我通过understand可以很容易得出其调用链,如下图:

image

  从一个切入点,你就能得到一条执行链路;不断的去探索新的问题,逐步你就能掌握了整个系统。

  待你对整个系统有了一定了解,还需要再度回归到具体的点上。毕竟,第一阶段阅读源码时,由于整体的掌握度不够,跳过了很多实现细节。

  比如,事件结构体ngx_event_s包括数十个标识类字段,当初都没有深究具体含义;比如,当我配置了N多个location匹配规则时,Nginx是从头到尾一个个遍历匹配吗?效率是不是优点低呢?比如,Nginx的多进程模型,master进程是如何管理以及监控work进程的,Nginx的平滑升级又是怎么实现的?进程间通信以及信号处理你了解吗?再比如,Nginx通过锁来解决多个worker的惊群效应,那么锁的实现原理是什么呢?

模块化编程

  在学习Nginx源码之前,最好了解一下其模块块编程思想。一个模块实现一个小小的功能,所有模块组合成了强大的Nginx。比如下表几个功能模块:

模称功能
ngx_epoll_module基于epoll的事件处理模块
ngx_http_limit_req_module按请求qps限流
ngx_http_proxy_module按HTTP协议转发请求到上游
ngx_http_fastcgi_module按fastcgi协议转发请求到上游
ngx_http_upstream_ip_hash_moduleiphsh负载均衡
…………

  Nginx模块被划分为几大类,比如核心模块NGX_CORE_MODULE,事件模块NGX_EVENT_MODULE,HTTP模块NGX_HTTP_MODULE。结构体ngx_module_s定义了Nginx模块,我们重点需要关注这几个(类)字段:

  1. 钩子函数,比如init_master/exit_master在master进程启动/退出时回调;init_process/exit_process在work进程启动/退出时回调;init_module在模块初始化时候调用;
  2. 指令数组commands,其定义了该模块可以解析哪些配置;这个很好理解,功能由各个模块实现,与功能对应的配置也应该由各个模块解析处理;
  3. 模块上下文ctx,查看源码的话你会发现其类型为void*,那是因为不同类型的模块ctx定义不一样。ctx结构通常都定义了配置创建以及初始化回调;另外,事件模块还会定义事件处理(添加事件,修改事件,删除事件,事件循环)回调;HTTP模块还定义了HTTP处理流程的回调,这些回调会在HTTP流程 11个执行阶段调用。

  总结一句话,Nginx的框架已定义,主流程已知,各个模块只需要实现并注册流程中的回调即可,Nginx会在合适的时机执行这些回调。

  最后再来一副脑图简单列一下重点知识,还需要读者去进一步研究探索:

image

master/worker进程模型

  在讲解master/worker进程模型之前,我们先思考这么一个问题:假如配置worker_processes=1(work进程数目),执行下面几条命令后,Nginx还能否正常提供服务。

//master_pid即master进程id,work_pid即work进程pid

kill master_pid
kill -9 master_pid
kill work_pid
kill -9 work_pid

  注意,kill默认 发送SIGTERM(15)信号,用于通知进程需要被关闭,目标进程可以捕获该信号并做相应清理任务后退出;kill - 9表示强制杀死该进程,信号不能被捕获或忽略,同时接收该信号的进程在收到这个信号时不能执行任何清理。

  好了,请短暂的思考一分钟,或者自己动手验证下。下面我们揭晓答案:

------ 实验一:kill master_pid -----------
#ps aux | grep nginx
root      2314  0.0  0.0  25540  1472 ?        Ss   Aug18   0:00 nginx: master process /nginx/nginx-1.15.0/output/sbin/nginx-c /nginx/nginx-1.15.0/conf/nginx.conf
nobody   15243  0.0  0.0  31876  5880 ?        S    22:40   0:00 nginx: worker process

#kill 2314

#curl http://127.0.0.1/test -H "Host:proxypass.test.com"
curl: (7) Failed to connect to 127.0.0.1 port 80: Connection refused

#ps aux | grep nginx
//空,没有进程输出

------ 实验二:kill -9 master_pid -----------
# ps aux | grep nginx
root     15911  0.0  0.0  20544   680 ?        Ss   22:50   0:00 nginx: master process /nginx/nginx-1.15.0/output/sbin/nginx-c /nginx/nginx-1.15.0/conf/nginx.conf
nobody   15914  0.0  0.0  26880  5156 ?        S    22:50   0:00 nginx: worker process

kill -9 15911

# curl http://127.0.0.1/test -H "Host:proxypass.test.com"
hello world

#ps aux | grep nginx
nobody   15914  0.0  0.0  26880  5652 ?        S    22:50   0:00 nginx: worker process

------ 实验三:kill work_pid -----------
# ps aux | grep nginx
root     15995  0.0  0.0  20544   676 ?        Ss   22:52   0:00 nginx: master process /nginx/nginx-1.15.0/output/sbin/nginx-c /nginx/nginx-1.15.0/conf/nginx.conf
nobody   15997  0.0  0.0  26880  5152 ?        S    22:52   0:00 nginx: worker process

# kill 15997

# curl http://127.0.0.1/test -H "Host:proxypass.test.com"
hello world

# ps aux | grep nginx
root     15995  0.0  0.0  20544   860 ?        Ss   22:52   0:00 nginx: master process /nginx/nginx-1.15.0/output/sbin/nginx-c /nginx/nginx-1.15.0/conf/nginx.conf
nobody   16021  0.0  0.0  26880  5648 ?        S    22:52   0:00 nginx: worker process

------ 实验四:kill -9 work_pid -----------
# ps aux | grep nginx
root     15995  0.0  0.0  20544   860 ?        Ss   22:52   0:00 nginx: master process /nginx/nginx-1.15.0/output/sbin/nginx-c /nginx/nginx-1.15.0/conf/nginx.conf
nobody   16021  0.0  0.0  26880  5648 ?        S    22:52   0:00 nginx: worker process

# kill -9 16021

# curl http://127.0.0.1/test -H "Host:proxypass.test.com"
hello world

# ps aux | grep nginx
root     15995  0.0  0.0  20544   860 ?        Ss   22:52   0:00 nginx: master process /nginx/nginx-1.15.0/output/sbin/nginx-c /nginx/nginx-1.15.0/conf/nginx.conf
nobody   16076  0.0  0.0  26880  5648 ?        S    22:54   0:00 nginx: worker process
  • 实验一:kill master_pid,结果是Connection refused,只因master和work进程都退出了,没有进程监听80端口;kill的是master进程,为什么work进程也退出了呢?其实是master进程让work进程退出的。
  • 实验二:kill -9 master_pid,Nginx还能正常提供HTTP服务,此时master进程退出,work进程还在;到这里就能说明,HTTP请求由work进程处理,与master进程无关;另外,为什么这里master进程没有让work进程退出呢?这就是kill -9的特点了,master进程不能捕获该信号做清理工作;
  • 实验三:kill work_pid,发现Nginx还是能正常提供HTTP服务,而且work进程竟然还健在?再仔细看看,kill之后的work进程是16021,之前是15997;原来是Nginx启动了新的work进程,其实是master进程在监听到work进程退出后,会拉起新的work进程;
  • 实验四:kill -9 work_pid同实验三。

  通过上面的四个实验,我们可以初步得到以下结论:1)master进程在监听到work进程退出后,会拉起新的进程,而master进程在退出时(kill方式),会销毁所有work进程;其实就一句话,master进程主要用于管理work进程。2)work进程用于接收并处理HTTP请求。

  下面我们将简要介绍master/work进程处理流程。

  master进程被fork后,会执行主处理函数ngx_master_process_cycle函数,主要工作如下图所示:

image

  注意我们上面的描述『master进程被fork后』,被谁fork呢?其实这是标准的daemon守护进程启动方式:两次fork+setsid。

  可以看到,master进程在主循环中等待信号的到达,信号处理函数为ngx_signal_handler。另外需要清楚的是,子进程在退出时,会向父进程发送信号SIGCHLD;master进程就是通过该信号来监听work进程的异常退出,从而拉起新的work进程。

  最后还有一个问题,master进程在接收到退出信号时,如何告知work进程退出呢?这里可以使用信号吗?其实这里是通过socketpair向work进程发送消息的。至于为什么不用信号呢,就和work进程事件处理框架有关了。

  最后总结下,操作Nginx时,常用的信号如下列表:

信号Nginx内置shell说明
SIGUSR1nginx -s reopen重新打开文件,可配合mv实现日志切割
SIGHUPnginx -s reload重新加载配置
SIGQUITnginx -s quit平滑退出
SIGTERMnginx -s stop强制退出
SIGUSR2 + SIGWINCH + SIGQUIT平滑升级二进制程序

  work进程用于接收并处理HTTP请求,主函数为ngx_worker_process_cycle。需要注意,master进程创建socket并启动监听,work进程只是将listen_fd加入到自己的监听列表(epoll_ctl)。问题来了,如果多个work进程同时监听listen_fd的可读事件,新的连接请求到达时,Linux内核会唤醒哪个work进程并交由其处理呢?这就是所谓的『惊群』效应了。

  Nginx是通过『锁』实现的,work进程在监听listen_fd的可读事件之前,需要获取到锁;没有锁,work进程会将listen_fd从自己的监听列表中移除。读者可以阅读函数ngx_process_events_and_timers(事件循环主函数),
加锁函数为为ngx_trylock_accept_mutex,基于共享内存 + 原子操作实现。

  在加锁的时候,Nginx还有一些负载均衡策略;每个work进程启动的时候,都会初始化若干ngx_connection_t结构,连接数可通过worker_connections配置,如果当前work进程的空闲连接数小于总数的1/8,则会在近几次事件循环中不获取锁。

  新版本Linux内核已经解决了惊群效应,不需要加锁了;是否加锁也可以通过accept_mutex配置,官方解释如下:

There is no need to enable accept_mutex on systems that support the EPOLLEXCLUSIVE flag (1.11.3) or when using reuseport.

  好了,到这里你对master/worker进程模型也有了一定了解了,我们可以简单画个示意图:

image

  最后,脑图奉上:

image

Nginx事件驱动模型

  Nginx作为webserver,就是接收客户端请求,转发到上游,再转发上游响应结果给客户端,其中必然伴随着大量的IO交互,没有一个高效的IO复用模型如何能行?另外在等待客户端请求,等待上游响应时,通常还伴随着一些超时事件(时间事件)。

  我们所说的事件驱动,就是指IO事件以及时间事件驱动,没有事件时候服务通常会阻塞等待事件的发生。不止是Nginx,一般事件驱动程序都是如下类似的套路:

for {
    //查找最近的时间事件timer
    
    //epoll_wait等待IO事件的发生(最大等待时间timer)
    
    //处理IO事件
    
    //处理时间事件
}

  1)查找最近的时间事件:一般肯定存在N多个时间事件,那么这些时间事件最好是按照触发时间排好序的,不然每次都需要遍历。通常会选择红黑树或者最小堆实现。

  2)等待IO事件的发生:目前都是基于IO多路复用模型,比如Linux系统使用epoll实现;对于epoll,最好研究下其红黑树+双向链表+水平触发/边缘触发。epoll的使用还是非常简单的,就3个API:

//创建epoll对象
int epoll_create(int size);

//往epoll添加需要监听的fd(这时候就依赖红黑树了)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

//阻塞等待事件发生,最大等待timeout时间
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

  上一小节讲过,worker进程的主函数是ngx_worker_process_cycle,事件循环主函数是ngx_process_events_and_timers,从这两个函数中很容易找到其事件循环逻辑。

static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data){
    for ( ;; ) {
        //事件循环
        ngx_process_events_and_timers(cycle);
        
        //信号处理,如reopen,quit等
    }
}

void ngx_process_events_and_timers(ngx_cycle_t *cycle){
    //查找最近的时间事件
   timer = ngx_event_find_timer();
   
   //抢锁,成功后才监听listen_fd
   //(参照函数ngx_enable_accept_events,基于epoll_ctl将listen_fd加入epoll)
   ngx_trylock_accept_mutex(cycle)
   
   //阻塞等待IO事件的发生,底层基于epoll_wait实现
   (void) ngx_process_events(cycle, timer, flags);
   
   //accept处理连接事件
   ngx_event_process_posted(cycle, &ngx_posted_accept_events);
   
   //处理时间事件
   ngx_event_expire_timers();
   
   //处理普通读写事件
   ngx_event_process_posted(cycle, &ngx_posted_events);
}

  事件循环我们有一定了解了,还需要关注几个核心结构体,比如连接对象ngx_connection_t,事件对象ngx_event_t。worker进程在启动时刻,就会创建若干个连接对象以及事件对象,创建的数目可通过worker_connections配置。连接对象与事件对象示意图如下:

image

  最后,脑图奉上:

image

HTTP处理流程之11个阶段

  Nginx处理客户端连接请求事件的handler是ngx_event_accept,同时对于listen_fd,还设置有连接初始化函数ngx_http_init_connection,该函数开始了HTTP请求的解析工作:

void ngx_http_init_connection(ngx_connection_t *c)
{
    c->read = ngx_http_wait_request_handler;
    c->write->handler = ngx_http_empty_handler;
 
    ngx_add_timer(rev, c->listening->post_accept_timeout);
}

  解析HTTP请求的处理流程如下图所示:

image

  下面就开始处理HTTP请求了。Nginx将HTTP请求处理流程分为11个阶段,绝大多数HTTP模块都会将自己的handler添加到某个阶段(将handler添加到全局唯一的数组phases中),nginx处理HTTP请求时会挨个调用每个阶段的handler。需要注意的是其中有4个阶段不能添加自定义handler。11个阶段定义如下:

typedef enum {
    NGX_HTTP_POST_READ_PHASE = 0, 
  
    NGX_HTTP_SERVER_REWRITE_PHASE,  //server块中配置了rewrite指令,重写url
  
    NGX_HTTP_FIND_CONFIG_PHASE,   //查找匹配的location配置;不能自定义handler;
    NGX_HTTP_REWRITE_PHASE,       //location块中配置了rewrite指令,重写url
    NGX_HTTP_POST_REWRITE_PHASE,  //检查是否发生了url重写,如果有,重新回到FIND_CONFIG阶段;不能自定义handler;
  
    NGX_HTTP_PREACCESS_PHASE,     //访问控制,比如限流模块会注册handler到此阶段
  
    NGX_HTTP_ACCESS_PHASE,        //访问权限控制,比如基于ip黑白名单的权限控制,基于用户名密码的权限控制等
    NGX_HTTP_POST_ACCESS_PHASE,   //根据访问权限控制阶段做相应处理;不能自定义handler;
  
    NGX_HTTP_TRY_FILES_PHASE,     //只有配置了try_files指令,才会有此阶段;不能自定义handler;
    NGX_HTTP_CONTENT_PHASE,       //内容产生阶段,返回响应给客户端
  
    NGX_HTTP_LOG_PHASE            //日志记录
} ngx_http_phases;
  • NGX_HTTP_POST_READ_PHASE:第一个阶段,ngx_http_realip_module模块会注册handler到该阶段(nginx作为代理服务器时有用,后端以此获取客户端原始IP),而该模块默认不会开启,需要通过--with-http_realip_module启动;
  • NGX_HTTP_SERVER_REWRITE_PHASE:server块中配置了rewrite指令时,该阶段会重写url;
  • NGX_HTTP_FIND_CONFIG_PHASE:查找匹配的location配置;该阶段不能自定义handler;
  • NGX_HTTP_REWRITE_PHASE:location块中配置了rewrite指令时,该阶段会重写url;
  • NGX_HTTP_POST_REWRITE_PHASE:该阶段会检查是否发生了url重写,如果有,重新回到FIND_CONFIG阶段,否则直接进入下一个阶段;该阶段不能自定义handler;
  • NGX_HTTP_PREACCESS_PHASE:访问控制,比如限流模块ngx_http_limit_req_module会注册handler到该阶段;
  • NGX_HTTP_ACCESS_PHASE:访问权限控制,比如基于ip黑白名单的权限控制,基于用户名密码的权限控制等;
  • NGX_HTTP_POST_ACCESS_PHASE:该阶段会根据访问权限控制阶段做相应处理,不能自定义handler;
  • NGX_HTTP_TRY_FILES_PHASE:只有配置了try_files指令,才会有此阶段,不能自定义handler;
  • NGX_HTTP_CONTENT_PHASE:内容产生阶段,用于产生响应结果;ngx_http_fastcgi_module模块就处于该阶段;
  • NGX_HTTP_LOG_PHASE:该阶段会记录日志;

  HTTP模块通常都有回调postconfiguration,用于注册本模块的handler到某个处理阶段,Nginx在解析完成http配置块后,会遍历所有HTTP模块并注册handler到相应阶段,后续处理HTTP请求时只需遍历执行该所有handler即可。

  注意,这里的图是二维数组,后续还会将其转化微一维数组,便于遍历执行所有handler。

image

  在这11个阶段里,我们需要重点关注内容产生阶段NGX_HTTP_CONTENT_PHASE,这是HTTP请求处理的第10个阶段,用于产生响应结果;一般情况有3个模块注册handler到此阶段:ngx_http_static_module、ngx_http_autoindex_module和ngx_http_index_module。

  但是当我们配置了proxy_pass和fastcgi_pass时,执行流程会有所不同。此时会设置请求的回调content_handler,当Nginx执行到内容产生阶段时,如果content_handler不为空,则执行此回调,不再执行其他handler。

ngx_int_t ngx_http_core_content_phase(ngx_http_request_t *r,
    ngx_http_phase_handler_t *ph)
{
    if (r->content_handler) {  //如果请求对象的content_handler字段不为空,则调用
        r->write_event_handler = ngx_http_request_empty_handler;
        ngx_http_finalize_request(r, r->content_handler(r));
        return NGX_OK;
    }
 
    rc = ph->handler(r);  //否则执行内容产生阶段handler
}

  HTTP处理流程就简单介绍到这里,还有很多细节需要读者继续探索。最后,脑图奉上。

image

location匹配规则

  location用于匹配特定的请求URI,配置解析见文档http://nginx.org/en/docs/http...。基本语法为:

location [=|~|~*|^~] uri{……}
location @name { ... }

其中:

  • “=”用于定义精确匹配规则,请求URI与配置的uri模式完全匹配才能生效;
  • “ ~ ”和“ ~* ”分别定义区分大小写的正则匹配规则和不区分大小写的正则匹配规则,正则匹配成功时,立即结束location查找过程;
  • “^~”用于定义最大前缀匹配规则,该类型location即使匹配成功也不会结束location查找过程,依然会查找匹配长度更长的location。另外,只包含uri的location依然为最大前缀匹配。
  • “@”用于定义命令location,此类型location不能匹配常规客户端请求,只能用于内部请求重定向。

  以“ ^~ ”开始的匹配模式与只包含uri的匹配模式都表示最大前缀匹配规则,这两者有什么区别呢?以“ ^~ ”开始的location在匹配成功时,不会再执行后续的正则匹配,直接选择该location配置。只包含uri的location在匹配成功时,依然会执行后续的正则匹配,只有当正则匹配不成功时,才会选择该location;否则,会选择正则类型location。

  另外,通常使用“location /”用于定义通用匹配,任何未匹配到其他location的请求都会匹配到。如果某请求URI未匹配到任何location,Nginx会返回404。

  每个虚拟server都可以配置多个location块;客户端请求到达时,需要遍历所有location,检测请求URI是否与location配置相匹配。那么当location配置数目较多时,匹配效率如何保障?遍历方式显然是不可行的。解析完成location配置时,多个location的存储结构是双向链表,该结构需要进行再处理,优化为查找匹配效率更高的结构。

  思考下,正则匹配只能逐个遍历,没有更优的查找匹配算法或数据结构;因此,所有正则匹配的location不需要特殊处理,只是从双向链表中裁剪出来,另外存储在regex_locations数组。

  双向链表中最后只剩下精确匹配与最大前缀匹配,该类型location查找只能基于字符串匹配。Nginx将剩余的这些location存储为树形结构(三叉树),每个节点node都有三个子节点,left、tree和right。left小于node;right大于node;tree与node前缀相同,且tree节点的uri长度大于node节点的uri长度。三叉树的转换过程由函数ngx_http_init_static_location_trees实现,这里不做过多介绍。三叉树结构类似于下图所示:

image

  location匹配过程处于HTTP处理流程第3阶段NGX_HTTP_FIND_CONFIG_PHASE,location匹配逻辑由函数ngx_http_core_find_location实现,有兴趣的读者可以查看学习。

  最后,脑图奉上:

image

upstream与负载均衡

  Nginx反向代理是其最重要最常用的功能:Nginx转发客户端请求到上游服务,接手上游服务响应并转发给客户端。而upstream使得Nginx可以成为一台反向代理服务器,并且upstream还提供了负载均衡能力,可以将请求按照某种策略均匀的分发到上游服务。

  提到upstream,就不得不提一个很重要的结构体ngx_http_upstream_t,该结构主要包括Nginx与上游的连接对象,读写事件,缓冲区buffer,请求创建/请求处理等回调函数,等等,如下所示:

struct ngx_http_upstream_s {
    //读写事件处理handler
    ngx_http_upstream_handler_pt     read_event_handler;
    ngx_http_upstream_handler_pt     write_event_handler;

    //该结构封装了连接获取/释放handler,不同负载均衡策略对应不同handler
    ngx_peer_connection_t            peer;

   //各种缓冲区buffer

   //回调handler:创建请求,处理上游返回头等的回调。
   //proxypass与fastcgipass都实现了这些回调
    ngx_int_t                      (*create_request)(ngx_http_request_t *r);
    ngx_int_t                      (*reinit_request)(ngx_http_request_t *r);
    ngx_int_t                      (*process_header)(ngx_http_request_t *r);
    void                           (*abort_request)(ngx_http_request_t *r);
    void                           (*finalize_request)(ngx_http_request_t *r,
                                         ngx_int_t rc);
    ngx_int_t                      (*rewrite_redirect)(ngx_http_request_t *r,
                                         ngx_table_elt_t *h, size_t prefix);
    ngx_int_t                      (*rewrite_cookie)(ngx_http_request_t *r,
                                         ngx_table_elt_t *h);
};

  upstream流程主要包含的步骤以及处理函数如下表:

步骤处理函数
初始化upstreamngx_http_upstream_create()、 ngx_http_upstream_init()
与上游建立连接ngx_http_upstream_connect()
发送请求到上游ngx_http_upstream_send_request()
处理上游响应头ngx_http_upstream_process_header()
处理上游响应体(边接收边转发)ngx_http_upstream_send_response()
结束请求ngx_http_upstream_finalize_reques()
可能的重试ngx_http_upstream_next()

  针对upstream处理流程,我们需要思考以下几个问题:

  • proxypass与fastcgipass处理回调是在什么时候设置的?我们已proxypass为例,在配置proxy_pass指令后,内容产生阶段处理函数为ngx_http_proxy_handler;该处理函数开启了upstream的主流程,创建并初始化upstream,同时设置了请求处理回调。
  • 负载均衡对应的处理回调是在什么时候确定的?同样的,也是在解析配置文件的时候,根据不同的配置指令,初始化不同的负载均衡策略。

  默认的负载均衡策略为加权轮询,其初始化函数为ngx_http_upstream_init_round_robin;我们也可以通过配置指令设置指定的负载均衡策略,如下表:

指令初始化函数
默认ngx_http_upstream_init_round_robin
hash keyngx_http_upstream_init_hash
ip_hashngx_http_upstream_init_ip_hash
least_connngx_http_upstream_init_least_conn
  • Nginx在转发请求包体到上游服务时候,

是需要接收到完整的请求体之后转发,还是可以边接收边转发呢?可以通过指令proxy_request_buffering配置:

Syntax:    proxy_request_buffering on | off;
Default:    
proxy_request_buffering on;
Context:    http, server, location
This directive appeared in version 1.7.11.

When buffering is enabled, the entire request body is read from the client before sending the request to a proxied server.

When buffering is disabled, the request body is sent to the proxied server immediately as it is received. In this case, the request cannot be passed to the next server if nginx already started sending the request body.

  这里要特别注意,当proxy_request_buffering设置为off时,如果请求转发给上游出错(或者上游处理错误等),该请求将不能再转发给其他上游进行重试。因为Nginx没有缓存请求体。

  • Nginx处理完上游响应头部后,就可以开始将返回结果转发给客户端,并不需要等到完整接收上游响应体,只需要边接受响应体,边返回给客户端即可。详细的逻辑可以参考函数ngx_http_upstream_send_response()。
  • Nginx在某个上游服务器不可用的情况下,可以重新选择一个上游服务器,并建立连接,将请求转发给新的上游服务器。具体在这几个阶段出现错误时,都可以进行重试:1)连接upstream失败;2)发送请求到上游服务器失败;3)处理上游响应头失败。需要注意,如果Nginx已经开始处理上游响应体,此时出现错误,则会直接结束这次与上游服务器的交互,并结束这次请求,不会再选择新的上游服务器重试;这里是因为Nginx可能已经发送了部分数据到客户端。

  另外还需要关注这两个配置,用于配置在什么错误下重试,以及重试次数:

Syntax:    proxy_next_upstream_tries number;
Default:    
proxy_next_upstream_tries 0;
Context:    http, server, location
This directive appeared in version 1.7.5.

Limits the number of possible tries for passing a request to the next server. The 0 value turns off this limitation.


Syntax:    proxy_next_upstream error | timeout | invalid_header | http_500 | http_502 | http_503 | http_504 | http_403 | http_404 | http_429 | non_idempotent | off ...;
Default:    
proxy_next_upstream error timeout;
Context:    http, server, location

Specifies in which cases a request should be passed to the next server:
  • 线上环境,Nginx与上游服务器通常建立的是长连接,与长连接紧密相关的有三个配置:1)keepalive connections,限制最大空闲连接数(不是最大可建立的连接数),高并发情况下实际建立的连接数可能比这多;2)keepalive_requests number,Nginx版本需高于1.15.3,每个连接上最多可处理请求数;3)keepalive_timeout timeout,Nginx版本需高于1.15.3,空闲连接超时时间。

  长连接由模块ngx_http_upstream_keepalive_module实现,配置后会替换上面介绍的负载均衡初始化函数为ngx_http_upstream_init_keepalive_peer。

  • 将Nginx作为反向代理服务器时,还需要注意与上游服务器交互的一些超时配置;默认的超时时间是60秒,对于大部分业务来说都是过长的。次如:
Syntax:    proxy_connect_timeout time;
Default: proxy_connect_timeout 60s;

Syntax:    proxy_send_timeout time;
Default: proxy_send_timeout 60s;

Syntax:    proxy_read_timeout time;
Default: proxy_read_timeout 60s;

  曾经线上环境就看到过一起非常不合理的配置,超时时间都是60秒,重试次数proxy_next_upstream_tries不限制。业务高峰期出现突发慢请求,处理时间超过60秒,网关记录HTTP状态504;同时又选择下一台上游服务器重试,直到遍历完所有的上游服务器为止。最终这个请求的响应时间为900秒!

  • 最后不得不提的是Nginx的上游服务器剔除机制;当同一台上游服务器失败次数过多时,Nginx会短暂认为该上游服务器不可用,在一定时间内不会再将请求转发到该上游服务器。该策略由两个配置决定(详情:http://nginx.org/en/docs/http...):
Syntax:    server address [parameters];
Default:    —
Context:    upstream

max_fails=number
sets the number of unsuccessful attempts to communicate with the server that should happen in the duration set by the fail_timeout parameter to consider the server unavailable for a duration also set by the fail_timeout parameter. 
By default, the number of unsuccessful attempts is set to 1. 
The zero value disables the accounting of attempts. 
What is considered an unsuccessful attempt is defined by the proxy_next_upstream, fastcgi_next_upstream, uwsgi_next_upstream, scgi_next_upstream, memcached_next_upstream, and grpc_next_upstream directives.

fail_timeout=time
sets the time during which the specified number of unsuccessful attempts to communicate with the server should happen to consider the server unavailable;
and the period of time the server will be considered unavailable.
By default, the parameter is set to 10 seconds.

  需要特别注意,一旦Nginx认为所有上游服务器都不可用,在接收到客户端请求时,会直接返回502,不会尝试请求任何一台上游服务起。此时Nginx会记录日志"no live upstreams"。所以,线上环境,最好将非核心或者一些慢接口隔离到不同的upstream,以防这些接口的错误影响其他核心接口。

  upstream的主流程以及一些注意点上面也介绍了,对于想了解一些实现细节的,可以参照下面的脑图,去探索源码:

image

fastcgi_pass与FPM

  当我们配置了fastcgi_pass指令后,Nginx会将请求转发给上游FPM处理。fastcgi协议用于Nginx与FPM之间的交互,不同于HTTP协议(以"\r\n"作为分隔符解析),fastcgi协议在发送请求之前,先发送固定结构的头部信息,包含该请求数据的类型以及长度等等。

  fastcgi协议消息头定义如下:

typedef struct {
    u_char  version; //FastCGI协议版本
    u_char  type;    //消息类型
    u_char  request_id_hi; //请求ID
    u_char  request_id_lo;
    u_char  content_length_hi; //内容长度
    u_char  content_length_lo;
    u_char  padding_length;    //内容填充长度
    u_char  reserved;          //保留
} ngx_http_fastcgi_header_t;

  Nginx对fastcgi消息类型定义如下:

#define NGX_HTTP_FASTCGI_BEGIN_REQUEST  1
#define NGX_HTTP_FASTCGI_ABORT_REQUEST  2
#define NGX_HTTP_FASTCGI_END_REQUEST    3
#define NGX_HTTP_FASTCGI_PARAMS         4
#define NGX_HTTP_FASTCGI_STDIN          5
#define NGX_HTTP_FASTCGI_STDOUT         6
#define NGX_HTTP_FASTCGI_STDERR         7
#define NGX_HTTP_FASTCGI_DATA           8

  一般情况下,最先发送的是BEGIN_REQUEST类型的消息,然后是PARAMS和STDIN类型的消息;当FastCGI响应处理完后,将发送STDOUT和STDERR类型的消息,最后以END_REQUEST表示请求的结束。

  fastcgi协议的请求与响应结构示意图如下:

image

  配置fastcgi_pass指令后,会设置内容产生阶段处理函数为ngx_http_fastcgi_handler;函数ngx_http_fastcgi_create_request创建fastcgi请求;
函数ngx_http_fastcgi_process_record解析FPM的响应结果。

  我们可以GDB调试打印输出/输入数据,便于更好的理解fastcgi协议。添加断点如下:

Num     Type           Disp Enb Address            What
1       breakpoint     keep y   0x0000000000418f05 in ngx_process_events_and_timers at src/event/ngx_event.c:203 inf 3, 2, 1
    breakpoint already hit 17 times
2       breakpoint     keep y   0x000000000045b7fa in ngx_http_fastcgi_create_request at src/http/modules/ngx_http_fastcgi_module.c:735 inf 3, 2, 1
    breakpoint already hit 4 times
3       breakpoint     keep y   0x000000000045c2af in ngx_http_fastcgi_create_request at src/http/modules/ngx_http_fastcgi_module.c:1190 inf 3, 2, 1
    breakpoint already hit 4 times
4       breakpoint     keep y   0x000000000045a573 in ngx_http_fastcgi_process_record at src/http/modules/ngx_http_fastcgi_module.c:2145 inf 3, 2, 1
    breakpoint already hit 1 time

  执行到ngx_http_fastcgi_create_request函数结尾(断点3),打印r->upstream->request_bufs三个buf:

(gdb) p *r->upstream->request_bufs->buf->pos@1000
$18 =
\001\001\000\001\000\b\000\000                  //8字节头部,type=1(BEGIN_REQUEST)
\000\001\000\000\000\000\000\000                //8字节BEGIN_REQUEST数据包
\001\004\000\001\002\222\006\000                //8字节头部,type=4(PARAMS),数据内容长度=2*256+146=658(不是8字节整数倍,需要填充6个字节)
\017\025SCRIPT_FILENAME/home/xiaoju/test.php    //key-value,格式为:keylen+valuelen+key+value
\f\000QUERY_STRING\016\004REQUEST_METHODPOST
\f!CONTENT_TYPEapplication/x-www-form-urlencoded
\016\002CONTENT_LENGTH19
\v\tSCRIPT_NAME/test.php
\v\nREQUEST_URI//test.php
\f\tDOCUMENT_URI/test.php
\r\fDOCUMENT_ROOT/home/xiaoju
\017\bSERVER_PROTOCOLHTTP/1.1
\021\aGATEWAY_INTERFACECGI/1.1
\017\vSERVER_SOFTWAREnginx/1.6.2
\v\tREMOTE_ADDR127.0.0.1
\v\005REMOTE_PORT54276
\v\tSERVER_ADDR127.0.0.1
\v\002SERVER_PORT80
\v\tSERVER_NAMElocalhost
\017\003REDIRECT_STATUS200
\017dHTTP_USER_AGENTcurl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.27.1 zlib/1.2.3 libidn/1.18 libssh2/1.4.2
\t\tHTTP_HOSTlocalhost
\v\003HTTP_ACCEPT*/*
\023\002HTTP_CONTENT_LENGTH19
\021!HTTP_CONTENT_TYPEapplication/x-www-form-urlencoded
\000\000\000\000\000\000           //6字节内容填充
\001\004\000\001\000\000\000\000   //8字节头部,type=4(PARAMS),表示PARAMS请求结束
\001\005\000\001\000\023\005\000   //8字节头部,type=5(STDIN),请求体数据长度19个字节
 
 
(gdb) p *r->upstream->request_bufs->next->buf->pos@20
$19 = "name=hello&gender=1"   //HTTP请求体,长度19字节,需填充5个字节
 
 
(gdb) p *r->upstream->request_bufs->next->next->buf->pos@20
$20 =
\000\000\000\000\000            //5字节填充
\001\005\000\001\000\000\000    //8字节头部,type=5(STDIN),表示STDIN请求结束

  执行到方法ngx_http_fastcgi_process_record,打印读入请求数据:

p *f->pos@1000
$26 =
\001\006\000\001\000\377\001\000  //8字节头部,type=6(STDOUT),返回数据长度为255字节(需要填充1个字节)
Set-Cookie: PHPSESSID=3h9lmb2mvp6qlk1rg11id3akd3; path=/\r\n    //返回数据内容,以换行符分隔
Expires: Thu, 19 Nov 1981 08:52:00 GMT\r\n
Cache-Control: no-store, no-cache, must-revalidate\r\n
Pragma: no-cache\r\n
Content-type: text/html; charset=UTF-8\r\n
\r\n
{\"ret-name\":\"ret-hello\",\"ret-gender\":\"ret-1\"}
\000
\001\003\000\001\000\b\000\000   //8字节头部,type=3(END_REQUEST),表示fastcgi请求结束,数据长度为8
\000\000\000\000\000\000\000\000    //8字节END_REQUEST数据

proxy_pass

  当我们配置了proxy_pass指令后,Nginx会将请求转发给上游HTTP服务处理;此时设置内容产生阶段处理函数为ngx_http_proxy_handler。

  这里我们简单介绍下长连接的配置以及注意事项。下面配置使得Nginx与上游HTTP服务保持长连接:

upstream  proxypass.test.com{
        server 127.0.0.1:8080;
        keepalive 10;
}
server {
      listen 80;
      server_name proxypass.test.com ;

      access_log /home/nginx/logs/proxypass.test.com_access.log main;
      location /  {
          proxy_pass http://proxypass.test.com;
          proxy_http_version 1.1;
          proxy_set_header Connection "keep-alive";
      }
}

  注意upstream配置块里的配置指令keepalive,官方文档如下:

Syntax:    keepalive connections;
This directive appeared in version 1.1.4.

The connections parameter sets the maximum number of idle keepalive connections to upstream servers that are preserved in the cache of each worker process.

It should be particularly noted that the keepalive directive does not limit the total number of connections to upstream servers that an nginx worker process can open. The connections parameter should be set to a number small enough to let upstream servers process new incoming connections as well.

  每个work都有长连接缓存池,而keepalive配置的就是缓存池忠最大空闲连接的数目,注意是空闲连接,并不是限制Nginx与上游HTTP建立的连接总数目。

  proxy_http_version配置使用1.1版本HTTP协议;proxy_set_header配置HTTP头部Connection为keep-alive(Nginx默认Connection:close,即使使用的是1.1版本HTTP协议)。

  在使用长连接时,一定要特别注意;如果上游主动关闭连接,而此时恰好Nginx发起请求,可能会出现502(线上曾经出现过偶发502现象),而且出现502的概率较低,现场难以捕获。

  长连接上游为什么会主动关闭连接呢?比如,在Golang服务中,通常会配置IdleTimeout,当长连接长时间空闲时后,Golang会主动关闭该长连接。如下面的抓包实例,前两次请求公用同一个TCP连接,但是当长时间没有新的请求到达时,Golang会主动关闭该长连接。如果此时恰好Nginx发起新的请求,就有可能造成异常情况。

//建立连接
10:24:16.240952 IP 127.0.0.1.31451 > 127.0.0.1.8080: Flags [S], seq 388101088, win 43690, length 0
10:24:16.240973 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [S.], seq 347995396, ack 388101089, win 43690, length 0
10:24:16.240994 IP 127.0.0.1.31451 > 127.0.0.1.8080: Flags [.], ack 1, win 86, length 0

//第一次请求
10:24:16.241052 IP 127.0.0.1.31451 >127.0.0.1.8080: Flags [P.], seq 1:111, ack 1, win 86, length 110: HTTP: GET /test HTTP/1.1
10:24:16.241061 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [.], ack 111, win 86, length 0
10:24:17.242332 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [P.], seq 1:129, ack 111, win 86, length 128: HTTP: HTTP/1.1 200 OK
10:24:17.242371 IP 127.0.0.1.31451 > 127.0.0.1.8080: Flags [.], ack 129, win 88, length 0

//隔几秒第二次请求,没有新建连接
10:24:24.536885 IP 127.0.0.1.31451 > 127.0.0.1.8080: Flags [P.], seq 111:221, ack 129, win 88, length 110: HTTP: GET /test HTTP/1.1
10:24:24.536914 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [.], ack 221, win 86, length 0
10:24:25.537928 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [P.], seq 129:257, ack 221, win 86, length 128: HTTP: HTTP/1.1 200 OK
10:24:25.537957 IP 127.0.0.1.31451 > 127.0.0.1.8080: Flags [.], ack 257, win 90, length 0

//上游主动断开长连接
10:25:25.538408 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [F.], seq 257, ack 221, win 86, length 0
10:25:25.538760 IP 127.0.0.1.31451 > 127.0.0.1.8080: Flags [F.], seq 221, ack 258, win 90, length 0
10:25:25.538792 IP 127.0.0.1.8080 > 127.0.0.1.31451: Flags [.], ack 222, win 86, length 0

  Nginx在与上游建立长连接时,也有一个配置,用于设置长连接超时时间:

Syntax:    keepalive_timeout timeout;
Default:    
keepalive_timeout 60s;
Context:    upstream
This directive appeared in version 1.15.3.

Sets a timeout during which an idle keepalive connection to an upstream server will stay open.

  需要注意,这是配置在upstream配置块的,而且必须Nginx版本高于1.15.3。

  http/server/location配置块还有一个配置keepalive_timeout,其配置的是与客户端的长连接超时时间:

Syntax:    keepalive_timeout timeout [header_timeout];
Default:    
keepalive_timeout 75s;
Context:    http, server, location

The first parameter sets a timeout during which a keep-alive client connection will stay open on the server side. The zero value disables keep-alive client connections. 

限流

  限流的目的是通过对并发访问/请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页)、排队等待(秒杀)、或者降级(返回兜底数据或默认数据)。通常限流策略有:限制瞬时并发数(如Nginx的ngx_http_limit_conn_module模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Nginx的ngx_http_limit_req_module模块,用来限制每秒的平均速率)。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

  常用的限流算法有计数器(简单粗暴,升级版是滑动窗口算法)漏桶算法,以及令牌桶算法。下面简单介绍令牌桶算法。如下图所示,令牌桶是一个存放固定容量令牌的桶,按照固定速率r往桶里添加令牌;桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃;当一个请求达到时,会尝试从桶中获取令牌;如果有,则继续处理请求;如果没有则排队等待或者直接丢弃。如下图:

image

  限流模块会注册handler到HTTP处理流程的NGX_HTTP_PREACCESS_PHASE阶段。如ngx_http_limit_req_module模块注册ngx_http_limit_req_handler;函数ngx_http_limit_req_handler执行限流算法,判断是否超出配置的限流速率,从而进行丢弃或者排队或者通过。

  ngx_http_limit_req_module模块提供以下配置指令,供用户配置限流策略:

//每个配置指令主要包含两个字段:名称,解析配置的处理方法
static ngx_command_t  ngx_http_limit_req_commands[] = {
 
    //一般用法:limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
    //$binary_remote_addr表示远程客户端IP;
    //zone配置一个存储空间(需要分配空间记录每个客户端的访问速率,超时空间限制使用lru算法淘汰;注意此空间是在共享内存分配的,所有worker进程都能访问)
    //rate表示限制速率,此例为1qps
    { ngx_string("limit_req_zone"),
      ngx_http_limit_req_zone,
     },
 
    //用法:limit_req zone=one burst=5 nodelay;
    //zone指定使用哪一个共享空间
    //超出此速率的请求是直接丢弃吗?burst配置用于处理突发流量,表示最大排队请求数目,当客户端请求速率超过限流速率时,请求会排队等待;而超出burst的才会被直接拒绝;
    //nodelay必须与burst一起使用;此时排队等待的请求会被优先处理;否则假如这些请求依然按照限流速度处理,可能等到服务器处理完成后,客户端早已超时
    { ngx_string("limit_req"),
      ngx_http_limit_req,
     },
 
    //当请求被限流时,日志记录级别;用法:limit_req_log_level info | notice | warn | error;
    { ngx_string("limit_req_log_level"),
      ngx_conf_set_enum_slot,
     },
 
    //当请求被限流时,给客户端返回的状态码;用法:limit_req_status 503
    { ngx_string("limit_req_status"),
      ngx_conf_set_num_slot,
    },
};

  Nginx限流算法依赖两个数据结构:红黑树和LRU队列。红黑树提供高效率的增删改查,LRU队列用于实现数据淘汰。

  我们假设限制每个客户端IP($binary_remote_addr)请求速率。当用户第一次请求时,会新增一条记录,并以客户端IP地址的hash值作为key存储在红黑树中,同时存储在LRU队列中;当用户再次请求时,会从红黑树中查找这条记录并更新,同时移动记录到LRU队列首部。存储空间(limit_req_zone配置)不够时,会淘汰记录,每次都是从尾部删除。

  下面我们通过两个实验进一步理解Nginx限流策略的配置以及限流现象。

  • 实验一:

  配置限速1qps,允许请求被排队处理,配置burst=5,即最多允许5个请求排队等待处理。

http{
    limit_req_zone $binary_remote_addr zone=test:10m rate=1r/s;
 
    server {
        listen       80;
        server_name  localhost;
        location / {
            limit_req zone=test burst=5;
            root   html;
            index  index.html index.htm;
        }
}

  使用ab并发发起10个请求,ab -n 10 -c 10 http://xxxxx

  查看服务端access日志;根据日志显示第一个请求被处理,2到5四个请求拒绝,6到10五个请求被处理;为什么会是这样的结果呢?

xx.xx.xx.xxx - - [22/Sep/2018:23:41:48 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:48 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:48 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:48 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:48 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:49 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:50 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:51 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:52 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [22/Sep/2018:23:41:53 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"

  查看ngx_http_log_module,注册handler到NGX_HTTP_LOG_PHASE阶段(HTTP请求处理最后一个阶段);因此实际情况应该是这样的:10个请求同时到达,第一个请求到达直接被处理,第2到6个请求到达,排队延迟处理(每秒处理一个);第7到10个请求被直接拒绝,因此先打印access日志;

  ab统计的响应时间见下面,最小响应时间87ms,最大响应时间5128ms,平均响应时间为1609ms:

              min  mean[+/-sd] median   max
Connect:       41   44   1.7     44      46
Processing:    46 1566 1916.6   1093    5084
Waiting:       46 1565 1916.7   1092    5084
Total:         87 1609 1916.2   1135    5128
  • 试验二

  实验一配置burst后,虽然突发请求会被排队处理,但是响应时间过长,客户端可能早已超时;因此添加配置nodelay,使得Nginx紧急处理等待请求,以减小响应时间:

http{
    limit_req_zone $binary_remote_addr zone=test:10m rate=1r/s;
 
    server {
        listen       80;
        server_name  localhost;
        location / {
            limit_req zone=test burst=5 nodelay;
            root   html;
            index  index.html index.htm;
        }
}

  同样ab发起请求,查看服务端access日志;第一个请求直接处理,第2到6个五个请求排队处理(配置nodelay,Nginx紧急处理),第7到10四个请求被拒绝:

xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 200 612 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"
xx.xx.xx.xxx - - [23/Sep/2018:00:04:47 +0800] "GET / HTTP/1.0" 503 537 "-" "ApacheBench/2.3"

  ab统计的响应时间见下面,最小响应时间85ms,最大响应时间92ms,平均响应时间为88ms:

              min  mean[+/-sd] median   max
Connect:       42   43   0.5     43      43
Processing:    43   46   2.4     47      49
Waiting:       42   45   2.5     46      49
Total:         85   88   2.8     90      92

  ngx_http_limit_conn_module限流模块比较简单,这里不再介绍。

  最后,脑图奉上:

image

案例分析:502问题介绍

  生产环境通常存在各种各样的异常HTTP状态码,比如499,504,502,500,400,408等,每个状态码的含义还是需要清楚。这里我们简单介绍下最常见的502问题排查。

  502的含义是NGX_HTTP_BAD_GATEWAY,即网关错误。通常的原因是上游没有监听,或者上游主动断开连接。在出现502时候,通常Nginx都会记录错误日志;比如:

2020/11/02 21:35:24 [error] 20921#0: *40 upstream prematurely closed connection while reading response header from upstream, client: 127.0.0.1, server: proxypass.test.com, request: "GET /test HTTP/1.1", upstream: "http://127.0.0.1:8080/test", host: "proxypass.test.com"

  通过日志瞬间就明白了,在Nginx等待上游返回结果时候,上游主动关闭连接了。上游为什么会主动关闭连接呢?这原因就复杂了,比如上游是golang服务时,假如配置了WriteTimeout=3秒,当请求处理时间超过3秒时,Golang服务会主动关闭连接。如下面抓包实例:

//三次握手建立连接
21:57:13.586604 IP 127.0.0.1.31519 > 127.0.0.1.8080: Flags [S], seq 574987506, win 43690, length 0
21:57:13.586627 IP 127.0.0.1.8080 > 127.0.0.1.31519: Flags [S.], seq 3599212930, ack 574987507, win 43690, length 0
21:57:13.586649 IP 127.0.0.1.31519 > 127.0.0.1.8080: Flags [.], ack 1, win 86, length 0

//发送请求
21:57:13.586735 IP 127.0.0.1.31519 > 127.0.0.1.8080: Flags [P.], seq 1:111, ack 1, win 86, length 110: HTTP: GET /test HTTP/1.1
21:57:13.586743 IP 127.0.0.1.8080 > 127.0.0.1.31519: Flags [.], ack 111, win 86, length 0

//请求处理5秒超时;没有响应,上游直接断开连接
21:57:18.587918 IP 127.0.0.1.8080 > 127.0.0.1.31519: Flags [F.], seq 1, ack 111, win 86, length 0
21:57:18.588169 IP 127.0.0.1.31519 > 127.0.0.1.8080: Flags [F.], seq 111, ack 2, win 86, length 0
21:57:18.588184 IP 127.0.0.1.8080 > 127.0.0.1.31519: Flags [.], ack 112, win 86, length 0

  再比如Nginx错误日志:

connect() to unix:/tmp/php-fcgi.sock failed (11: Resource temporarily unavailable) while connecting to upstream

  表明Nginx在通过域套接字连接上游FPM进程时,返回EAGAIN(11);查看Nginx源码中的注释:

Linux returns EAGAIN instead of ECONNREFUSED for unix sockets if listen queue is full

  Linux域套接字在队列溢出时,接收到连接请求会返回EAGAIN,而此时Nginx直接结束该请求,并返回502。

总结

  想一篇文章完全介绍清楚Nginx源码是不可能的,因此本文针对Nginx部分知识点,做了一个概括介绍,主要告诉读者一些基本的设计思想,以及可以去源码哪里寻找你想要的答案。当然,限于篇幅以及能力问题,还有很多知识点或者细节是没有介绍到的。结合这些脑图,剩下的就需要读者自己去探索研究了。

查看原文

赞 12 收藏 6 评论 0

书旅 发布了文章 · 2020-11-17

你以为只是简单的排序?(二)

上一篇文章中分享了冒泡排序、插入排序、选择排序这三种排序算法,它们的时间复杂度都是O(n^2),比较高,适合小规模数据的排序。这篇文章,分享两种时间复杂度为O(nlogn)的排序算法,归并排序快速排序。这两种排序算法适合大规模的数据排序,更加的常用一些

归并排序

归并排序思想

归并排序核心思想:将待排序的数据分成前后两个部分,然后对前后两个部分的数据分别进行排序,再将前后两部分合并,得到的结果就是排好序的数据了

语言很抽象,看图

归并排序使用的就是分治思想。分治,顾名思义,就是分而治之,将一个大问题分解成小的子问题来解决。小的子问题解决了,大问题也就解决了

分治思想跟之前的一篇文章中的递归很像,但是,分治是一种解决问题的处理思想,递归是一种编程技巧。归并排序用的是分治思想,可以用递归来实现

归并排序实现

如果你看过上一篇递归的文章,上边有介绍到,解递归问题,要分析出递归公式,然后找到递归终止条件,有了这两个,基本上递归代码就出来了。有了上边的那个图,基本上可以得到下边的递推公式

递归公式
MergeSort(start, end) = merge(MergeSort(start,(start+end)/2), MergeSort((start+end)/2 + 1, end))

递归终止条件
start >= end

MergeSort(start, end)表示的是给下标start到end之间的数组中的数据进行排序,将这个这个排序分成了两个子问题,一个是MergeSort(start,(start+end)/2),另一个是MergeSort((start+end)/2 + 1), end),当把这两个子问题排好序之后,再将它们合并,就把下标start到end之间的数据排好序了

最外层的那个merge就是对两个已经有序的数组进行合并,具体过程如下:
用两个游标i和j,分别指向两个待merge数组(arr)的第一个元素(这两个数组各自是有序的),比较游标对应位置元素的大小。如果arr[i]<arr[j],则将arr[i]放入到一个新的数组中tmp,然后i向后移动一位,否则,将a[j]放入tmp中,j向后一位。重复以上过程,直到有一个子数组空了,然后把另一个子数组中的数据追加到tmp的尾部,以上过程,即可将两个分别有序的数组合并成一个有序数组。具体过程如图:

代码实现

func MergeSort(arr []int) []int {
    if len(arr) <= 0 {
        fmt.Println("参数不合法")
        return nil
    }

    //递归终止条件,当拆分后的数组长度少于两个元素的时候
    if len(arr) < 2 {
        return arr
    }

    midIndex := len(arr) / 2
    leftArr := MergeSort(arr[0:midIndex])
    rightArr := MergeSort(arr[midIndex:])
    result := merge(leftArr, rightArr)

    return result
}

func merge(leftArr, rightArr []int) []int {
    var mergeRes []int
    leftIndex, rightIndex := 0, 0
    leftLength, rightLength := len(leftArr), len(rightArr)

    for leftIndex < leftLength && rightIndex < rightLength {
        if leftArr[leftIndex] > rightArr[rightIndex] {
            mergeRes = append(mergeRes, rightArr[rightIndex])
            rightIndex++
        } else {
            mergeRes = append(mergeRes, leftArr[leftIndex])
            leftIndex++
        }
    }

    if leftIndex == leftLength{
        for rightIndex < rightLength {
            mergeRes = append(mergeRes, rightArr[rightIndex])
            rightIndex++
        }
    }
    if rightIndex == rightLength {
        for leftIndex < leftLength {
            mergeRes = append(mergeRes, leftArr[leftIndex])
            leftIndex++
        }
    }

    return mergeRes
}

归并排序性能分析

首先归并排序是稳定排序算法,这个主要取决于merge操作,也就是将两个有序数组合并成一个有序数组。在合并的过程中,当两个数组中有相同的元素时,先把前边那部分数组中元素放入到tmp中,这样就可以保证相同的元素,在合并前后顺序不变

归并排序的时间复杂度是O(nlogn)。假设对n个元素进行归并排序需要的时间是T(n),那分解成两个子数组排序的时间都是 T(n/2)。merge函数合并两个有序子数组的时间复杂度是O(n)。所以,套用前面的公式,归并排序的时间复杂度的计算公式就是

T(1) = C;   n=1 时,只需要常量级的执行时间,所以表示为 C。
T(n) = 2*T(n/2) + n; n>1

根据上边的公式,经过一定的数学推导可以得到T(n)=Cn+nlog2n(log以2为底的n)。所以得到归并排序的时间复杂度就是O(nlogn)

从归并排序的原理图中可以看出来,归并排序的执行效率与要排序的原始数组的有序程度无关,所以其时间复杂度是非常稳定的,不管是最好情况、最坏情况,还是平均情况,时间复杂度都是O(nlogn)

归并排序和下边的快速排序相比,虽然时间复杂度都是O(nlogn),归并排序应用却不是那么广泛,因为它不是原地排序。归并排序中合并过程需要借助额外的存储空间

递归代码的空间复杂度并不能像时间复杂度那样累加,尽管每次合并操作都需要申请额外的内存空间,但在合并完成之后,临时开辟的内存空间就被释放掉了。在任意时刻,CPU 只会有一个函数在执行,也就只会有一个临时的内存空间在使用。临时内存空间最大也不会超过n个数据的大小,所以空间复杂度是 O(n)

快速排序

快速排序思想

快速排序的实现也是分治的思想,有点像归并排序,但是他们的实现思路还是完全不一样的

快速排序核心思想:如果要排序数组中下标从start到end之间的一组数据,选择start到end之间的任意一个数据作为分区点(pivot)

然后遍历start到end之间的数据,将小于分区点(pivot)的数据放左边,大于分区点的数据放右边,将分区点位置的数据放中间。经过上边的操作之后,分区点之前的数据都小于分区点这个位置的数据,分区点右边的数据都大于分区点这个位置的数据

可以先不用纠结为什么经过一次分区之后变成这个样子了。往下看

根据递归的思想,如果我们用递归排序下标从start到pivot-1之间的数据和下标从pivot+1到end之间的数据,直到区间缩小为 1,就说明所有的数据都有序了

所以,按照上边的思想,可以得到下边这个递归公式

公式:
QuickSort(start...end) = QuickSort(start...pivot-1) + QuickSort(pivot+1...end)

终止条件:
start >= end

根据递归的公式,写出来的代码是这个样子

func QuickSort(arr []int, start int, end int) {
    if start >= end {
        return
    }

    pivot := partition(arr, start, end)
    QuickSort(arr, start, pivot-1)
    QuickSort(arr, pivot+1, end)
}

还记得上边的归并排序不,在归并排序里边有一个merge()方法,是将两个有序的数组合并成一个有序的数组。而这里用到了一个partition()函数,就是上边说到的,随机选择一个元素作为分区点(pivot)(一般情况下,可以选择start到end区间的最后一个元素),然后对A[start…end]分区,函数返回分区点(pivot)的下标

知道这个partition()函数是做什么的,下边就是各显神通的去实现它了。如果不考虑空间复杂度的话,就有一个非常简单的方法。申请两个临时数组A和B,然后遍历待分区的数组arr,将小于pivot分区点对应下标值的放到A数组,大于的放到B数组,然后将A数组和B数组中的数据分别顺序的放入到arr中即可。看图:

如果按照这种思路实现的话,partition()函数就需要很多额外的内存空间,所以快排就不是原地排序算法了。如果希望快排是原地排序算法,那它的空间复杂度得是O(1),那partition()分区函数就不能占用太多额外的内存空间,我们就需要在arr的原地完成分区操作

下边的这个就非常巧妙的实现了空间复杂度为O(1)的情况下,完成了分区的操作(反正我是不知道别人是怎么想出来的,没别的法子,理解它,文字描述可能比较难理解,不慌,看图)

func partition(arr []int, start int, end int) int {
    pivotValue := arr[end]
    i := start
    for j:=start;j < end;j++ {
        if arr[j] < pivot {
            arr[i], arr[j] = arr[j],arr[i]
            i++
        }
    }
    arr[i], arr[end] = arr[end], arr[i]

    return i
}

上边是通过游标i,将arr[start...end]分成两个部分,arr[start...i-1]的元素都是小于pivot的,可以叫它“已处理区间”,arr[i...end]是“未处理区间”。每次都从未处理的区间arr[i...end]中取一个元素arr[j],与pivotValue对比,如果小于pivotValue,则将其加入到已处理区间的尾部,也就是arr[i]的位置。文字描述不好理解,看图

快速排序实现

原理上边都介绍了,下边是完整的代码实现

package quickSort

func QuickSort(arr []int, start int, end int) {
    if start >= end {
        return
    }

    pivot := partition(arr, start, end)
    QuickSort(arr, start, pivot-1)
    QuickSort(arr, pivot+1, end)
}


func partition(arr []int, start int, end int) int {
    pivotValue := arr[end]
    i := start
    for j:=start;j < end;j++ {
        if arr[j] < pivotValue {
            arr[i], arr[j] = arr[j],arr[i]
            i++
        }
    }
    arr[i], arr[end] = arr[end], arr[i]

    return i
}

快速排序性能分析

上边使用了一种巧妙的方法,在空间复杂度为O(1)的情况下,实现了快速排序,所以快排是一个稳定的排序算法

因为分区的过程涉及交换操作,如果数组中有两个相同的元素,比如序列 6,8,7,6,3,5,9,4,在经过第一次分区操作之后,两个6的相对先后顺序就会改变(跟着上边的分区算法图,很容易可以推出来)。所以,快速排序并不是一个稳定的排序算法

快排也是用递归来实现的。对于递归代码的时间复杂度,归并排序中分析出来的公式,这里也还是适用的。如果每次分区操作,都能正好把数组分成大小接近相等的两个小区间,那快排的时间复杂度递推求解公式跟归并是相同的。所以,快排的时间复杂度也是O(nlogn)

T(1) = C;   n=1 时,只需要常量级的执行时间,所以表示为 C。
T(n) = 2*T(n/2) + n; n>1

但是,公式成立的前提是每次分区操作,我们选择的pivot都很合适,正好能将大区间对等地一分为二。但实际上这种情况是很难实现的

如果数组中的数据原来已经是有序的了,比如 1,3,5,6,8。如果每次选择最后一个元素作为pivot,那每次分区得到的两个区间都是不均等的。我们需要进行大约n次分区操作,才能完成快排的整个过程。每次分区我们平均要扫描大约n/2个元素,这种情况下,快排的时间复杂度就从O(nlogn)退化成了O(n^2)

快排在大部分情况下的时间复杂度都可以做到O(nlogn),只有在极端情况下,才会退化到O(n^2)

归并排序和快速排序对比

两种排序思想的区别

快排和归并用的都是分治思想,递推公式和递归代码也非常相似,那它们的区别在哪里呢?

可以发现,归并排序的处理过程是由下到上的,先处理子问题,然后再合并。而快排正好相反,它的处理过程是由上到下的,先分区,然后再处理子问题

性能上的差异

归并排序虽然是稳定的、时间复杂度为O(nlogn)的排序算法,但是它是非原地排序算法。归并之所以是非原地排序算法,主要原因是合并函数无法在原地执行。快速排序通过设计巧妙的原地分区函数,可以实现原地排序,解决了归并排序占用太多内存的问题

查看原文

赞 0 收藏 0 评论 0

书旅 发布了文章 · 2020-11-14

你以为只是简单的排序?(一)

一直在犹豫要不要写排序的文章,因为真的烂大街了。可是一旦细看,还真是很多值的思考的地方,所以还是选择记录一下

以下完整代码,均可从这里获取

https://github.com/Rain-Life/data-struct-by-go/tree/master/sort

排序算法效率分析

了解如何分析一个排序算法,可以帮助我们在实际工作场景中选择合适的排序算法,比如,如果排序的数据比较少,可以选择冒泡或插入排序,如果排序的数据量较大,选择归并或快速排序,虽然它们两两的时间复杂度是相同的,但是还是有很大的区别的,下边会对它们做对比

排序算法执行效率

一般分析一个排序算法的复杂度,我们都是去分析它的时间复杂度,时间复杂度反应的是数据规模n很大的时候的一个增长趋势。所以,通常在分析时间复杂度的时候会忽略到系数、常数、低阶。但是,在实际开发场景中,可能我们排序的数据并不多,因此,在对排序算法进行分析的时候,还是需要将系数、常数、低阶也考虑进来

分析一个排序算法的时间复杂度的时候,通常会分析它的最好情况、最坏情况以及平均情况下的时间复杂度。因为对于要排序的数据,它的有序度,对排序算法的执行时间是有影响的,所以,要想选择最合适的排序算法,这些情况的时间复杂度都应该考虑到(其实不光是排序,在实现任何一个算法的时候,当有多种方式可供选择的时候,都应该分析多重情况下的时间复杂度)

下边要分享的三个排序算法都是基于比较的排序算法,基于比较的排序算法的在执行过程中,一般涉及两种操作,一个是比较大小,一个是数据交换。因此,在对比这几种排序算法的时候,比较次数和移动次数也应该考虑进去。这也是为什么基于比较排序的算法,我们通常不会选择冒泡排序,而选择插入排序

排序算法内存消耗

算法的内存消耗可以通过空间复杂度来衡量。针对排序算法的空间复杂度,有一个新的概念是原地排序。原地排序算法,就是特指空间复杂度是O(1)的排序算法。下边要分享的三种排序算法,都是原地排序算法

排序算法稳定性

只靠执行效率和内存消耗来衡量排序算法的好坏是不够的。针对排序算法,还有一个重要的度量指标,稳定性。意思是,如果待排序的序列中存在值相等的元素,经过排序之后,相等元素之间原有的先后顺序不变

如:1、8、6、5、5、7、2、3,按照大小排序之后是:1、2、3、5、5、6、7、8

这组数据里有两个5。经过某种排序算法排序之后,如果两个5的前后顺序没有改变,那我们就把这种排序算法叫作稳定的排序算法;如果前后顺序发生变化,那对应的排序算法就叫作不稳定的排序算法

冒泡排序

冒泡排序优化

冒泡排序的实现思想,相信大家都非常的熟悉了

每次冒泡操作都会对相邻的两个元素进行比较,看是否满足大小关系要求。如果不满足就让它俩互换。一次冒泡会让至少一个元素移动到它应该在的位置,重复n次,就完成了n个数据的排序工作

实际上,冒泡过程还可以优化。当某次冒泡操作已经没有数据交换时,说明已经达到完全有序,不用再继续执行后续的冒泡操作。如图

下边是优化后的代码:

func BubbleSort(arr []int) {
    flag := false
    n := len(arr)
    for i := 0; i < n; i++ {
        flag = false//如果某一次冒泡,没有出现数据交换,说明已经有序,不用再继续冒泡了
        for j := 0; j < n-i-1; j++ {
            if arr[j] > arr[j+1] {
                tmp := arr[j]
                arr[j] = arr[j+1]
                arr[j+1] = tmp
                flag = true
            }
        }
        if !flag {
            break
        }
    }

    for _, v := range arr {
        fmt.Printf("%v\t", v)
    }
}

冒泡排序算法分析

首先冒泡排序是一个原地排序算法,因为冒泡排序只涉及相邻数据的交换,需要常量级的临时空间,所以空间复杂度是O(1)

在冒泡排序中,只有交换才可以改变两个元素的前后顺序。为了保证冒泡排序算法的稳定性,当有相邻的两个元素大小相等的时候,我们不做交换,相同大小的数据在排序前后不会改变顺序,所以冒泡排序是稳定的排序算法

在最好的情况下,也就是待排数据是完全有序的,那只需要进行一次冒泡操作即可,所以最好情况下的时间复杂度是O(n)

最坏情况下是待排数据是完全无序的,这个时候就需要n次冒泡,所以最坏情况下的时间复杂度是O(n^2)

平均情况下的时间复杂度的分析涉及比较复杂的推导,不是这里的重点(我也不会,手动狗头。如果你想了解,可以看这里),冒泡排序算法的平均时间复杂度是O(n^2)

插入排序

插入排序思想

插入排序是如何来的?假设现在有一个有序的数组,让你往里边插入一个数据之后,保持数组是有序的。我们都会想到通过遍历来找到待插入数据的位置,然后进行数据的移动。通过这种方式就可以保证这个数组有序。借鉴上边这种插入方法,于是就有了插入排序

插入排序的思想是:将待排序的数组分成两个区间,有序区和无序区。刚开始的时候,有序区只有第一个元素。插入排序的过程就是每次从无序区中取出一个元素,放入到有序区中对应的位置,保证插入到有序区中之后,有序区依然是有序的。不断的重复这个过程,直到无序区为空

文字描述比较抽象,见下图(从小到大排序)

插入排序也是包含两种操作,一种是比较,一种是移动。下边是代码实现

func InsertSort(arr []int)  {
    if len(arr) <= 0 {
        fmt.Println("待排数据不合法")
    }
    n := len(arr)

    for i := 1; i < n; i++ {//i是待排区的元素
        value := arr[i]
        j := i-1
        for ; j >= 0; j-- { //j遍历的是已排区的每一个元素
            if arr[j] > value {
                arr[j+1] = arr[j] //如果满足条件,将前一个值赋给后边这个
            } else {
                break
            }
        }
        arr[j+1] = value
    }

    for _, v := range arr {
        fmt.Printf("%v\t", v)
    }
}

插入排序算法分析

插入排序也不需要额外的存储空间,空间复杂度是O(1),所以它是原地排序算法

在插入排序中,对于值相同的元素,我们可以选择将后面出现的元素,插入到前面出现元素的后面,这样就可以保持原有的前后顺序不变,所以插入排序是稳定的排序算法

如果待排序的数据是完全有序的,并不需要搬移任何数据。如果从尾到头在有序数据组里面查找插入位置,每次只需要比较一个数据就能确定插入的位置。所以这种情况下,最好是时间复杂度为O(n)。注意,这里是从尾到头遍历已经有序的数据

如果数组是倒序的,每次插入都相当于在数组的第一个位置插入新的数据,所以需要移动大量的数据,所以最坏情况时间复杂度为O(n^2)平均时间复杂度也是O(n^2)

选择排序

选择排序思想

选择排序的思想和插入排序的思想有些类似,选择排序是每次从无序区中选择一个最小的元素放入到有序区中,具体如图:

代码实现如下

func SelectionSort(arr []int) {
    n := len(arr)
    if n <= 0 {
        fmt.Println("待排数据不合法")
    }

    for i := 0; i < n - 1; i++ {
        for j := i+1; j < n ; j++ {
            if arr[i] > arr[j] {
                arr[i],arr[j] = arr[j], arr[i]
            }
        }
    }

    for _, v := range arr {
        fmt.Printf("%v\t", v)
    }
}

选择排序算法分析

选择排序的空间复杂度也是O(1),是原地排序算法。选择排序的最好情况和最坏情况的时间复杂度都是O(n^2),这个很简单,看一下它的执行过程就知道了

选择排序不是一个稳定排序,选择排序每次都要找剩余未排序元素中的最小值,并和前面的元素交换位置,这样破坏了稳定性

比如7,3,5,7,1,9 这样一组数据,使用选择排序算法来排序的话,第一次找到最小元素1,与第一个7交换位置,那第一个7和中间的7顺序就变了,所以就不稳定了

这样一看,选择排序和前边两个排序算法相比就差一些了

三种排序算法对比

这里就先不提选择排序了,因为和前两种相比,它明显逊色一些

冒泡排序不管怎么优化,元素交换的次数是一个固定值。插入排序是同样的,不管怎么优化,元素移动的次数也是一个固定值。但是,从冒泡和插入排序的代码上看,冒泡排序的数据交换次数比插入排序要复杂,冒泡排序需要3个赋值操作,而插入排序只需要1个

冒泡排序的交换操作
if arr[j] > arr[j+1] {
    tmp := arr[j]
    arr[j] = arr[j+1]
    arr[j+1] = tmp
    flag = true
}

选择排序的交换操作
if arr[j] > value {
    arr[j+1] = arr[j]
}

假设把一次赋值操作的时间算作一个单位时间,那冒泡排序的交换操作需要3个单位时间,而插入排序只需要1个单位时间

如果需要排序的数据比较少,可能这样的差别可以忽略不计,但是数据多起来之后,插入排序节省的时间还是比较明显的

是否是原地排序是否是稳定排序最好情况最坏情况平均情况
冒泡排序O(n)O(n^2)O(n^2)
插入排序O(n)O(n^2)O(n^2)
选择排序O(n^2)O(n^2)O(n^2)

这三种时间复杂度为 O(n2) 的排序算法中,冒泡排序、选择排序,可能就纯粹停留在理论的层面了,学习的目的也只是为了开拓思维,实际开发中应用并不多,但是插入排序还是挺有用的。有些编程语言中的排序函数的实现原理会用到插入排序算法

image.png

查看原文

赞 0 收藏 0 评论 0

书旅 发布了文章 · 2020-11-11

数据结构与算法系列之递归(GO)

以下完整代码均可从这里获取

https://github.com/Rain-Life/data-struct-by-go/tree/master/recursion/step

理解递归

已经不知道是第几次被递归阻断我学习数据结构的道路了,每次学到递归,我都自我怀疑,是我脑子有问题吗?我是真的学不明白它!

发现之前理解递归过于刻板和传统,看递归的时候总是按照机器的执行顺序一直的往每一层递归里边进,不断的下一层、下一层、下一层,直到自己彻底崩溃,自己的CPU也没把一个完整的递归给走完

我们的脑子总是想着跟着机器的执行,不停的往每一层里边进。其实完全可以不用关心每一层,只要假定下一层能够正确的返回,然后该怎么走就怎么走,保证最深的一层递归逻辑正确就行,也就是递归终止的条件

因为不管是递归的哪一层,他们执行的都是一样的代码,唯一不一样的只是数据而已,保证了递归的逻辑是正确的,每一层的的结果就是正确的,直到递归的终止条件被满足,然后每一层都会得到正确的结果

下边进入主题

递归应用十分广泛,可以说,如果没法完全的理解递归的思想,后边的一些进阶的数据结构根本没法学,或者非常吃力。比如深度优先搜索、二叉树的遍历等等

基本上大多数的关于数据结构的书,在介绍递归的时候都是拿斐波那契数列来举例的,其实我个人觉得虽然题目经典,但对我了解递归帮助不是很大,下边分享一个生活中的示例帮助理解递归

假设你现在在爬山,已经爬到了山腰上,假设你现在想知道自己在第多少级台阶上应该怎么办

此时递归就派上用场了,那如果你知道你前边一级的台阶是第多少级就行了,知道了它是第多少级,在它的级数上加一就知道你所在的位置是第几级台阶了。但是你前边的这一级也不知道是第几级,毕竟离山底有点远,没法知道。那就继续的往前推,前一级的前一级台阶是第几级台阶,直到第一级台阶,然后就可以一级一级的把数字传回来,直到你的前一级台阶告诉你他在第几级,你就知道了自己在第几级了

整个过程就是一个递归的过程,往前推的过程是”递“,回传的时候就是”归“。所有的递归问题都是可以用递归来表示的,对于上边的例子,用公式来表示就是

f(n) = f(n-1) + 1,其中f(1)=1

f(n)是你想自己自己在哪一级,f(n-1)就是你的前一级台阶是第几级,f(1)表示第一级台阶我们知道它是第一级。有了公式写递归代码就很轻松了

func Recursion(int n) int {
    if n==1 {
        return 1
    }
    
    return f(n-1) + 1
}

什么情况下适合使用递归

只要一个问题可以满足下边三个条件,那就可以考虑使用递归

一个问题的解可以分解成几个子问题的解

子问题就是规模更小的问题,比如前边的求自己在哪一级台阶的问题,可以分解成”前边一级是哪一级“这样的子问题

子问题除了数据规模不一样,求解思路必须完全一样

还是以上边的例子为例,求自己在哪一级台阶,和求解前一级台阶在哪一级的思路是完全一样的

存在递归终止条件

把问题分解为子问题,把子问题再分解为子子问题,一层一层分解下去,不能存在无限循环,这就需要有终止条件

还是上边的例子,第一级台阶是明确知道是第一级的也就是 f(1)=1,这就是递归的终止条件

编写递归代码

写递归的代码,最关键的就是写出递归公式、找到递归终止条件,剩下的将递归公式转化成代码就简单了

示例

以Leetcode上边的一道题为例

假如这里有n个台阶,每次你可以跨1个台阶或者2个台阶,请问走这n个台阶有多少种走法?

如果有7个台阶,你可以 2,2,2,1 这样子上去,也可以 1,2,1,1,2 这样子上去,总之走法有很多,下边就是考虑如何通过代码来实现

经过思考我们可以知道,实际上,可以根据第一步的走法,把所有的走法分为两类

  • 第一类是第一步走了1个台阶
  • 第二类是第一步走了2个台阶

所以,n个台阶的走法就等于先走1阶后,n-1个台阶的走法加上先走2阶后,n-2个台阶的走法。用公式表示就是:

f(n) = f(n-1) + f(n-2)

有了递归公式,现在就差终止条件。首先,当只有一个台阶的时候,那肯定就只有一种走法,所以f(1) = 1

但是,只有这一个递归终止条件足够吗?肯定是不够的,比如现在考虑n=2和n=3的情况,靠这一个终止条件是否能够求出来

n=2
f(2) = f(1) + f(0)

如果递归终止条件只有一个f(1)=1,那 f(2)就无法求解了。所以除了f(1)=1 这一个递归终止条件外,还要有f(0)=1,表示走0个台阶有一种走法,不过这样子看起来就不符合正常的逻辑思维了。所以,可以把f(2)=2作为一种终止条件,表示走2个台阶,有两种走法,一步走完或者分两步来走

所以,最终得到的递归终止条件就是(可以找几个数字验证一下)

f(1)=1,f(2)=2

有了公式和递归终止条件,代码就很容易了

func StepEasy(n int) int {
    if n==1 {
        return 1
    }
    if n==2 {
        return 2
    }

    return StepEasy(n-1) + StepEasy(n-2)
}

分析

上边的这个例子,人脑几乎没办法把整个“递”和“归”的过程一步一步都想清楚

计算机擅长做重复的事情,所以递归正和它的胃口。而我们人脑更喜欢平铺直叙的思维方式。当我们看到递归时,我们总想把递归平铺展开,脑子里就会循环,一层一层往下调,然后再一层一层返回,试图想搞清楚计算机每一步都是怎么执行的,这样就很容易被绕进去

《数据结构与算法之美》

如果一个递归问题想试图去通过大脑去走一遍递归过程,实际上是进入了一个思维误区。很多时候,理解起来比较吃力,主要原因就是自己给自己制造了这种理解障碍

如果一个问题 A 可以分解为若干子问题 B、C、D,你可以假设子问题 B、C、D 已经解决,在此基础上思考如何解决问题 A。而且,你只需要思考问题 A 与子问题 B、C、D 两层之间的关系即可,不需要一层一层往下思考子问题与子子问题,子子问题与子子子问题之间的关系。屏蔽掉递归细节,这样子理解起来就简单多了

因此,编写递归代码的关键是,只要遇到递归,我们就把它抽象成一个递推公式,不用想一层层的调用关系,不要试图用人脑去分解递归的每个步骤

总结

写递归代码的关键就是找到如何将大问题分解为小问题的规律,并且基于此写出递推公式,然后再推敲终止条件,最后将递推公式和终止条件翻译成代码

递归防坑指南

1、防止堆栈溢出

为什么递归代码容易造成堆栈溢出?

分享一个自己实际遇到的一个情况,之前公司举办过一次黑客马拉松的程序比赛,是一道求解数独的题,求解数独的过程就用到了递归,给你一些已知数,求解数独

如果给的已知数太少,就会导致你的解数独过程的递归次数特别多

在“”那一篇文章中有分享到,函数调用会使用栈来保存临时变量。每调用一个函数,都会将临时变量封装为栈帧压入内存栈,等函数执行完成返回时,才出栈。系统栈或者虚拟机栈空间一般都不大。如果递归求解的数据规模很大,调用层次很深,一直压入栈,就会有堆栈溢出的风险

那么如何防止呢?

如何预防堆栈溢出?

也很简单,可以通过在代码中限制递归调用的最大深度的方式来解决这个问题。递归调用超过一定深度(比如 1000)之后,我们就不继续往下再递归了,直接返回报错

以上边的那个台阶为例

var depth = 0
func StepEasy(n int) int {
    depth++
    if depth > 100 {
        panic("递归次数太多")
    }
    
    if n==1 {
        return 1
    }
    if n==2 {
        return 2
    }

    return StepEasy(n-1) + StepEasy(n-2)
}

但这种做法并不能完全解决问题,因为最大允许的递归深度跟当前线程剩余的栈空间大小有关,事先无法计算。如果实时计算,代码过于复杂,就会影响代码的可读性。所以,如果最大深度比较小,比如 10、50,就可以用这种方法,否则这种方法并不是很实用

2、避免重复计算

使用递归时还会出现重复计算的问题,还是以走台阶的例子为例,假设n=6,递归分解一下的话,大概如下:
image.png

从图中,可以直观地看到,想要计算 f(5),需要先计算 f(4) 和 f(3),而计算 f(4) 还需要计算 f(3),因此,f(3) 就被计算了很多次,这就是重复计算问题

为了避免重复计算,我们可以通过一个数据结构(比如散列表)来保存已经求解过的 f(k)。当递归调用到 f(k) 时,先看下是否已经求解过了。如果是,则直接从散列表中取值返回,不需要重复计算,这样就能避免刚讲的问题了

上边那个走台阶的问题,最终就可以优化成这样

package step

import "fmt"

//假如这里有 n 个台阶,每次你可以跨 1 个台阶或者 2 个台阶,请问走这 n 个台阶有多少种走法?如果有 7 个台阶,
//你可以 2,2,2,1 这样子上去,也可以 1,2,1,1,2 这样子上去,总之走法有很多,那如何用编程求得总共有多少种走法呢?

var depth = 0

var mapList = map[int]int{}

func Step(n int) int {
    depth++
    if depth > 100 {
        panic("递归次数太多")
    }
    if n == 1 {
        return 1
    } else if n==2 {
        return 2
    }

    if mapList[n] != 0 {
        return mapList[n]
    }
    res := Step(n-1)+ Step(n-2)
    mapList[n] = res
    fmt.Printf("step(%d) = %d", n, mapList[n])
    fmt.Println()
    return res
}

总结

除了堆栈溢出、重复计算这两个常见的问题。递归代码还有很多别的问题

在时间效率上,递归代码里多了很多函数调用,当这些函数调用的数量较大时,就会积聚成一个可观的时间成本。在空间复杂度上,因为递归调用一次就会在内存栈中保存一次现场数据,所以在分析递归代码空间复杂度时,需要额外考虑这部分的开销,比如我们前面讲到的电影院递归代码,空间复杂度并不是 O(1),而是O(n)

以上内容参考《数据结构与算法之美》-递归篇,对里边的例子进行了简单的改变,代码通过go来实现,总结性文字皆来自原文

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 10 次点赞
  • 获得 5 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 4 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2020-03-10
个人主页被 2.1k 人浏览