ltxlong

ltxlong 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

ltxlong 收藏了文章 · 11月14日

docker环境搭建elasticsearch

docker搭建系列

本文主要讲如何使用使用docker搭建elasticsearch。

下载镜像

这里利用hangxin1940搭好的镜像,不过是es的1.4.2版本。

docker pull hangxin1940/docker-elasticsearch-cn:v1.6.0

启动容器

docker run -d -p 9200:9200 -p 9300:9300 --name es hangxin1940/docker-elasticsearch-cn:v1.6.0

查看es(这里的ip是docker的default machine的ip)

访问http://192.168.99.100:9200/

{
status: 200,
name: "node1",
cluster_name: "cn-out-of-box",
version: {
number: "1.6.0",
build_hash: "cdd3ac4dde4f69524ec0a14de3828cb95bbb86d0",
build_timestamp: "2015-06-09T13:36:34Z",
build_snapshot: false,
lucene_version: "4.10.4"
},
tagline: "You Know, for Search"
}

查看集群状态

http://192.168.99.100:9200/_plugin/head/
图片描述
也可以用命令行

curl -XGET http://192.168.99.100:9200/_cluster/health?pretty

返回

{
  "cluster_name" : "cn-out-of-box",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 1,
  "active_shards" : 1,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 1,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0
}

这里目前只是单节点的,后续弄成集群看看。

查看插件

http://192.168.99.100:9200/_plugin/oob
图片描述

增删改查

增加

curl -XPUT 'http://192.168.99.100:9200/twitter/tweet/1' -d '{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elastic Search"
}'

返回

{"_index":"twitter","_type":"tweet","_id":"1","_version":1,"created":true}%

查询

curl -XGET 'http://192.168.99.100:9200/twitter/tweet/1'

返回

{"_index":"twitter","_type":"tweet","_id":"1","_version":1,"found":true,"_source":{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elastic Search"
}}%

图片描述
高级查询:选择字段

curl -XGET 'http://192.168.99.100:9200/twitter/tweet/1?fields=message,user&pretty=true'

返回

{
  "_index" : "twitter",
  "_type" : "tweet",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "fields" : {
    "message" : [ "trying out Elastic Search" ],
    "user" : [ "kimchy" ]
  }
}

高级查询:选择格式

curl -XGET 'http://192.168.99.100:9200/twitter/tweet/1?fields=message,user&format=yaml'

返回

---
_index: "twitter"
_type: "tweet"
_id: "1"
_version: 1
found: true
fields:
  message:
  - "trying out Elastic Search"
  user:
  - "kimchy"

更新

curl -X PUT http://192.168.99.100:9200/twitter/tweet/1 -d '{"message": "hello world", "user": "codecraft"}'

返回

{"_index":"twitter","_type":"tweet","_id":"1","_version":2,"created":false}%

这个是覆盖更新,不是局部更新:

 ~  curl -XGET 'http://192.168.99.100:9200/twitter/tweet/1'
{"_index":"twitter","_type":"tweet","_id":"1","_version":2,"found":true,"_source":{"message": "hello world", "user": "codecraft"}}%

删除

curl -XDELETE 'http://192.168.99.100:9200/twitter/tweet/1'

返回

{"found":true,"_index":"twitter","_type":"tweet","_id":"1","_version":3}%

查看mapping

{
    "twitter": {
        "mappings": {
            "tweet": {
                "properties": {
                    "message": {
                        "type": "string"
                    }, 
                    "post_date": {
                        "type": "date", 
                        "format": "dateOptionalTime"
                    }, 
                    "user": {
                        "type": "string"
                    }
                }
            }
        }
    }
}

索引分析

http://192.168.99.100:9200/twitter/_analyze?field=message&text=hello%20world

{
    "tokens": [
        {
            "token": "hello", 
            "start_offset": 0, 
            "end_offset": 5, 
            "type": "<ALPHANUM>", 
            "position": 1
        }, 
        {
            "token": "world", 
            "start_offset": 6, 
            "end_offset": 11, 
            "type": "<ALPHANUM>", 
            "position": 2
        }
    ]
}

参考

查看原文

ltxlong 收藏了文章 · 11月9日

PHP-FPM 与 Nginx 的通信机制总结

PHP-FPM 介绍

CGI 协议与 FastCGI 协议
每种动态语言( PHP,Python 等)的代码文件需要通过对应的解析器才能被服务器识别,而 CGI 协议就是用来使解释器与服务器可以互相通信。PHP 文件在服务器上的解析需要用到 PHP 解释器,再加上对应的 CGI 协议,从而使服务器可以解析到 PHP 文件。

由于 CGI 的机制是每处理一个请求需要 fork 一个 CGI 进程,请求结束再kill掉这个进程,在实际应用上比较浪费资源,于是就出现了CGI 的改良版本 FastCGI,FastCGI 在请求处理完后,不会 kill 掉进程,而是继续处理多个请求,这样就大大提高了效率。

PHP-FPM 是什么
PHP-FPM 即 PHP-FastCGI Process Manager, 它是 FastCGI 的实现,并提供了进程管理的功能。进程包含 master 进程和 worker 进程两种;master 进程只有一个,负责监听端口,接收来自服务器的请求,而 worker 进程则一般有多个(具体数量根据实际需要进行配置),每个进程内部都会嵌入一个 PHP 解释器,是代码真正执行的地方。

Nginx 与 php-fpm 通信机制

当我们访问一个网站(如 www.test.com)的时候,处理流程是这样的:

www.test.com
        |
        |
      Nginx
        |
        |
路由到 www.test.com/index.php
        |
        |
加载 nginx 的 fast-cgi 模块
        |
        |
fast-cgi 监听 127.0.0.1:9000 地址
        |
        |
www.test.com/index.php 请求到达 127.0.0.1:9000
        |
        |
     等待处理...

Nginx 与 php-fpm 的结合

在 Linux 上,nginx 与 php-fpm 的通信有 tcp socket 和 unix socket 两种方式。

tcp socket 的优点是可以跨服务器,当 nginx 和 php-fpm 不在同一台机器上时,只能使用这种方式。

Unix socket 又叫 IPC(inter-process communication 进程间通信) socket,用于实现同一主机上的进程间通信,这种方式需要在 nginx配置文件中填写 php-fpm 的 socket 文件位置。

两种方式的数据传输过程如下图所示:
20190128194727.png

二者的不同:

由于 Unix socket 不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。所以其效率比 tcp socket 的方式要高,可减少不必要的 tcp 开销。不过,unix socket 高并发时不稳定,连接数爆发时,会产生大量的长时缓存,在没有面向连接协议的支撑下,大数据包可能会直接出错不返回异常。而 tcp 这样的面向连接的协议,可以更好的保证通信的正确性和完整性。

Nginx 与 php-fpm 结合只需要在各自的配置文件中做设置即可:

1、 Nginx 中的配置

以 tcp socket通信为例

server {
    listen       80; #监听 80 端口,接收http请求
    server_name  www.test.com; #就是网站地址
    root /usr/local/etc/nginx/www/huxintong_admin; # 准备存放代码工程的路径
    #路由到网站根目录 www.test.com 时候的处理
    location / {
        index index.php; #跳转到 www.test.com/index.php
        autoindex on;
    }   

    #当请求网站下 php 文件的时候,反向代理到 php-fpm
    location ~ \.php$ {
        include /usr/local/etc/nginx/fastcgi.conf; #加载 nginx 的 fastcgi 模块
        fastcgi_intercept_errors on;
        fastcgi_pass   127.0.0.1:9000; # tcp 方式,php-fpm 监听的 IP 地址和端口
       # fasrcgi_pass /usr/run/php-fpm.sock # unix socket 连接方式
    }

}

2、 php-fpm 的配置

listen = 127.0.0.1:9000
# 或者下面这样
listen = /var/run/php-fpm.sock
注意,在使用 unix socket 方式连接时,由于 socket 文件本质上是一个文件,存在权限控制的问题,所以需要注意 nginx
进程的权限与 php-fpm 的权限问题,不然会提示无权限访问。(在各自的配置文件里设置用户)
通过以上配置即可完成 php-fpm 与 nginx 的通信。

在应用中的选择

如果是在同一台服务器上运行的 nginx 和 php-fpm,且并发量不高(不超过1000),选择unix socket,以提高 nginx 和 php-fpm 的通信效率。
如果是面临高并发业务,则考虑选择使用更可靠的 tcp socket,以负载均衡、内核优化等运维手段维持效率。

若并发较高但仍想用 unix socket 时,可通过以下方式提高 unix socket 的稳定性。

1)将sock文件放在 /dev/shm 目录下,此目录下将 sock 文件放在内存里面,内存的读写更快。

2)提高 backlog

backlog 默认位 128,1024 这个值换成自己正常的 QPS,配置如下。

nginx.conf 文件中

server {
        listen 80 default backlog = 1024;
       }

php-fpm.conf 文件中

listen.backlog = 1024

3)增加 sock 文件和 php-fpm 实例

在 /dev/shm 新建一个 sock 文件,在 nginx 中通过 upstream 模块将请求负载均衡到两个 sock 文件,并且将两个 sock 文件分别对应到两套 php-fpm 实例上。

查看原文

ltxlong 收藏了文章 · 11月4日

Nginx面试中最常见的18道题 抱佛脚必备

Nginx的并发能力在同类型网页服务器中的表现,相对而言是比较好的,因此受到了很多企业的青睐,我国使用Nginx网站的知名用户包括腾讯、淘宝、百度、京东、新浪、网易等等。Nginx是网页服务器运维人员必备技能之一,下面为大家整理了一些比较常见的Nginx相关面试题,仅供参考:

1、请解释一下什么是Nginx?

Nginx是一个web服务器和反向代理服务器,用于HTTPHTTPSSMTPPOP3IMAP协议。

2、请列举Nginx的一些特性。

Nginx服务器的特性包括:

反向代理/L7负载均衡器

嵌入式Perl解释器

动态二进制升级

可用于重新编写URL,具有非常好的PCRE支持

3、请列举NginxApache 之间的不同点。

图片描述

4、请解释Nginx如何处理HTTP请求。

Nginx使用反应器模式。主事件循环等待操作系统发出准备事件的信号,这样数据就可以从套接字读取,在该实例中读取到缓冲区并进行处理。单个线程可以提供数万个并发连接。

5、在Nginx中,如何使用未定义的服务器名称来阻止处理请求?

只需将请求删除的服务器就可以定义为:

Server {

listen 80;

server_name “ “ ;

return 444;

}

这里,服务器名被保留为一个空字符串,它将在没有“主机”头字段的情况下匹配请求,而一个特殊的Nginx的非标准代码444被返回,从而终止连接。

6、 使用“反向代理服务器”的优点是什么?

反向代理服务器可以隐藏源服务器的存在和特征。它充当互联网云和web服务器之间的中间层。这对于安全方面来说是很好的,特别是当您使用web托管服务时。

7、请列举Nginx服务器的最佳用途。

Nginx服务器的最佳用法是在网络上部署动态HTTP内容,使用SCGIWSGI应用程序服务器、用于脚本的FastCGI处理程序。它还可以作为负载均衡器。

8、请解释Nginx服务器上的MasterWorker进程分别是什么?

Master进程:读取及评估配置和维持

Worker进程:处理请求

9、请解释你如何通过不同于80的端口开启Nginx?

为了通过一个不同的端口开启Nginx,你必须进入/etc/Nginx/sites-enabled/,如果这是默认文件,那么你必须打开名为“default”的文件。编辑文件,并放置在你想要的端口:

Like server { listen 81; }

10、请解释是否有可能将Nginx的错误替换为502错误、503?

502 =错误网关

503 =服务器超载

有可能,但是您可以确保fastcgi_intercept_errors被设置为ON,并使用错误页面指令。

Location / {
fastcgi_pass 127.0.01:9001;
fastcgi_intercept_errors on;
error_page 502 =503/error_page.html;
#…
}

11、在Nginx中,解释如何在URL中保留双斜线?

要在URL中保留双斜线,就必须使用merge_slashes_off;

语法:merge_slashes [on/off]

默认值: merge_slashes on

环境: http,server

12、请解释ngx_http_upstream_module的作用是什么?

ngx_http_upstream_module用于定义可通过fastcgi传递、proxy传递、uwsgi传递、memcached传递和scgi传递指令来引用的服务器组。

13、请解释什么是C10K问题?

C10K问题是指无法同时处理大量客户端(10,000)的网络套接字。

14、请陈述stub_statussub_filter指令的作用是什么?

Stub_status指令:该指令用于了解Nginx当前状态的当前状态,如当前的活动连接,接受和处理当前读/写/等待连接的总数

Sub_filter指令:它用于搜索和替换响应中的内容,并快速修复陈旧的数据

15、解释Nginx是否支持将请求压缩到上游?

您可以使用Nginx模块gunzip将请求压缩到上游。gunzip模块是一个过滤器,它可以对不支持“gzip”编码方法的客户机或服务器使用“内容编码:gzip”来解压缩响应。

16、解释如何在Nginx中获得当前的时间?

要获得Nginx的当前时间,必须使用SSI模块、$date_gmt$date_local的变量。

Proxy_set_headerTHE-TIME $date_gmt;

17、用Nginx服务器解释-s的目的是什么?

用于运行Nginx -s参数的可执行文件。

18、解释如何在Nginx服务器上添加模块?

在编译过程中,必须选择Nginx模块,因为Nginx不支持模块的运行时间选择。

查看原文

ltxlong 收藏了文章 · 11月3日

nginx、swoole高并发原理初探

一、阅前热身

为了更加形象的说明同步异步、阻塞非阻塞,我们以小明去买奶茶为例。

1、同步与异步

①同步与异步的理解

同步与异步的重点在消息通知的方式上,也就是调用结果通知的方式。

  • 同步
    当一个同步调用发出去后,调用者要一直等待调用结果的通知后,才能进行后续的执行

  • 异步:
    当一个异步调用发出去后,调用者不能立即得到调用结果的返回。

异步调用,要想获得结果,一般有两种方式:
1、主动轮询异步调用的结果;
2、被调用方通过callback来通知调用方调用结果。

②:生活实例

同步买奶茶:小明点单交钱,然后等着拿奶茶;
异步买奶茶:小明点单交钱,店员给小明一个小票,等小明奶茶做好了,再来取。

异步买奶茶,小明要想知道奶茶是否做好了,有两种方式:
1、小明主动去问店员,一会就去问一下:“奶茶做好了吗?”...直到奶茶做好。
2、等奶茶做好了,店员喊一声:“小明,奶茶好了!”,然后小明去取奶茶。


2、阻塞与非阻塞

①阻塞与非阻塞的理解

阻塞与非阻塞的重点在于进/线程等待消息时候的行为,也就是在等待消息的时候,当前进/线程是挂起状态,还是非挂起状态。

  • 阻塞
    阻塞调用在发出去后,在消息返回之前,当前进/线程会被挂起,直到有消息返回,当前进/线程才会被激活.

  • 非阻塞
    非阻塞调用在发出去后,不会阻塞当前进/线程,而会立即返回。

②:生活实例

阻塞买奶茶:小明点单交钱,干等着拿奶茶,什么事都不做;
非阻塞买奶茶:小明点单交钱,等着拿奶茶,等的过程中,时不时刷刷微博、朋友圈...


3、总结

通过上面的分析,我们可以得知:

同步与异步,重点在于消息通知的方式;阻塞与非阻塞,重点在于等消息时候的行为。

所以,就有了下面4种组合方式

  • 同步阻塞:小明在柜台干等着拿奶茶;

  • 同步非阻塞:小明在柜台边刷微博边等着拿奶茶;

  • 异步阻塞:小明拿着小票啥都不干,一直等着店员通知他拿奶茶;

  • 异步非阻塞:小明拿着小票,刷着微博,等着店员通知他拿奶茶。


二、Nginx如何处理高并发

1、Apache面对高并发,为什么很无力?

Apache处理一个请求是同步阻塞的模式。

image_1b2ianfo42ag28ghrgl4u18sa13.png-93.4kB

每到达一个请求,Apache都会去fork一个子进程去处理这个请求,直到这个请求处理完毕。

面对低并发,这种模式没什么缺点,但是,面对高并发,就是这种模式的软肋了。

  • 1个客户端占用1个进程,那么,进程数量有多少,并发处理能力就有多少,但操作系统可以创建的进程数量是有限的。

  • 多进程就会有进程间的切换问题,而进程间的切换调度势必会造成CPU的额外消耗。当进程数量达到成千上万的时候,进程间的切换就占了CPU大部分的时间片,而真正进程的执行反而占了CPU的一小部分,这就得不偿失了。

下面,举例说明这2种场景是多进程模式的软肋:

  • 及时消息通知程序
    比如及时聊天程序,一台服务器可能要维持数十万的连接(典型的C10K问题),那么就要启动数十万的进程来维持。这显然不可能。

  • 调用外部Http接口时
    假设Apache启动100个进程来处理请求,每个请求消耗100ms,那么这100个进程能提供1000qps。

但是,在我们调用外部Http接口时,比如QQ登录、微博登录,耗时较长,假设一个请求消耗10s,也就是1个进程1s处理0.1个请求,那么100个进程只能达到10qps,这样的处理能力就未免太差了。

注:什么是C10K问题?
网络服务在处理数以万计的客户端连接时,往往出现效率低下甚至完全瘫痪,这被称为C10K问题。(concurrent 10000 connection)

综上,我们可以看出,Apache是同步阻塞的多进程模式,面对高并发等一些场景,是很苍白的。


2、Nginx何以问鼎高并发?

传统的服务器模型就是这样,因为其同步阻塞的多进程模型,无法面对高并发。
那么,有没有一种方式,可以让我们在一个进程处理所有的并发I/O呢?
答案是有的,这就是I/O复用技术。

①、I/O复用是神马?

最初级的I/O复用

所谓的I/O复用,就是多个I/O可以复用一个进程。

上面说的同步阻塞的多进程模型不适合处理高并发,那么,我们再来考虑非阻塞的方式。

采用非阻塞的模式,当一个连接过来时,我们不阻塞住,这样一个进程可以同时处理多个连接了。

比如一个进程接受了10000个连接,这个进程每次从头到尾的问一遍这10000个连接:“有I/O事件没?有的话就交给我处理,没有的话我一会再来问一遍。
然后进程就一直从头到尾问这10000个连接,如果这1000个连接都没有I/O事件,就会造成CPU的空转,并且效率也很低,不好不好。

升级版的I/O复用

上面虽然实现了基础版的I/O复用,但是效率太低了。于是伟大的程序猿们日思夜想的去解决这个问题...终于!

我们能不能引入一个代理,这个代理可以同时观察许多I/O流事件呢?

当没有I/O事件的时候,这个进程处于阻塞状态;当有I/O事件的时候,这个代理就去通知进程醒来?

于是,早期的程序猿们发明了两个代理---select、poll。

select、poll代理的原理是这样的:

当连接有I/O流事件产生的时候,就会去唤醒进程去处理。

但是进程并不知道是哪个连接产生的I/O流事件,于是进程就挨个去问:“请问是你有事要处理吗?”......问了99999遍,哦,原来是第100000个进程有事要处理。那么,前面这99999次就白问了,白白浪费宝贵的CPU时间片了!痛哉,惜哉...

注:select与poll原理是一样的,只不过select只能观察1024个连接,poll可以观察无限个连接。

上面看了,select、poll因为不知道哪个连接有I/O流事件要处理,性能也挺不好的。

那么,如果发明一个代理,每次能够知道哪个连接有了I/O流事件,不就可以避免无意义的空转了吗?

于是,超级无敌、闪闪发光的epoll被伟大的程序员发明出来了。

epoll代理的原理是这样的:

当连接有I/O流事件产生的时候,epoll就会去告诉进程哪个连接有I/O流事件产生,然后进程就去处理这个进程。

如此,多高效!

②、基于epoll的Nginx

有了epoll,理论上1个进程就可以无限数量的连接,而且无需轮询,真正解决了c10k的问题。

Nginx是基于epoll的,异步非阻塞的服务器程序。自然,Nginx能够轻松处理百万级的并发连接,也就无可厚非了。

三、swoole如何处理高并发以及异步I/O的实现

1、swoole介绍

swoole是PHP的一个扩展。
简单理解:swoole=异步I/O+网络通信
PHPer可以基于swoole去实现过去PHP无法实现的功能。
具体请参考swoole官网:swoole官网


2、swoole如何处理高并发

①Reactor模型介绍

IO复用异步非阻塞程序使用经典的Reactor模型,Reactor顾名思义就是反应堆的意思,它本身不处理任何数据收发。只是可以监视一个socket(也可以是管道、eventfd、信号)句柄的事件变化。

注:什么是句柄?句柄英文为handler,可以形象的比喻为锅柄、勺柄。也就是资源的唯一标识符、资源的ID。通过这个ID可以操作资源。

image_1b2iad9fimm51tc5f4c13tj2009.png-63.3kB

Reactor只是一个事件发生器,实际对socket句柄的操作,如connect/accept、send/recv、close是在callback中完成的。

②swoole的架构

swoole采用 多线程Reactor+多进程Worker

swoole的架构图如下:

image_1b2iakmlo6gtba5m6ktfp9esm.png-125.1kB

swoole的处理连接流程图如下:

image_1b2ib3qhpkbajn4l2u2go1vc51g.png-62.5kB

当请求到达时,swoole是这样处理的:

请求到达 Main Reactor
        |
        |
Main Reactor根据Reactor的情况,将请求注册给对应的Reactor
(每个Reactor都有epoll。用来监听客户端的变化)
        |
        |
客户端有变化时,交给worker来处理
        |
        |
worker处理完毕,通过进程间通信(比如管道、共享内存、消息队列)发给对应的reactor。
        |
        |
reactor将响应结果发给相应的连接
        |
        |
    请求处理完成

因为reactor基于epoll,所以每个reactor可以处理无数个连接请求。
如此,swoole就轻松的处理了高并发。

3、swoole如何实现异步I/O

基于上面的Swoole结构图,我们看到swoole的worker进程有2种类型:
一种是 普通的worker进程,一种是 task worker进程。

worker进程是用来处理普通的耗时不是太长的请求;
task worker进程用来处理耗时较长的请求,比如数据库的I/O操作。

我们以异步Mysql举例:

耗时较长的Mysql查询进入worker
            |
            |
worker通过管道将这个请求交给taskworker来处理
            |
            |
worker再去处理其他请求
            |
            |
task worker处理完毕后,处理结果通过管道返回给worker
            |
            |
worker 将结果返回给reactor
            |
            |
reactor将结果返回给请求方

如此,通过worker、task worker结合的方式,我们就实现了异步I/O。

四、参考文章

Nginx 多进程模型是如何实现高并发的?
PHP并发IO编程之路
epoll 或者 kqueue 的原理是什么?
IO 多路复用是什么意思?

更多精彩,请关注公众号“聊聊代码”,让我们一起聊聊“左手代码右手诗”的事儿。
图片描述

查看原文

ltxlong 收藏了文章 · 11月2日

【SWOOLE系列】谈谈reactor

前言

环境说明

php --ri swoole

swoole

Swoole => enabled
Author => Swoole Team <team@swoole.com>
Version => 4.4.4
Built => Aug 27 2019 11:

php -version
PHP 7.3.7 (cli) (built: Jul  5 2019 12:44:05) ( NTS )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.3.7, Copyright (c) 1998-2018 Zend Technologies
    with Zend OPcache v7.3.7, Copyright (c) 1999-2018, by Zend Technologies

通过swoole如何不用nginxphp-fpm快速启动一个http服务

<?php
$http = new Swoole\Http\Server("0.0.0.0", 9501);

$http->on('request', function ($request, $response) {
    $response->header("Content-Type", "text/html; charset=utf-8");
    $response->end("立的flag,含着泪,吃着屎也要做到");
});

$http->start();

cli下用php server.php启动服务

curl http://localhost:9501
立的flag,含着泪,吃着屎也要做到

服务是启动完了,性能如何?
那么问题来了!不卖关子了,直接给个结论吧 swoole启动的server 远大于 fpm下的http服务。(可以自行本地ab看结果)

php做web的瓶颈在哪?

大家在论坛、贴吧、社区经常看到以下对话
狗蛋A:php是世界最好的语言
狗蛋B:去你的,php性能差
狗蛋C:哟!php竟然还活着。
狗蛋D:php的最终结果都是转向java

。。。。。
这几年我也看淡了,这里我只想说php的确是世界最好的语言。
1538363198814.gif
至于php有没有问题,我只能说的确有问题,每个语言都有自己的或多或少的问题,你让那些天天吹go好的人,你问问他们用go CURD的开心不开心!
php被嘲讽最多的还是性能问题,你说的一点都没有错,追求极致的性能php还真不行(除非无脑堆机器)

lnmp

在具体说php性能问题之前,我们可以再来回顾下 lnmp
L:linux
N:nginx
M:mysql
P:php
让我们来康康这四个东西,我们发现不管啥语言基本上都依赖LinuxNginxMysql。那么这仨肯定没锅,问题就落到了php上。
nginx是没法直接解析php,那么借助是php-fpm
整个工作流程如下:
image.png

php-fpm是一个多进程单线程的模型,如果一个请求卡了60s吗,那么这个php-fpm就属于占着茅坑不拉屎待岗状态。那么我们得出结论,服务器承载的最高并发的短板是fpm的数量,那么有人会说无脑设置上线的fpm的数量不就好了吗?那么恭喜你可以直接找下一份工作了!
每个fpm平均大概占用内存20到30mb 那么机器支持最高的fpm数量公式如下:
fpm数量 = 机器内存 * 1024M * 0.8 / 30 (20)

乘0.8 主要考虑做人留一线,非要榨干服务器内存干啥,影响多不好啊

我们试着想一个问题nginx的是怎么演化的? php做web是否可以跟nginx一样
select -> poll -> epoll

没错swoole可以!php-fpm做不到的,但是swoole做到了。
image.png

reactor

关于reactor的基本概念网络一大堆,可以总结以下几点
1.I/O多路复用
2.事件注册、分发、调度
3.异步非堵塞
基于reactor实现的大家做熟悉nginxredis等(主要我也就知道这两个)
我们接下来来康康swoole怎么结合reactor

swoole 的 server的运行流程图

阅读源码之前我们先看看server的运行流程图

process.jpg
swoole.jpg
3.png

来自https://wiki.swoole.com/wiki/...

探索真相

根据上述的demo我可以看到new Swoole\Http\Server("0.0.0.0", 9501)创建一个server

我们可以定位源码到swoole_server.ccstatic PHP_METHOD(swoole_server, __construct)

static PHP_METHOD(swoole_server, __construct)
{
    ......
    // 初始化
    zval *zserv = ZEND_THIS;
    char *host;
    size_t host_len = 0;
    zend_long sock_type = SW_SOCK_TCP;
    zend_long serv_port = 0;
    zend_long serv_mode = SW_MODE_PROCESS;

    // 看到木有 只能cli执行
    if (!SWOOLE_G(cli))
    {
        zend_throw_exception_ex(swoole_exception_ce, -1, "%s can only be used in CLI mode", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    if (sw_server() != NULL)
    {
        zend_throw_exception_ex(swoole_exception_ce, -3, "server is running. unable to create %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    .....
    // serv_mode SWOOLE_BASE、SWOOLE_PROCESS 具体看官方wiki解释

    if (serv_mode != SW_MODE_BASE && serv_mode != SW_MODE_PROCESS)
    {
        php_swoole_fatal_error(E_ERROR, "invalid $mode parameters %d", (int) serv_mode);
        RETURN_FALSE;
    }

    // 申请内存
    serv = (swServer *) sw_malloc(sizeof(swServer));
    if (!serv)
    {
        zend_throw_exception_ex(swoole_exception_ce, errno, "malloc(%ld) failed", sizeof(swServer));
        RETURN_FALSE;
    }

    swServer_init(serv);
   
    ....
}

在构造方法里 我们并没有看到reactor相关的代码,我们继续往下追$http->start()定位到
static PHP_METHOD(swoole_server, start)

static PHP_METHOD(swoole_server, start)
{
    zval *zserv = ZEND_THIS;
    // 读一读什么叫标准的命名 获取server 并且 检测这个服务 多么通熟易懂
    swServer *serv = php_swoole_server_get_and_check_server(zserv);

    if (serv->gs->start > 0)
    {
        php_swoole_fatal_error(E_WARNING, "server is running, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }
    if (serv->gs->shutdown > 0)
    {
        php_swoole_fatal_error(E_WARNING, "server have been shutdown, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    if (SwooleTG.reactor)
    {
        php_swoole_fatal_error(E_WARNING, "eventLoop has already been created, unable to start %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    .....

    // swoole服务启动前的前置工作
    php_swoole_server_before_start(serv, zserv);
    // 启动sever
    if (swServer_start(serv) < 0)
    {
        php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
    }

    RETURN_TRUE;
}

按照我多年的搬屎山的直觉来说,再启动之前肯定需要做准备工作,那么reactor的初始化肯定在php_swoole_server_before_start

void php_swoole_server_before_start(swServer *serv, zval *zobject)
{
    /**
     * create swoole server
     */
    if (swServer_create(serv) < 0)
    {
        php_swoole_fatal_error(E_ERROR, "failed to create the server. Error: %s", sw_error);
        return;
    }
    .....
int swServer_create(swServer *serv)
{
    serv->factory.ptr = serv;

    serv->session_list = (swSession *) sw_shm_calloc(SW_SESSION_LIST_SIZE, sizeof(swSession));
    if (serv->session_list == NULL)
    {
        swError("sw_shm_calloc(%ld) for session_list failed", SW_SESSION_LIST_SIZE * sizeof(swSession));
        return SW_ERR;
    }

    if (serv->enable_static_handler && serv->locations == nullptr)
    {
        serv->locations = new std::unordered_set<std::string>;
    }

    if (serv->factory_mode == SW_MODE_BASE)
    {
        return swReactorProcess_create(serv);
    }
    else
    {
        return swReactorThread_create(serv);
    }
}

可以发现根据不同的执行模式 创建reactor也是不同,不过这次我们只看swReactorProcess_create 开始分析瞎逼逼这块代码
5.jpg

reactor

int swReactorProcess_create(swServer *serv)
{
    serv->reactor_num = serv->worker_num;
    serv->connection_list = (swConnection *) sw_calloc(serv->max_connection, sizeof(swConnection));
    if (serv->connection_list == NULL)
    {
        swSysWarn("calloc[2](%d) failed", (int )(serv->max_connection * sizeof(swConnection)));
        return SW_ERR;
    }
    //create factry object
    if (swFactory_create(&(serv->factory)) < 0)
    {
        swError("create factory failed");
        return SW_ERR;
    }
    serv->factory.finish = swReactorProcess_send2client;
    return SW_OK;
}

其实这个函数只是将swserver这个结构体初始化对应的属性和回调函数
1.reactor_num
2.conection_list 初始化好空间
3.finish的结束回调函数
4.factory

int swFactory_create(swFactory *factory)
{
    factory->dispatch = swFactory_dispatch;
    factory->finish = swFactory_finish;
    factory->start = swFactory_start;
    factory->shutdown = swFactory_shutdown;
    factory->end = swFactory_end;
    factory->notify = swFactory_notify;
    factory->free = swFactory_free;
    return SW_OK;
}

再初始化swServer后就可以看swServer_start 参数是之前初始化构造好的swServer

int swServer_start(swServer *serv)
{
    // factory 可以定位到swFactory_create
    swFactory *factory = &serv->factory;
    int ret;

    // 启动前的检测 判断不同mode下的参数 和 php上游回调函数的是否构造 比如onTask等。。
    ret = swServer_start_check(serv);
    if (ret < 0)
    {
        return SW_ERR;
    }
    // 检测钩子
    if (SwooleG.hooks[SW_GLOBAL_HOOK_BEFORE_SERVER_START])
    {
        swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_START, serv);
    }
    // sw_atomic_cmp_set 此处理解成一个锁 也就是同时时间只能存在一个服务 
    //cannot start 2 servers at the same time, please use process->exec.
    if (!sw_atomic_cmp_set(&serv->gs->start, 0, 1))
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_ONLY_START_ONE, "must only start one server");
        return SW_ERR;
    }
    // 这块标准输出 大家应该都懂 跳过跳过
    //run as daemon
    if (serv->daemonize > 0)
    {
        /**
         * redirect STDOUT to log file
         */
        if (SwooleG.log_fd > STDOUT_FILENO)
        {
            swoole_redirect_stdout(SwooleG.log_fd);
        }
        /**
         * redirect STDOUT_FILENO/STDERR_FILENO to /dev/null
         */
        else
        {
            serv->null_fd = open("/dev/null", O_WRONLY);
            if (serv->null_fd > 0)
            {
                swoole_redirect_stdout(serv->null_fd);
            }
            else
            {
                swSysWarn("open(/dev/null) failed");
            }
        }

        if (swoole_daemon(0, 1) < 0)
        {
            return SW_ERR;
        }
    }

    //master pid
    // 获取对应的master进程和启动时间
    serv->gs->master_pid = getpid();
    serv->stats->start_time = time(NULL);

    /**
     * init method
     */
     // 继续初始化关于tcp相关函数
    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;
    serv->notify = swServer_tcp_notify;
    serv->feedback = swServer_tcp_feedback;
    // 申请worker的对应的内存空间
    serv->workers = (swWorker *) SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker));
    if (serv->workers == NULL)
    {
        swSysWarn("gmalloc[server->workers] failed");
        return SW_ERR;
    }

    if (swMutex_create(&serv->lock, 0) < 0)
    {
        return SW_ERR;
    }

    /**
     * store to swProcessPool object
     */
    
    serv->gs->event_workers.ptr = serv;
    serv->gs->event_workers.workers = serv->workers;
    serv->gs->event_workers.worker_num = serv->worker_num;
    serv->gs->event_workers.use_msgqueue = 0;

    uint32_t i;
    for (i = 0; i < serv->worker_num; i++)
    {
        serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
        serv->gs->event_workers.workers[i].id = i;
        serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER;
    }

    /*
     * For swoole_server->taskwait, create notify pipe and result shared memory.
     */
    if (serv->task_worker_num > 0 && serv->worker_num > 0)
    {
        serv->task_result = (swEventData *) sw_shm_calloc(serv->worker_num, sizeof(swEventData));
        if (!serv->task_result)
        {
            swWarn("malloc[serv->task_result] failed");
            return SW_ERR;
        }
        serv->task_notify = (swPipe *) sw_calloc(serv->worker_num, sizeof(swPipe));
        if (!serv->task_notify)
        {
            swWarn("malloc[serv->task_notify] failed");
            sw_shm_free(serv->task_result);
            return SW_ERR;
        }
        for (i = 0; i < serv->worker_num; i++)
        {
            if (swPipeNotify_auto(&serv->task_notify[i], 1, 0))
            {
                sw_shm_free(serv->task_result);
                sw_free(serv->task_notify);
                return SW_ERR;
            }
        }
    }

    /**
     * user worker process
     */
    if (serv->user_worker_list)
    {
        i = 0;
        for (auto worker : *serv->user_worker_list)
        {
        // 此处可以看看 worker id的生成机制 有没有课代表解释下 要两个woker_num
            worker->id = serv->worker_num + serv->task_worker_num + i;
            i++;
        }
    }
    serv->running = 1;
    //factory start
    if (factory->start(factory) < 0)
    {
        return SW_ERR;
    }
    // 注册信号机制
    swServer_signal_init(serv);

    //write PID file
    if (serv->pid_file)
    {
        ret = sw_snprintf(SwooleTG.buffer_stack->str, SwooleTG.buffer_stack->size, "%d", getpid());
        swoole_file_put_contents(serv->pid_file, SwooleTG.buffer_stack->str, ret);
    }
    if (serv->factory_mode == SW_MODE_BASE)
    {
        ret = swReactorProcess_start(serv);
    }
    else
    {
        ret = swReactorThread_start(serv);
    }
    //failed to start
    if (ret < 0)
    {
        return SW_ERR;
    }
    swServer_destory(serv);
    //remove PID file
    if (serv->pid_file)
    {
        unlink(serv->pid_file);
    }
    return SW_OK;
}

代码很长,我们概括为几件事情

  1. 启动检测
  2. 检查钩子 & 执行钩子
  3. 锁判断
  4. 如果daemon启动更改标准输出
  5. 初始化函数和初始化基本的信息(pid、时间、woker的内存等)
  6. 创建管道、共享内存
  7. 创建信号处理机制
  8. 创建 & 启动 woker、task、reactor
  9. 撒花完结

所以我们就单独看看8

    if (serv->factory_mode == SW_MODE_BASE)
    {
        ret = swReactorProcess_start(serv);
    }
    else
    {
        ret = swReactorThread_start(serv);
    }

同样我们还是只看SW_MODE_BASE

int swReactorProcess_start(swServer *serv)
{
    // 此处需要主要下 SW_MODE_BASE下是单线程模式
    serv->single_thread = 1;

    //监听tcp
    if (serv->have_stream_sock == 1)
    {
        for (auto ls : *serv->listen_list)
        {
        // 过滤udp
            if (swSocket_is_dgram(ls->type))
            {
                continue;
            }
            // 复用端口的处理 
#ifdef HAVE_REUSEPORT
            if (serv->enable_reuse_port)
            {
                if (close(ls->socket->fd) < 0)
                {
                    swSysWarn("close(%d) failed", ls->socket->fd);
                }
                continue;
            }
            else
#endif
            {
                //监听socket
                if (swPort_listen(ls) < 0)
                {
                    return SW_ERR;
                }
            }
        }
    }

    swProcessPool *pool = &serv->gs->event_workers;
    if (swProcessPool_create(pool, serv->worker_num, 0, SW_IPC_UNIXSOCK) < 0)
    {
        return SW_ERR;
    }
    swProcessPool_set_max_request(pool, serv->max_request, serv->max_request_grace);

    /**
     * store to swProcessPool object
     */
    serv->gs->event_workers.ptr = serv;
    serv->gs->event_workers.max_wait_time = serv->max_wait_time;
    serv->gs->event_workers.use_msgqueue = 0;
    serv->gs->event_workers.main_loop = swReactorProcess_loop;
    serv->gs->event_workers.onWorkerNotFound = swManager_wait_other_worker;

    uint32_t i;
    for (i = 0; i < serv->worker_num; i++)
    {
        serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
        serv->gs->event_workers.workers[i].id = i;
        serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER;
    }

    //single worker
    if (swServer_is_single(serv))
    {
        return swReactorProcess_loop(&serv->gs->event_workers, &serv->gs->event_workers.workers[0]);
    }

    for (i = 0; i < serv->worker_num; i++)
    {
        if (swServer_worker_create(serv, &serv->gs->event_workers.workers[i]) < 0)
        {
            return SW_ERR;
        }
    }

    //task workers
    if (serv->task_worker_num > 0)
    {
        if (swServer_create_task_workers(serv) < 0)
        {
            return SW_ERR;
        }
        swTaskWorker_init(serv);
        if (swProcessPool_start(&serv->gs->task_workers) < 0)
        {
            return SW_ERR;
        }
    }

    /**
     * create user worker process
     */
    if (serv->user_worker_list)
    {
        serv->user_workers = (swWorker *) sw_malloc(serv->user_worker_num * sizeof(swWorker));
        if (serv->user_workers == NULL)
        {
            swSysWarn("gmalloc[server->user_workers] failed");
            return SW_ERR;
        }
        for (auto worker : *serv->user_worker_list)
        {
            /**
             * store the pipe object
             */
            if (worker->pipe_object)
            {
                swServer_store_pipe_fd(serv, worker->pipe_object);
            }
            swManager_spawn_user_worker(serv, worker);
        }
    }

    /**
     * manager process is the same as the master process
     */
    SwooleG.pid = serv->gs->manager_pid = getpid();
    SwooleG.process_type = SW_PROCESS_MANAGER;

    /**
     * manager process can not use signalfd
     */
    SwooleG.use_signalfd = 0;

    swProcessPool_start(&serv->gs->event_workers);
    swServer_signal_init(serv);

    if (serv->onStart)
    {
        swWarn("The onStart event with SWOOLE_BASE is deprecated");
        serv->onStart(serv);
    }

    if (serv->onManagerStart)
    {
        serv->onManagerStart(serv);
    }

    swProcessPool_wait(&serv->gs->event_workers);
    swProcessPool_shutdown(&serv->gs->event_workers);

    swManager_kill_user_workers(serv);

    if (serv->onManagerStop)
    {
        serv->onManagerStop(serv);
    }

    return SW_OK;
}

继续总结该函做的事情

  1. 监听tcp 端口 & 端口复用
  2. 创建woker进程
  3. 创建task进程
  4. 信号处理
  5. 等待wokers进程的结束
  6. wokers进程技术的后shuntdown处理
  7. onManagerStop的处理
  8. 撒花结束

关键server的启动代码完了,肯定也是雨里雾里的,别问我,我也是,我们其实想看reactorswoole的server服务承担什么角色 我们看这些干嘛
7.jpg
其实只有看到完启动的代码才能知道一些我们想要的 如果根据epoll的模型核心就在notify,我们可以回过头再看下总结下

1.关于reactor的线程创建

int swReactor_create(swReactor *reactor, int max_event)
{
    int ret;
    bzero(reactor, sizeof(swReactor));

#ifdef HAVE_EPOLL
    ret = swReactorEpoll_create(reactor, max_event);
#elif defined(HAVE_KQUEUE)
    ret = swReactorKqueue_create(reactor, max_event);
#elif defined(HAVE_POLL)
    ret = swReactorPoll_create(reactor, max_event);
#else
    ret = swReactorSelect_create(reactor);
#endif

    reactor->running = 1;

    reactor->onFinish = reactor_finish;
    reactor->onTimeout = reactor_timeout;
    reactor->is_empty = swReactor_empty;
    reactor->can_exit = SwooleG.reactor_can_exit;

    reactor->write = swReactor_write;
    reactor->close = swReactor_close;

    reactor->defer = defer_task_add;
    reactor->defer_tasks = nullptr;

    reactor->default_write_handler = swReactor_onWrite;

    Socket::init_reactor(reactor);
    System::init_reactor(reactor);
    swClient_init_reactor(reactor);

    if (SwooleG.hooks[SW_GLOBAL_HOOK_ON_REACTOR_CREATE])
    {
        swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE, reactor);
    }

    return ret;
}

2.关于tcp的函数

    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;
    serv->notify = swServer_tcp_notify;
    serv->feedback = swServer_tcp_feedback;

3.关于swServer_tcp_notify

/**
 * use in master process
 */
static int swServer_tcp_notify(swServer *serv, swConnection *conn, int event)
{
    swDataHead notify_event = {};
    notify_event.type = event;
    notify_event.reactor_id = conn->reactor_id;
    notify_event.fd = conn->fd;
    notify_event.server_fd = conn->server_fd;
    return serv->factory.notify(&serv->factory, &notify_event);
}

4.关于swFactory_create

int swFactory_create(swFactory *factory)
{
    factory->dispatch = swFactory_dispatch;
    factory->finish = swFactory_finish;
    factory->start = swFactory_start;
    factory->shutdown = swFactory_shutdown;
    factory->end = swFactory_end;
    factory->notify = swFactory_notify;
    factory->free = swFactory_free;
    return SW_OK;
}

5.关于notify

/**
 * only stream fd
 */
static int swFactory_notify(swFactory *factory, swDataHead *info)
{
    swServer *serv = (swServer *) factory->ptr;
    swConnection *conn = swServer_connection_get(serv, info->fd);
    if (conn == NULL || conn->active == 0)
    {
        swWarn("dispatch[type=%d] failed, connection#%d is not active", info->type, info->fd);
        return SW_ERR;
    }
    //server active close, discard data.
    if (conn->closed)
    {
        swWarn("dispatch[type=%d] failed, connection#%d is closed by server", info->type, info->fd);
        return SW_OK;
    }
    //converted fd to session_id
    info->fd = conn->session_id;
    info->server_fd = conn->server_fd;
    info->flags = SW_EVENT_DATA_NORMAL;

    return swWorker_onTask(factory, (swEventData *) info);
}

6.关于onReceiveonTaskonFinish

可以开始记笔记了
8.jpg

Reactor

1.reactor是线程形态,可以单线程也可以多线程 取决woker数量
2.负责维护客户端TCP连接、处理网络IO、处理协议、收发数据
3.不执行任何PHP代码

Worker

1.worker是进程形态,可以单进程也可以多进程
2.接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
3.生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端

总结就是再swoolereactor就是nginx, worker就是php-fpm,不同的有了协程后woker可以是异步的,那么理论上承载的并发是无上限的。(有人开喷了:你当文件句柄打开限制是吃素的吗!)

写给最后

swoole的源码的分析的文章可能要先缓缓,因为越看发现越吃力,还是因为自己底子不够好。
9.jpg
BUT 立的flag还是要完成。接下来可能先从redis入手,因为书籍比较全面(copy更加的顺畅丝滑)。
看完上述给个大家一个小小的问题
写过swoole的小伙伴都知道 为什么mysqlredis连接需要在onStart的时候进行处理?

查看原文

ltxlong 收藏了文章 · 11月2日

【SWOOLE系列】浅谈SWOOLE协程篇

阅读本文需要以下知识点

  • 了解进程、线程相关基础
  • 熟练php的hello world输出
  • 会swoole单词拼写

协程的介绍

协程是什么?

A coroutine is a function that can suspend its execution (yield) until the given given YieldInstruction finishes.

简单的说协程是寄宿在线程下程序员实现的一种跟更轻量的并发的协作轻量线程

随着程序员人群的增大,大佬也不断的爆发式增长,当然就开始有人觉得线程不好用了,那怎么办呢?当然是基于线程的理念上再去实现一套更加轻量、更好骗star的一套轻量线程(事实上协程不能完全被认为线程,因为一个线程可以有多个协程)

协程和线程的区别

本质

线程 内核态
协程 用户态

调度方式

线程的调度方式为系统调度,常用的调度策略有分时调度抢占调度。说白就是线程的调度完全不受自己控制

协程的调度方式为协作式调度 不受内核控制由自由策略调度切换

等等

协作式调度?

上述说了协程是用户态的,所以所谓的协作式调度直接可以理解为是程序员写的调度方式,也就是我想怎么调度就怎么调度,而不用通过系统内核被调度。

深。。。。浅入理解swoole的协程

既然打算浅入理解的swoole的协程,我们必须要知道swoole的协程模型。
swoole的协程是基于单线程。可以理解为协程的切换是串行的,再同一个时间点只运行一个协程.

说到这里,肯定就有人问了。go呢,go的协程的是基于多线程。当然各有各的好处,具体可以自行使用搜索引擎了解

我们可以直接copy & paste 下面代码,再本地的环境进行的 demo run

<?php

$func = function ($index, $isCorotunine = true) {
    $isCorotunine && \Swoole\Coroutine::sleep(2);
    echo "index:" . $index . PHP_EOL;
    echo "is corotunine:" . intval($isCorotunine) . PHP_EOL;
};

$func(1, false);
go($func, 2, true);
go($func, 3, true);
go($func, 4, true);
go($func, 5, true);
go($func, 6, true);
$func(7, false);    

会得到以下结果

index:1
is corotunine:0
index:7
is corotunine:0
index:2
is corotunine:1
index:6
is corotunine:1
index:5
is corotunine:1
index:4
is corotunine:1
index:3
is corotunine:1

肯定有人会想,哇塞,尽然2秒都执行完了,一点都不堵塞啊!!

好了,事实上关于2秒执行完的事情可以回过头再去看下协程的概念。
我们可以关注的是执行顺序,1和7是非协程的执行能立马返回结果符合预期。
关于协程的调度顺序
为什么是26543不是65432或者23456有序的返回呢

为了找到我们的答案,我们只能通过源码进行知晓一些东西

分析源码

image

图来自https://segmentfault.com/a/11...

如果没有较强的基础还有啃烂的apue的前提下(当然我也没有!T_T)
我们需要关心的是以下两个
yield 切换协程
resume 恢复协程

协程的创建

<?php
go (function(){
echo "swoole 太棒了";
});

调用的swoole封装给PHPgo函数为创建一个协程

我们根据拓展源码中的

大部分的PHP扩展函数以及扩展方法的参数声明放在swoole_*.ccswoole.cc里面。
PHP_FALIAS(go, swoole_coroutine_create, arginfo_swoole_coroutine_create)

可以知道 go->swoole_coroutine_create

在swoole_coroutine.cc文件里找到

PHP_FUNCTION(swoole_coroutine_create)
{
    ....
    // 划重点 要考
    long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
    ....
}

long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
    if (sw_unlikely(Coroutine::count() >= config.max_num))
    {
        php_swoole_fatal_error(E_WARNING, "exceed max number of coroutine %zu", (uintmax_t) Coroutine::count());
        return SW_CORO_ERR_LIMIT;
    }

    if (sw_unlikely(!active))
    {
        // 划重点 要考
        activate();
    }

    // 保存回调函数
    php_coro_args php_coro_args;
    //函数信息
    php_coro_args.fci_cache = fci_cache;
    //参数
    php_coro_args.argv = argv;
    php_coro_args.argc = argc;
    // 划重点 要考
    save_task(get_task());

    // 划重点 要考
    return Coroutine::create(main_func, (void*) &php_coro_args);
}
// 保存栈 
void PHPCoroutine::save_task(php_coro_task *task)
{
    save_vm_stack(task);
    save_og(task);
}
// 初始化reactor的事件
inline void PHPCoroutine::activate()
{
    if (sw_unlikely(active))
    {
        return;
    }

    /* init reactor and register event wait */
    php_swoole_check_reactor();

    /* replace interrupt function */
    orig_interrupt_function = zend_interrupt_function;
    zend_interrupt_function = coro_interrupt_function;
    
    /* replace the error function to save execute_data */
    orig_error_function = zend_error_cb;
    zend_error_cb = error;

    if (config.hook_flags)
    {
        enable_hook(config.hook_flags);
    }

    if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
    {
        /* create a thread to interrupt the coroutine that takes up too much time */
        interrupt_thread_start();
    }

    if (!coro_global_active)
    {
        if (zend_hash_str_find_ptr(&module_registry, ZEND_STRL("xdebug")))
        {
            php_swoole_fatal_error(E_WARNING, "Using Xdebug in coroutines is extremely dangerous, please notice that it may lead to coredump!");
        }

        /* replace functions that can not work correctly in coroutine */
        inject_function();

        coro_global_active = true;
    }
    /**
     * deactivate when reactor free.
     */
    swReactor_add_destroy_callback(SwooleG.main_reactor, deactivate, nullptr);
    active = true;
}

根据Coroutine::create继续往下跳转

    static inline long create(coroutine_func_t fn, void* args = nullptr)
    {
        return (new Coroutine(fn, args))->run();
    }

在创建完协程后立马执行
我们观察下构造方法

    Coroutine(coroutine_func_t fn, void *private_data) :
            ctx(stack_size, fn, private_data)
    {
        cid = ++last_cid;
        coroutines[cid] = this;
        if (sw_unlikely(count() > peak_num))
        {
            peak_num = count();
        }
    }

上述代码我可以发现还有一个Context的类 这个构造函数我们可以猜到做了3件事情

  1. 分配对应协程id (每个协程都有自己的id)
  2. 保存上下文
  3. 更新当前的协程的数量
swoole使用的协程库为 boost.context 可自行搜索
主要暴露的函数接口为jump_fcontextmake_fcontext
具体的作用保存当前执行状态的上下文暂停当前的执行状态够跳转到其他位置继续执行

创建完协程立马执行

inline long run()
    {
        long cid = this->cid;
        origin = current;
        current = this;
        // 依赖boost.context 切栈
        ctx.swap_in();
        // 判断是否执行结束
        check_end();
        return cid;
    }

判断是否结束

inline void check_end()
    {
        if (ctx.is_end())
        {
            close();
        }
        else if (sw_unlikely(on_bailout))
        {
            SW_ASSERT(current == nullptr);
            on_bailout();
            // expect that never here
            exit(1);
        }
    }

根据ctx.is_end()的函数找到

    inline bool is_end()
    {
        return end_;
    }
bool Context::swap_in()
{
    jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
    return true;
}

我们可以总结下swoole在创建协程的时候主要做了哪些事情

  1. 检测环境
  2. 解析参数
  3. 保存上下文
  4. 切换C栈
  5. 执行协程

协程的yield

上述的demo我们使用\Swoole\Coroutine::sleep(2)
根据上述说函数申明的我们在swoole_corotunine_system.cc发现对应函数为swoole_coroutine_systemsleep

PHP_METHOD(swoole_coroutine_system, sleep)
{
    double seconds;

    ZEND_PARSE_PARAMETERS_START(1, 1)
        Z_PARAM_DOUBLE(seconds)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    if (UNEXPECTED(seconds < SW_TIMER_MIN_SEC))
    {
        php_swoole_fatal_error(E_WARNING, "Timer must be greater than or equal to " ZEND_TOSTR(SW_TIMER_MIN_SEC));
        RETURN_FALSE;
    }
    System::sleep(seconds);
    RETURN_TRUE;
}

调用了sleep函数之后对当前的协程做了三件事
1.增加了timer定时器
2.注册回掉函数再延迟之后resume协程
3.通过yield让出调度

int System::sleep(double sec)
{
// 获取当前的协程
    Coroutine* co = Coroutine::get_current_safe();
   //swTimer_add 注册定时器 sleep_timeout回调的函数
   if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL)
    {
        return -1;
    }
    // 让出当前cpu
    co->yield();
    return 0;
}

// 回调函数
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
   // 恢复调度
    ((Coroutine *) tnode->data)->resume();
}
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
    ....
    // 保存当前上下文和对应过期时间
    tnode->data = data;
    tnode->type = SW_TIMER_TYPE_KERNEL;
    tnode->exec_msec = now_msec + _msec;
    tnode->interval = interval ? _msec : 0;
    tnode->removed = 0;
    tnode->callback = callback;
    tnode->round = timer->round;
    tnode->dtor = NULL;

    // _next_msec保存最快过期的事件
    if (timer->_next_msec < 0 || timer->_next_msec > _msec)
    {
        timer->set(timer, _msec);
        timer->_next_msec = _msec;
    }

    tnode->id = timer->_next_id++;
    if (sw_unlikely(tnode->id < 0))
    {
        tnode->id = 1;
        timer->_next_id = 2;
    }

    tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode);
    ....
    timer->num++;
    return tnode;
}

协程的切换

我们

void Coroutine::resume()
{
    SW_ASSERT(current != this);
    if (sw_unlikely(on_bailout))
    {
        return;
    }
    state = SW_CORO_RUNNING;
    if (sw_likely(on_resume))
    {
        on_resume(task);
    }
    // 将当前的协程保存为origin -> 理解程previous
    origin = current;
    // 需要执行的协程 变成 current
    current = this;
    // 入栈执行
    ctx.swap_in();
    check_end();
}

到这里时候 关于协程调用顺序的答案已经出来了

在创建协程的时候(new Coroutine(fn, args))->run();sleep触发yield都在不断变更的Corotuninecurrentorigin 再执切换的时候和php代码创建协程的时间发生穿插,而不是我们想象中的队列有序执行
比如当创建协程只有2个的时候

<?php

$func = function ($index, $isCorotunine = true) {
    $isCorotunine && \Swoole\Coroutine::sleep(2);
    echo "index:" . $index . PHP_EOL;
    echo "is corotunine:" . intval($isCorotunine) . PHP_EOL;
};

$func(1, false);

go($func, 2, true);
go($func, 3, true);

返回输出 因为连续创建协程的执行时间小没有被打乱

php swoole_go_demo1.php
index:1
is corotunine:0
index:2
is corotunine:1
index:3
is corotunine:1

当连续创建的时候200个协程的时候
返回就变得打乱的index 符合预计猜想

index:1,index:2,index:4,index:8,index:16,index:32,index:64,index:128,index:129,index:65,index:130,index:131,index:33,index:66,index:132,index:133,index:67,index:134,index:135,index:17,index:34,index:68,index:136,index:137,index:69,index:138,index:139,index:35,index:70,index:140,index:141,index:71,index:142,index:143,index:9,index:18,index:36,index:72,index:144,index:145,index:73,index:146,index:147,index:158,index:157,index:156,index:155,index:154,index:153,index:152,index:151,index:37,index:74,index:148,index:149,index:75,index:150,index:19,index:38,index:76,index:77,index:39,index:78,index:79,index:5,index:10,index:20,index:40,index:80,index:81,index:41,index:82,index:83,index:21,index:127,index:126,index:125,index:124,index:123,index:122,index:121,index:120,index:119,index:118,index:117,index:116,index:115,index:114,index:113,index:112,index:111,index:110,index:109,index:108,index:107,index:106,index:105,index:104,index:103,index:102,index:101,index:100,index:99,index:98,index:97,index:96,index:95,index:94,index:93,index:92,index:91,index:90,index:89,index:88,index:87,index:42,index:84,index:85,index:43,index:86,index:11,index:22,index:44,index:45,index:23,index:46,index:47,index:3,index:6,index:12,index:24,index:48,index:49,index:25,index:50,index:51,index:13,index:26,index:63,index:62,index:61,index:60,index:59,index:58,index:57,index:56,index:55,index:52,index:53,index:27,index:54,index:7,index:14,index:28,index:29,index:15,index:30,index:31,index:200,index:199,index:192,index:185,index:175,index:168,index:161,index:163,index:172,index:179,index:187,index:194,index:174,index:160,index:173,index:176,index:198,index:195,index:180,index:167,index:169,index:184,index:197,index:193,index:177,index:162,index:171,index:186,index:182,index:164,index:191,index:183,index:166,index:196,index:178,index:170,index:189,index:188,index:165,index:181,index:190,index:159

最后彩蛋

我们使用GO的协程的来实现上述的demo

package main

import (
    "fmt"
    "time"
)

var count int = 0

func main() {
    output(false, 1)

    go output(true, 2)
    go output(true, 3)
    go output(true, 4)
    go output(true, 5)
    go output(true, 6)

    output(false, 7)

    time.Sleep(time.Second)
}

func output(isCorotunine bool, index int) {
    time.Sleep(time.Second)
    count = count + 1
    fmt.Println(count, isCorotunine, index)
}

猜猜返回结果是如何的 可以根据go的协程基于多线程的方式再去研究下
image.png

写给最后,文章纯属自己根据代码和资料理解,如果有错误麻烦提出来,倍感万分,如果因为一些错误的观点被误导我只能说

查看原文

ltxlong 收藏了文章 · 10月30日

Linux网络 - 数据包的接收过程

本文将介绍在Linux系统中,数据包是如何一步一步从网卡传到进程手中的。

如果英文没有问题,强烈建议阅读后面参考里的两篇文章,里面介绍的更详细。

本文只讨论以太网的物理网卡,不涉及虚拟设备,并且以一个UDP包的接收过程作为示例.

本示例里列出的函数调用关系来自于kernel 3.13.0,如果你的内核不是这个版本,函数名称和相关路径可能不一样,但背后的原理应该是一样的(或者有细微差别)

网卡到内存

网卡需要有驱动才能工作,驱动是加载到内核中的模块,负责衔接网卡和内核的网络模块,驱动在加载的时候将自己注册进网络模块,当相应的网卡收到数据包时,网络模块会调用相应的驱动程序处理数据。

下图展示了数据包(packet)如何进入内存,并被内核的网络模块开始处理:

                   +-----+
                   |     |                            Memroy
+--------+   1     |     |  2  DMA     +--------+--------+--------+--------+
| Packet |-------->| NIC |------------>| Packet | Packet | Packet | ...... |
+--------+         |     |             +--------+--------+--------+--------+
                   |     |<--------+
                   +-----+         |
                      |            +---------------+
                      |                            |
                    3 | Raise IRQ                  | Disable IRQ
                      |                          5 |
                      |                            |
                      ↓                            |
                   +-----+                   +------------+
                   |     |  Run IRQ handler  |            |
                   | CPU |------------------>| NIC Driver |
                   |     |       4           |            |
                   +-----+                   +------------+
                                                   |
                                                6  | Raise soft IRQ
                                                   |
                                                   ↓
  • 1: 数据包从外面的网络进入物理网卡。如果目的地址不是该网卡,且该网卡没有开启混杂模式,该包会被网卡丢弃。
  • 2: 网卡将数据包通过DMA的方式写入到指定的内存地址,该地址由网卡驱动分配并初始化。注: 老的网卡可能不支持DMA,不过新的网卡一般都支持。
  • 3: 网卡通过硬件中断(IRQ)通知CPU,告诉它有数据来了
  • 4: CPU根据中断表,调用已经注册的中断函数,这个中断函数会调到驱动程序(NIC Driver)中相应的函数
  • 5: 驱动先禁用网卡的中断,表示驱动程序已经知道内存中有数据了,告诉网卡下次再收到数据包直接写内存就可以了,不要再通知CPU了,这样可以提高效率,避免CPU不停的被中断。
  • 6: 启动软中断。这步结束后,硬件中断处理函数就结束返回了。由于硬中断处理程序执行的过程中不能被中断,所以如果它执行时间过长,会导致CPU没法响应其它硬件的中断,于是内核引入软中断,这样可以将硬中断处理函数中耗时的部分移到软中断处理函数里面来慢慢处理。

内核的网络模块

软中断会触发内核网络模块中的软中断处理函数,后续流程如下

                                                     +-----+
                                             17      |     |
                                        +----------->| NIC |
                                        |            |     |
                                        |Enable IRQ  +-----+
                                        |
                                        |
                                  +------------+                                      Memroy
                                  |            |        Read           +--------+--------+--------+--------+
                 +--------------->| NIC Driver |<--------------------- | Packet | Packet | Packet | ...... |
                 |                |            |          9            +--------+--------+--------+--------+
                 |                +------------+
                 |                      |    |        skb
            Poll | 8      Raise softIRQ | 6  +-----------------+
                 |                      |             10       |
                 |                      ↓                      ↓
         +---------------+  Call  +-----------+        +------------------+        +--------------------+  12  +---------------------+
         | net_rx_action |<-------| ksoftirqd |        | napi_gro_receive |------->| enqueue_to_backlog |----->| CPU input_pkt_queue |
         +---------------+   7    +-----------+        +------------------+   11   +--------------------+      +---------------------+
                                                               |                                                      | 13
                                                            14 |        + - - - - - - - - - - - - - - - - - - - - - - +
                                                               ↓        ↓
                                                    +--------------------------+    15      +------------------------+
                                                    | __netif_receive_skb_core |----------->| packet taps(AF_PACKET) |
                                                    +--------------------------+            +------------------------+
                                                               |
                                                               | 16
                                                               ↓
                                                      +-----------------+
                                                      | protocol layers |
                                                      +-----------------+
  • 7: 内核中的ksoftirqd进程专门负责软中断的处理,当它收到软中断后,就会调用相应软中断所对应的处理函数,对于上面第6步中是网卡驱动模块抛出的软中断,ksoftirqd会调用网络模块的net_rx_action函数
  • 8: net_rx_action调用网卡驱动里的poll函数来一个一个的处理数据包
  • 9: 在pool函数中,驱动会一个接一个的读取网卡写到内存中的数据包,内存中数据包的格式只有驱动知道
  • 10: 驱动程序将内存中的数据包转换成内核网络模块能识别的skb格式,然后调用napi_gro_receive函数
  • 11: napi_gro_receive会处理GRO相关的内容,也就是将可以合并的数据包进行合并,这样就只需要调用一次协议栈。然后判断是否开启了RPS,如果开启了,将会调用enqueue_to_backlog
  • 12: 在enqueue_to_backlog函数中,会将数据包放入CPU的softnet_data结构体的input_pkt_queue中,然后返回,如果input_pkt_queue满了的话,该数据包将会被丢弃,queue的大小可以通过net.core.netdev_max_backlog来配置
  • 13: CPU会接着在自己的软中断上下文中处理自己input_pkt_queue里的网络数据(调用__netif_receive_skb_core)
  • 14: 如果没开启RPS,napi_gro_receive会直接调用__netif_receive_skb_core
  • 15: 看是不是有AF_PACKET类型的socket(也就是我们常说的原始套接字),如果有的话,拷贝一份数据给它。tcpdump抓包就是抓的这里的包。
  • 16: 调用协议栈相应的函数,将数据包交给协议栈处理。
  • 17: 待内存中的所有数据包被处理完成后(即poll函数执行完成),启用网卡的硬中断,这样下次网卡再收到数据的时候就会通知CPU
enqueue_to_backlog函数也会被netif_rx函数调用,而netif_rx正是lo设备发送数据包时调用的函数

协议栈

IP层

由于是UDP包,所以第一步会进入IP层,然后一级一级的函数往下调:

          |
          |
          ↓         promiscuous mode &&
      +--------+    PACKET_OTHERHOST (set by driver)   +-----------------+
      | ip_rcv |-------------------------------------->| drop this packet|
      +--------+                                       +-----------------+
          |
          |
          ↓
+---------------------+
| NF_INET_PRE_ROUTING |
+---------------------+
          |
          |
          ↓
      +---------+
      |         | enabled ip forword  +------------+        +----------------+
      | routing |-------------------->| ip_forward |------->| NF_INET_FORWARD |
      |         |                     +------------+        +----------------+
      +---------+                                                   |
          |                                                         |
          | destination IP is local                                 ↓
          ↓                                                 +---------------+
 +------------------+                                       | dst_output_sk |
 | ip_local_deliver |                                       +---------------+
 +------------------+
          |
          |
          ↓
 +------------------+
 | NF_INET_LOCAL_IN |
 +------------------+
          |
          |
          ↓
    +-----------+
    | UDP layer |
    +-----------+
  • ip_rcv: ip_rcv函数是IP模块的入口函数,在该函数里面,第一件事就是将垃圾数据包(目的mac地址不是当前网卡,但由于网卡设置了混杂模式而被接收进来)直接丢掉,然后调用注册在NF_INET_PRE_ROUTING上的函数
  • NF_INET_PRE_ROUTING: netfilter放在协议栈中的钩子,可以通过iptables来注入一些数据包处理函数,用来修改或者丢弃数据包,如果数据包没被丢弃,将继续往下走
  • routing: 进行路由,如果是目的IP不是本地IP,且没有开启ip forward功能,那么数据包将被丢弃,如果开启了ip forward功能,那将进入ip_forward函数
  • ip_forward: ip_forward会先调用netfilter注册的NF_INET_FORWARD相关函数,如果数据包没有被丢弃,那么将继续往后调用dst_output_sk函数
  • dst_output_sk: 该函数会调用IP层的相应函数将该数据包发送出去,同下一篇要介绍的数据包发送流程的后半部分一样。
  • ip_local_deliver:如果上面routing的时候发现目的IP是本地IP,那么将会调用该函数,在该函数中,会先调用NF_INET_LOCAL_IN相关的钩子程序,如果通过,数据包将会向下发送到UDP层

UDP层

          |
          |
          ↓
      +---------+            +-----------------------+
      | udp_rcv |----------->| __udp4_lib_lookup_skb |
      +---------+            +-----------------------+
          |
          |
          ↓
 +--------------------+      +-----------+
 | sock_queue_rcv_skb |----->| sk_filter |
 +--------------------+      +-----------+
          |
          |
          ↓
 +------------------+
 | __skb_queue_tail |
 +------------------+
          |
          |
          ↓
  +---------------+
  | sk_data_ready |
  +---------------+
  • udp_rcv: udp_rcv函数是UDP模块的入口函数,它里面会调用其它的函数,主要是做一些必要的检查,其中一个重要的调用是__udp4_lib_lookup_skb,该函数会根据目的IP和端口找对应的socket,如果没有找到相应的socket,那么该数据包将会被丢弃,否则继续
  • sock_queue_rcv_skb: 主要干了两件事,一是检查这个socket的receive buffer是不是满了,如果满了的话,丢弃该数据包,然后就是调用sk_filter看这个包是否是满足条件的包,如果当前socket上设置了filter,且该包不满足条件的话,这个数据包也将被丢弃(在Linux里面,每个socket上都可以像tcpdump里面一样定义filter,不满足条件的数据包将会被丢弃)
  • __skb_queue_tail: 将数据包放入socket接收队列的末尾
  • sk_data_ready: 通知socket数据包已经准备好
调用完sk_data_ready之后,一个数据包处理完成,等待应用层程序来读取,上面所有函数的执行过程都在软中断的上下文中。

socket

应用层一般有两种方式接收数据,一种是recvfrom函数阻塞在那里等着数据来,这种情况下当socket收到通知后,recvfrom就会被唤醒,然后读取接收队列的数据;另一种是通过epoll或者select监听相应的socket,当收到通知后,再调用recvfrom函数去读取接收队列的数据。两种情况都能正常的接收到相应的数据包。

结束语

了解数据包的接收流程有助于帮助我们搞清楚我们可以在哪些地方监控和修改数据包,哪些情况下数据包可能被丢弃,为我们处理网络问题提供了一些参考,同时了解netfilter中相应钩子的位置,对于了解iptables的用法有一定的帮助,同时也会帮助我们后续更好的理解Linux下的网络虚拟设备。

在接下来的几篇文章中,将会介绍Linux下的网络虚拟设备和iptables。

参考

Monitoring and Tuning the Linux Networking Stack: Receiving Data
Illustrated Guide to Monitoring and Tuning the Linux Networking Stack: Receiving Data
NAPI

查看原文

ltxlong 赞了文章 · 10月30日

Linux网络 - 数据包的接收过程

本文将介绍在Linux系统中,数据包是如何一步一步从网卡传到进程手中的。

如果英文没有问题,强烈建议阅读后面参考里的两篇文章,里面介绍的更详细。

本文只讨论以太网的物理网卡,不涉及虚拟设备,并且以一个UDP包的接收过程作为示例.

本示例里列出的函数调用关系来自于kernel 3.13.0,如果你的内核不是这个版本,函数名称和相关路径可能不一样,但背后的原理应该是一样的(或者有细微差别)

网卡到内存

网卡需要有驱动才能工作,驱动是加载到内核中的模块,负责衔接网卡和内核的网络模块,驱动在加载的时候将自己注册进网络模块,当相应的网卡收到数据包时,网络模块会调用相应的驱动程序处理数据。

下图展示了数据包(packet)如何进入内存,并被内核的网络模块开始处理:

                   +-----+
                   |     |                            Memroy
+--------+   1     |     |  2  DMA     +--------+--------+--------+--------+
| Packet |-------->| NIC |------------>| Packet | Packet | Packet | ...... |
+--------+         |     |             +--------+--------+--------+--------+
                   |     |<--------+
                   +-----+         |
                      |            +---------------+
                      |                            |
                    3 | Raise IRQ                  | Disable IRQ
                      |                          5 |
                      |                            |
                      ↓                            |
                   +-----+                   +------------+
                   |     |  Run IRQ handler  |            |
                   | CPU |------------------>| NIC Driver |
                   |     |       4           |            |
                   +-----+                   +------------+
                                                   |
                                                6  | Raise soft IRQ
                                                   |
                                                   ↓
  • 1: 数据包从外面的网络进入物理网卡。如果目的地址不是该网卡,且该网卡没有开启混杂模式,该包会被网卡丢弃。
  • 2: 网卡将数据包通过DMA的方式写入到指定的内存地址,该地址由网卡驱动分配并初始化。注: 老的网卡可能不支持DMA,不过新的网卡一般都支持。
  • 3: 网卡通过硬件中断(IRQ)通知CPU,告诉它有数据来了
  • 4: CPU根据中断表,调用已经注册的中断函数,这个中断函数会调到驱动程序(NIC Driver)中相应的函数
  • 5: 驱动先禁用网卡的中断,表示驱动程序已经知道内存中有数据了,告诉网卡下次再收到数据包直接写内存就可以了,不要再通知CPU了,这样可以提高效率,避免CPU不停的被中断。
  • 6: 启动软中断。这步结束后,硬件中断处理函数就结束返回了。由于硬中断处理程序执行的过程中不能被中断,所以如果它执行时间过长,会导致CPU没法响应其它硬件的中断,于是内核引入软中断,这样可以将硬中断处理函数中耗时的部分移到软中断处理函数里面来慢慢处理。

内核的网络模块

软中断会触发内核网络模块中的软中断处理函数,后续流程如下

                                                     +-----+
                                             17      |     |
                                        +----------->| NIC |
                                        |            |     |
                                        |Enable IRQ  +-----+
                                        |
                                        |
                                  +------------+                                      Memroy
                                  |            |        Read           +--------+--------+--------+--------+
                 +--------------->| NIC Driver |<--------------------- | Packet | Packet | Packet | ...... |
                 |                |            |          9            +--------+--------+--------+--------+
                 |                +------------+
                 |                      |    |        skb
            Poll | 8      Raise softIRQ | 6  +-----------------+
                 |                      |             10       |
                 |                      ↓                      ↓
         +---------------+  Call  +-----------+        +------------------+        +--------------------+  12  +---------------------+
         | net_rx_action |<-------| ksoftirqd |        | napi_gro_receive |------->| enqueue_to_backlog |----->| CPU input_pkt_queue |
         +---------------+   7    +-----------+        +------------------+   11   +--------------------+      +---------------------+
                                                               |                                                      | 13
                                                            14 |        + - - - - - - - - - - - - - - - - - - - - - - +
                                                               ↓        ↓
                                                    +--------------------------+    15      +------------------------+
                                                    | __netif_receive_skb_core |----------->| packet taps(AF_PACKET) |
                                                    +--------------------------+            +------------------------+
                                                               |
                                                               | 16
                                                               ↓
                                                      +-----------------+
                                                      | protocol layers |
                                                      +-----------------+
  • 7: 内核中的ksoftirqd进程专门负责软中断的处理,当它收到软中断后,就会调用相应软中断所对应的处理函数,对于上面第6步中是网卡驱动模块抛出的软中断,ksoftirqd会调用网络模块的net_rx_action函数
  • 8: net_rx_action调用网卡驱动里的poll函数来一个一个的处理数据包
  • 9: 在pool函数中,驱动会一个接一个的读取网卡写到内存中的数据包,内存中数据包的格式只有驱动知道
  • 10: 驱动程序将内存中的数据包转换成内核网络模块能识别的skb格式,然后调用napi_gro_receive函数
  • 11: napi_gro_receive会处理GRO相关的内容,也就是将可以合并的数据包进行合并,这样就只需要调用一次协议栈。然后判断是否开启了RPS,如果开启了,将会调用enqueue_to_backlog
  • 12: 在enqueue_to_backlog函数中,会将数据包放入CPU的softnet_data结构体的input_pkt_queue中,然后返回,如果input_pkt_queue满了的话,该数据包将会被丢弃,queue的大小可以通过net.core.netdev_max_backlog来配置
  • 13: CPU会接着在自己的软中断上下文中处理自己input_pkt_queue里的网络数据(调用__netif_receive_skb_core)
  • 14: 如果没开启RPS,napi_gro_receive会直接调用__netif_receive_skb_core
  • 15: 看是不是有AF_PACKET类型的socket(也就是我们常说的原始套接字),如果有的话,拷贝一份数据给它。tcpdump抓包就是抓的这里的包。
  • 16: 调用协议栈相应的函数,将数据包交给协议栈处理。
  • 17: 待内存中的所有数据包被处理完成后(即poll函数执行完成),启用网卡的硬中断,这样下次网卡再收到数据的时候就会通知CPU
enqueue_to_backlog函数也会被netif_rx函数调用,而netif_rx正是lo设备发送数据包时调用的函数

协议栈

IP层

由于是UDP包,所以第一步会进入IP层,然后一级一级的函数往下调:

          |
          |
          ↓         promiscuous mode &&
      +--------+    PACKET_OTHERHOST (set by driver)   +-----------------+
      | ip_rcv |-------------------------------------->| drop this packet|
      +--------+                                       +-----------------+
          |
          |
          ↓
+---------------------+
| NF_INET_PRE_ROUTING |
+---------------------+
          |
          |
          ↓
      +---------+
      |         | enabled ip forword  +------------+        +----------------+
      | routing |-------------------->| ip_forward |------->| NF_INET_FORWARD |
      |         |                     +------------+        +----------------+
      +---------+                                                   |
          |                                                         |
          | destination IP is local                                 ↓
          ↓                                                 +---------------+
 +------------------+                                       | dst_output_sk |
 | ip_local_deliver |                                       +---------------+
 +------------------+
          |
          |
          ↓
 +------------------+
 | NF_INET_LOCAL_IN |
 +------------------+
          |
          |
          ↓
    +-----------+
    | UDP layer |
    +-----------+
  • ip_rcv: ip_rcv函数是IP模块的入口函数,在该函数里面,第一件事就是将垃圾数据包(目的mac地址不是当前网卡,但由于网卡设置了混杂模式而被接收进来)直接丢掉,然后调用注册在NF_INET_PRE_ROUTING上的函数
  • NF_INET_PRE_ROUTING: netfilter放在协议栈中的钩子,可以通过iptables来注入一些数据包处理函数,用来修改或者丢弃数据包,如果数据包没被丢弃,将继续往下走
  • routing: 进行路由,如果是目的IP不是本地IP,且没有开启ip forward功能,那么数据包将被丢弃,如果开启了ip forward功能,那将进入ip_forward函数
  • ip_forward: ip_forward会先调用netfilter注册的NF_INET_FORWARD相关函数,如果数据包没有被丢弃,那么将继续往后调用dst_output_sk函数
  • dst_output_sk: 该函数会调用IP层的相应函数将该数据包发送出去,同下一篇要介绍的数据包发送流程的后半部分一样。
  • ip_local_deliver:如果上面routing的时候发现目的IP是本地IP,那么将会调用该函数,在该函数中,会先调用NF_INET_LOCAL_IN相关的钩子程序,如果通过,数据包将会向下发送到UDP层

UDP层

          |
          |
          ↓
      +---------+            +-----------------------+
      | udp_rcv |----------->| __udp4_lib_lookup_skb |
      +---------+            +-----------------------+
          |
          |
          ↓
 +--------------------+      +-----------+
 | sock_queue_rcv_skb |----->| sk_filter |
 +--------------------+      +-----------+
          |
          |
          ↓
 +------------------+
 | __skb_queue_tail |
 +------------------+
          |
          |
          ↓
  +---------------+
  | sk_data_ready |
  +---------------+
  • udp_rcv: udp_rcv函数是UDP模块的入口函数,它里面会调用其它的函数,主要是做一些必要的检查,其中一个重要的调用是__udp4_lib_lookup_skb,该函数会根据目的IP和端口找对应的socket,如果没有找到相应的socket,那么该数据包将会被丢弃,否则继续
  • sock_queue_rcv_skb: 主要干了两件事,一是检查这个socket的receive buffer是不是满了,如果满了的话,丢弃该数据包,然后就是调用sk_filter看这个包是否是满足条件的包,如果当前socket上设置了filter,且该包不满足条件的话,这个数据包也将被丢弃(在Linux里面,每个socket上都可以像tcpdump里面一样定义filter,不满足条件的数据包将会被丢弃)
  • __skb_queue_tail: 将数据包放入socket接收队列的末尾
  • sk_data_ready: 通知socket数据包已经准备好
调用完sk_data_ready之后,一个数据包处理完成,等待应用层程序来读取,上面所有函数的执行过程都在软中断的上下文中。

socket

应用层一般有两种方式接收数据,一种是recvfrom函数阻塞在那里等着数据来,这种情况下当socket收到通知后,recvfrom就会被唤醒,然后读取接收队列的数据;另一种是通过epoll或者select监听相应的socket,当收到通知后,再调用recvfrom函数去读取接收队列的数据。两种情况都能正常的接收到相应的数据包。

结束语

了解数据包的接收流程有助于帮助我们搞清楚我们可以在哪些地方监控和修改数据包,哪些情况下数据包可能被丢弃,为我们处理网络问题提供了一些参考,同时了解netfilter中相应钩子的位置,对于了解iptables的用法有一定的帮助,同时也会帮助我们后续更好的理解Linux下的网络虚拟设备。

在接下来的几篇文章中,将会介绍Linux下的网络虚拟设备和iptables。

参考

Monitoring and Tuning the Linux Networking Stack: Receiving Data
Illustrated Guide to Monitoring and Tuning the Linux Networking Stack: Receiving Data
NAPI

查看原文

赞 123 收藏 150 评论 18

ltxlong 赞了文章 · 9月4日

小谈yii2中3个数据提供者及与GridView的搭配使用

你一定对yii2的ActiveDataProvider很熟悉,黄金组合ActiveDataProvider + GridView帮助无数的yii2开发者快速构造出功能强大的表格。

你可知yii2的世界里还有SqlDataProviderArrayDataProvider两位大兄弟?接下来我们把这同家三兄弟的用法做一个小教程。

本文并非配置篇,对于三兄弟的配置大全阿北会在《yii2配置词典》专栏中进行介绍

ActiveDataProvider

最常用的一个,使用ActiveDataProvider将返回一个对象集合,里面包含了查询的对象群、分页信息等等,一般我们是这样用的

$query = User::find();
$dataProvider = new ActiveDataProvider([
  'query'=>$query,
  'pagination'=>[
    'pageSize'=>20
  ]
]);

在ActiveDataProvider中除了query参数都是可选的,比如pagination、sort等。

当你得到了$dataProvider后,可以将其传给视图和GridView进行无敌组合,当然也可以使用自身的一些方法,比如

获得当前页的所有users

$dataProvider->getModels();//这是一个内含对象的一维数组

获得页码信息

$dataProvider->getTotalCount();// 获得总页数
$dataProvider->getCount();// 当前页码
$dataProvider->getPagination(); // 内含的Pagination对象

上面是最常用的,除此之外还提供了比如refresh等方法。

当然我们最常用的还是将ActiveDataProvider传递给GridView然后渲染出表格。

use yii\grid\GridView;
<?= GridView::widget([
  'dataProvider' => $dataProvider,
]);?>
小结 ActiveDataProvider返回的是对象的集合,确切的说是每一个模型(AR),因此模型的关联特性可以使用,这让我们可以构造出复杂完整的表格。

SqlDataProvider

和名字一样,SqlDataProvider接收一个原生的SQL语句并且能生成一个带有参数的dataProvider,一个月你总会碰到几次是用QueryBuilder构造不出来的复杂SQL。

用法如下

$totalCount = Yii::$app->db->createCommand('SELECT COUNT(*) FROM user WHERE sex=:sex', ['sex:' => 1])
            ->queryScalar();

$dataProvider = new SqlDataProvider([
    'sql' => 'SELECT * FROM user WHERE sex=:sex',
    'params' => [':sex' => 1],
    'totalCount' => $totalCount,
    //'sort' =>false,//如果为假则删除排序
    'sort' => [
        'attributes' => [
            'username' => [
                'asc' => ['username' => SORT_ASC],
                'desc' => ['username' => SORT_DESC],
                'default' => SORT_DESC,
                'label' => '用户名',
            ],
            'sex' => [
                'asc' => ['sex' => SORT_ASC],
                'desc' => ['sex' => SORT_DESC],
                'default' => SORT_DESC,
                'label' => '性别',
            ],
            'created_on'
        ],
    ],
    'pagination' => [
        'pageSize' => 10,
    ],
]);

return $dataProvider;

不要被上面一大堆代码吓到,我只是装个逼而已,其实SqlDataProvider必填的参数只有一项,就是sql。比如我下面的这段代码

$dataProvider = new SqlDataProvider([
    'sql' => 'SELECT * FROM `blog`'
]);

当然SqlDataProvider也有一些方法,比如getModels、getTotalCount等方法,不同的是这里面没有对象了,这里面是数组。这很正常,你不在操作AR了,你现在直接在操作数据库。

而我们在 小谈Yii Framework 2.0(yii2)的数据库层 提到过,AR才是数据表面向对象的归宿。

所以当你搭配GridView的时候,就要如下写代码了

GridView::widget([
    'dataProvider' => $dataProvider,
    'columns' => [
        ['class' => 'yii\grid\SerialColumn'],
        [
            'label' =>"昵称",
            'attribute' => 'nickname',
            'value'=>function($data){
                return $data["nickname"];
            }
        ],
        'username',
        'sex',
        'created_on',
        ['class' => 'yii\grid\ActionColumn'],
    ],
]); 

此刻的$data就是我们sql语句查出的一行。

小结 替补ActiveDataProvider,同时也提供给一些喜欢数组作为结果的小盆友们,无法使用关联属性。当然如果你是一个SQL迷,这种方法也许更直接更快速。

ArrayDataProvider

之所以出现了三种数据提供者(DataProvider)主要是因为我们要面对不同的数据来源、一个Query、一条SQL语句,你看,现在使用ArrayDataProvider你可以将一个数组作为数据来源还。

比如下面的代码

$dataProvider = new ArrayDataProvider([
    'allModels' => Blog::find()->asArray()->all(),// 或Blog::find()->all()
    'pagination' => [
        'pageSize' => 1,
    ],
]);

allModels 是一个必填项,代表数据来源,allModels是一个数组,但是allModels里面的元素并不限制,可以数组或对象,甚至是简单的数据类型,比如数字和字符串。

当然ArrayDataProvider一样可以和GridView组合,比如

GridView::widget([
    'dataProvider' => $dataProvider
]);

对于GridView而言,这三种DataProvider没有区别

小结 给你一堆数据,然后用ArrayDataProvider进行分页、排序、构造表格!也许有其使用的场景吧,不过我现在还没遇到。

小结

这篇文章的初衷是为了让大家知道yii2中一共提供了几种数据提供者以及其使用方法,具体配置等我们会在其他专栏介绍。

在本文也使用了一些文章

原文链接 https://nai8.me/site/index.html
查看原文

赞 4 收藏 3 评论 0

ltxlong 收藏了文章 · 9月4日

小谈yii2中3个数据提供者及与GridView的搭配使用

你一定对yii2的ActiveDataProvider很熟悉,黄金组合ActiveDataProvider + GridView帮助无数的yii2开发者快速构造出功能强大的表格。

你可知yii2的世界里还有SqlDataProviderArrayDataProvider两位大兄弟?接下来我们把这同家三兄弟的用法做一个小教程。

本文并非配置篇,对于三兄弟的配置大全阿北会在《yii2配置词典》专栏中进行介绍

ActiveDataProvider

最常用的一个,使用ActiveDataProvider将返回一个对象集合,里面包含了查询的对象群、分页信息等等,一般我们是这样用的

$query = User::find();
$dataProvider = new ActiveDataProvider([
  'query'=>$query,
  'pagination'=>[
    'pageSize'=>20
  ]
]);

在ActiveDataProvider中除了query参数都是可选的,比如pagination、sort等。

当你得到了$dataProvider后,可以将其传给视图和GridView进行无敌组合,当然也可以使用自身的一些方法,比如

获得当前页的所有users

$dataProvider->getModels();//这是一个内含对象的一维数组

获得页码信息

$dataProvider->getTotalCount();// 获得总页数
$dataProvider->getCount();// 当前页码
$dataProvider->getPagination(); // 内含的Pagination对象

上面是最常用的,除此之外还提供了比如refresh等方法。

当然我们最常用的还是将ActiveDataProvider传递给GridView然后渲染出表格。

use yii\grid\GridView;
<?= GridView::widget([
  'dataProvider' => $dataProvider,
]);?>
小结 ActiveDataProvider返回的是对象的集合,确切的说是每一个模型(AR),因此模型的关联特性可以使用,这让我们可以构造出复杂完整的表格。

SqlDataProvider

和名字一样,SqlDataProvider接收一个原生的SQL语句并且能生成一个带有参数的dataProvider,一个月你总会碰到几次是用QueryBuilder构造不出来的复杂SQL。

用法如下

$totalCount = Yii::$app->db->createCommand('SELECT COUNT(*) FROM user WHERE sex=:sex', ['sex:' => 1])
            ->queryScalar();

$dataProvider = new SqlDataProvider([
    'sql' => 'SELECT * FROM user WHERE sex=:sex',
    'params' => [':sex' => 1],
    'totalCount' => $totalCount,
    //'sort' =>false,//如果为假则删除排序
    'sort' => [
        'attributes' => [
            'username' => [
                'asc' => ['username' => SORT_ASC],
                'desc' => ['username' => SORT_DESC],
                'default' => SORT_DESC,
                'label' => '用户名',
            ],
            'sex' => [
                'asc' => ['sex' => SORT_ASC],
                'desc' => ['sex' => SORT_DESC],
                'default' => SORT_DESC,
                'label' => '性别',
            ],
            'created_on'
        ],
    ],
    'pagination' => [
        'pageSize' => 10,
    ],
]);

return $dataProvider;

不要被上面一大堆代码吓到,我只是装个逼而已,其实SqlDataProvider必填的参数只有一项,就是sql。比如我下面的这段代码

$dataProvider = new SqlDataProvider([
    'sql' => 'SELECT * FROM `blog`'
]);

当然SqlDataProvider也有一些方法,比如getModels、getTotalCount等方法,不同的是这里面没有对象了,这里面是数组。这很正常,你不在操作AR了,你现在直接在操作数据库。

而我们在 小谈Yii Framework 2.0(yii2)的数据库层 提到过,AR才是数据表面向对象的归宿。

所以当你搭配GridView的时候,就要如下写代码了

GridView::widget([
    'dataProvider' => $dataProvider,
    'columns' => [
        ['class' => 'yii\grid\SerialColumn'],
        [
            'label' =>"昵称",
            'attribute' => 'nickname',
            'value'=>function($data){
                return $data["nickname"];
            }
        ],
        'username',
        'sex',
        'created_on',
        ['class' => 'yii\grid\ActionColumn'],
    ],
]); 

此刻的$data就是我们sql语句查出的一行。

小结 替补ActiveDataProvider,同时也提供给一些喜欢数组作为结果的小盆友们,无法使用关联属性。当然如果你是一个SQL迷,这种方法也许更直接更快速。

ArrayDataProvider

之所以出现了三种数据提供者(DataProvider)主要是因为我们要面对不同的数据来源、一个Query、一条SQL语句,你看,现在使用ArrayDataProvider你可以将一个数组作为数据来源还。

比如下面的代码

$dataProvider = new ArrayDataProvider([
    'allModels' => Blog::find()->asArray()->all(),// 或Blog::find()->all()
    'pagination' => [
        'pageSize' => 1,
    ],
]);

allModels 是一个必填项,代表数据来源,allModels是一个数组,但是allModels里面的元素并不限制,可以数组或对象,甚至是简单的数据类型,比如数字和字符串。

当然ArrayDataProvider一样可以和GridView组合,比如

GridView::widget([
    'dataProvider' => $dataProvider
]);

对于GridView而言,这三种DataProvider没有区别

小结 给你一堆数据,然后用ArrayDataProvider进行分页、排序、构造表格!也许有其使用的场景吧,不过我现在还没遇到。

小结

这篇文章的初衷是为了让大家知道yii2中一共提供了几种数据提供者以及其使用方法,具体配置等我们会在其他专栏介绍。

在本文也使用了一些文章

原文链接 https://nai8.me/site/index.html
查看原文

认证与成就

  • 获得 1 次点赞
  • 获得 8 枚徽章 获得 1 枚金徽章, 获得 6 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-07-26
个人主页被 107 人浏览