天翊

天翊 查看完整档案

成都编辑北华航天工业学院  |  测控技术与仪器 编辑聚美  |  研发 编辑 www.fanhaobai.com 编辑
编辑

个人动态

天翊 回答了问题 · 2019-04-19

解决php 定时发送短信,短信量可能有几千,然而调用的是阿里的批量短信接口,批量短信一次只能发送100个手机号 , 求思路 !!

用一个队列,或者消息,然后启多个worker去消费撒。

关注 6 回答 4

天翊 评论了文章 · 2019-04-08

php 定时任务

google百度了下,PHP任务大体上可以分为三类

最近需要去定时请求数据,然后分析之后 指定相应的文本 通过socket广播给用户。

具体的分析 制定文本的业务 不复杂。 使用curl 请求数据 。但是对于定时任务这一块怎么使用都不行。

1、服务器计划任务

WIN服务器可以直接使用定时任务执行相关的应用程序,LINUX服务器则可以通过在/etc/crontab里添加定时任务来实现。

    php -f  文件所在位置

缺点:最低一分钟的任务计划。 很多请求需要一分钟以内。 需要储存上一次的数据,下次再去取。

2、通过死循环实现任务自动执行

建立一个程序文件页面,通过ignore_user_abort来设置关闭浏览器页面后仍可执行,那么,在运行该程序页面后,您所需要的任务计划便会一直自动执行。对于一般的PHP程序员来说,如果没有足够的把握,这种操作是不被允许的,因为死循环极容易使用服务器当机。

        set_time_limit(0);   // 取消超时
        // ignore_user_abort(true);  //  浏览器关闭 继续执行 
        while (true) {
            dongsomthing();
            sleep( 120); // 定时 120秒   usleep()  毫秒  1000毫秒=1秒
        }
        die; 

sleep 和 usleep 都不太精确

缺点:长时间占据内存,经常无缘无故 自己暂停。

3、通过workman等第三方框架。

···

<?php
    use \Workerman\Worker;
    use \Workerman\Lib\Timer;
    require_once __DIR__ . './Workerman/Autoloader.php';

$task = new Worker();
// 开启多少个进程运行定时任务,注意多进程并发问题
$task->count = 1;
$task->onWorkerStart = function($task)
{
    // 每2.5秒执行一次 支持小数,可以精确到0.001,即精确到毫秒级别
    $time_interval = 30;
    Timer::add($time_interval, function()
    {
           dosomthing() 
          echo "task run\n";
    });
};

// 运行worker
Worker::runAll();
    
?>

···

可以精确到秒, 还是会经常暂停! 别人说他能跑半年 不停。 我是不是开的任务比较多,因为我又8个任务 没30秒 请求一次。
参考workman手册 (https://www.kancloud.cn/walko...
使用方法很简单 配置好php环境,然后直接命令行执行改文件就行了

  php  -f 文件所在位置

4、使用GatewayWorker搭建socket 服务端

    https://www.workerman.net/workerman-chat

因为需要分析出文本之后广播到每个客户端,那如何在调用socket方法推送给每个客户呢?
参考其他项目中推送消息 (https://www.kancloud.cn/walko...

那如何存入数据库呢?
参考https://www.kancloud.cn/walko...

参考(https://www.kancloud.cn/walko...

查看原文

天翊 评论了文章 · 2019-04-08

你还不会shell脚本吗,我来教你

这几天也是刚学习了shell脚本的编写,虽然他不像Python、Java那样能够编写很大型的项目(当然我是这么认为),但是在操控Linux系统方面,还是有独特的优势的,当然在学习过程中我们也能更好的了解Linux。接下来,就开始学习吧。后面会有几个小例子,当然都是别的地方挪过来的,我就是代码的搬运工,嘿嘿。喜欢学习的同志们可以点击Python聚焦专栏,查看更多知识。

繁琐的括号总结

基本概念

解释器种类

  • Bourne Shell(/usr/bin/sh或/bin/sh)
  • Bourne Again Shell(/bin/bash)
  • C Shell(/usr/bin/csh)
  • K Shell(/usr/bin/ksh)
  • Shell for Root(/sbin/sh)

在一般情况下,人们并不区分 Bourne Shell 和 Bourne Again Shell,所以,像 #!/bin/sh,它同样也可以改为 #!/bin/bash。

脚本编写

  • 指定使用的解释器类型
#!/bin/bash 

运行方式

作为可执行程序

chmod +x ./test.sh # 修改权限
./test.sh   # 执行

运行时一定要写成./test.sh,因为直接写test.sh,linux会去PATH里寻找,而一般只有/bin,/sbin,/usr/bin,/usr/sbin等在PATH中,所以使用./test.sh告诉系统,就在本目录下找

作为解释器参数

  • 这种方式可以不用在sh文件中写明解释器类型,写了也是没有用的
/bin/sh test.sh

基本语法

变量

  • 定义方式和python类似,只不过定义过程中不允许使用空格,默认都是字符串类型

    • declare命令定义有类型的变量

      • -i : 定义整型变量
      • -a : 定义一个数组
      • -f : 查看系统中所有定义的函数
      • -F : 查看系统中所有定义的函数名称
      • -x : 声明一个环境变量--在脚本文件中可以直接使用的变量
      • 取消定义的变量:把命令减号改成加号再执行
  • 使用时只需要使用&dollar; / &dollar;{ }就可以了
  • 只读变量:在变量定义后,再使用readonly 修饰,再次赋值会报错
  • 删除变量:unset var_name,不能删除只读变量

变量数据修改

语法说明
${var_name#规则}从变量开头进行匹配,将符合最短的数据删除
${var_name##规则}从变量开头进行匹配,将符合最长的数据删除
${var_name%规则}从变量末尾进行匹配,将符合最短的数据删除
${var_name%%规则}从变量末尾进行匹配,将符合最长的数据删除
${变量名/旧字符串/新字符串}变量内容符合旧字符串,就将第一个替换
${变量名//旧字符串/新字符串}变量内容符合旧字符串,就全部替换

字符串

  • Shell字符串

    • 单引号:不能使用变量
    • 双引号:可以使用变量,并转义\n等字符
    • 反引号:用于保存要执行的命令,同:&dollar;() 【点击例子】
    • 在进行拼接的时候是可以出现以下形式的:

      • "Hello, "$world""
      • 'Hello, '$world''
  • expr命令是从1开始索引,而普通的提取都是从零开始索引的
  • 求长度

    • 获取字符串长度:$#var_name
    • ${#str}
    • expr lenth $str
  • 求字串索引

    • expr index $str substr_reg

      • substr其实索引的是其每个字符,返回最小索引的那个
  • 匹配的字串的长度

    • expr match $str substr_reg
  • 截取

    • ${str:start}
    • ${str:start:lenth}
    • ${str:(start)}/\${str: -start} start为负数,表示从尾部开始
    • expr substr $str $start $length

数组

  • 定义:

    • array_name=(value0 value1 value2 value3) 使用空格分隔元素
    • array_name[0]=value0 / array_name[1]=value1下标可以不连续
  • 读取:

    • 单个读取:${array_name[index]}
    • 全部读取:${array_name[@]}
    • 获取长度:

      • 数组长度:length=&dollar;{#array_name[@]}/length=${#array_name[*]}
      • 单个元素长度:lengthn=${#array_name[n]}

注释

  • 单行:#
  • 多行:

    • :<<EOF . . . EOF
    • 使用其他符号替代EOF,如:'

函数

  • 定义

    • function func_name { }
    • func_name ( ) { }
  • 返回值

    • return : 一般返回一个整数,用于做判断
    • echo : 用于返回数据
    • printf
  • 局部变量

    • local修饰,不进行修饰那么函数执行后,其他地方也可以使用。
  • 函数库

    • 文件后缀是任意的,一般为lib
    • 一般不会赋予可执行权限
    • 第一行一般使用#!/bin/echo输出警告信息,避免用户执行

获取命令行参数

  • $num:num为要获取的参数位置,从0开始,分别代表文件名,参数1,参数2
  • $#:传递到脚本的参数个数
  • &dollar;&dollar;:脚本运行的当前进程ID号
  • $!:后台运行的最后一个进程的ID号
  • $?:显示最后命令的退出状态。0表示没有错误,其他任何值表明有错误。
  • $-:显示Shell使用的当前选项,与set命令功能相同。
  • $* / $@:以一个单字符串显示所有向脚本传递的参数。

    • 相同点:都是引用所有参数。
    • 不同点:只有在双引号中体现出来。假设在脚本运行时写了三个参数 1、2、3,,则 " * " 等价于 "1 2 3"(传递了一个参数),而 "@" 等价于 "1" "2" "3"(传递了三个参数)。

中括号的使用(一般可以使用test命令替换)

  • 一个变量是否为0, [ $var -eq 0 ]

    • ne:不等于
    • lt/gt:小于/大于
    • le/ge:小于等于/大于等于
  • 一个文件是否存在,[ -e $var ], 是否是目录,[ -d $var ]
  • 两个字符串是否相同, [[ $var1 = $var2 ]]
  • -a/-o:and/or
  • -e : exist
  • -r : 是否可读
  • -w : 是否可写
  • -n :判断字符串长度是否非0
  • -z :判断字符串长度是否为0
  • $ :判断字符串是否非空

运算符:原生bash不支持简单的数学运算,需要使用awk和expr,expr最常用,并且只能进行整数运算

  • 表达式和运算符之间要有空格
  • 完整的表达式要被 包含
  • 带有转移的字符需要使用\修饰才能使用
  • 使用$(())中间的运算符不需要转义并且不要求有空格,不能进行相等和不等的判断
  •   echo `expr 2 + 2` # 输出4
  • 成立返回1,不成立返回0
  • 在 MAC 中 shell 的 expr 语法是:$((表达式)),此处表达式中的 "*" 不需要转义符号 ""
  • 浮点数计算:bc

    • echo "scale=2;1+1.2" | bc

几个常用命令

  • read命令

    • 从标准输入中接收一行,并对修饰的变量赋值
    • read -p "请输入一段文字:" -n 6 -t 5 -s password

      • -p 输入提示文字
      • -n 输入字符长度限制(达到6位,自动结束)
      • -t 输入限时
      • -s 隐藏输入内容
  • echo

    • -e 开启转义,对字符串中的转义字符进行转义操作,不区分单双引号
  • printf

    • printf "%s" jim
  • test:一般用于替换中括号

    • test $num = $num2

流程控制

  • if . . . else

    • if condition
      then
          command1 
      elif condition1
      then
          command2
      else
          commandN
      fi
  • for . . . in

    • for var in data
      do
          command1
      done
  • while

    • while condition
      do
          command
      done
  • until:与while相反操作,条件为true时退出循环
  • 死循环

    • while :
      do
          command
      done
      # 使用true
      while true
      do
          command
      done
      # 使用for
      for (( ; ; ))
  • case

    • case value in
      value1)
          command1
      ;;
      value2)
          command2
      ;;
      *)
          command2
      ;;
      esac

let执行一个或多个表达式

sed(Stream Editor)命令详解,对查找到的数据进行处理【点击例子】

# 语法格式:
sed [option] "pattern command" file_name
# 删除文件第一行
sed -i '1d' file_name
  • option选项

    • n:只输出匹配的行
    • e:需要匹配的条件,可以指定多个-e "pattern command"
    • f:指定sed文件,用于封装替换"pattern command"
    • r:用于支持正则表达式
    • 修改输出内容:sed -n 's/love/like/g;p' sed.txt
    • i:修改源文件
  • pattern

    • 可以使用正则表达式
    • 可以使用变量,只要按照脚本使用变量就可以:双引号,$var_name
    • 匹配/需要进行转义
    • 按行匹配的时候,行数在后面如果小于前一个匹配模式,那么久只显示满足前一个条件的行
    • =:显示行号
  • command

    • a : 在匹配到行的下一行添加字符串
    • i :在匹配到行的上一行添加字符串
    • r :在匹配到行的下一行添加file内容
    • w :将匹配到的行写入文件
    • d :删除数据
    • p :打印数据
    • g :修改数据时全部匹配,3g表示从第三个开始全部修改,ig忽略大小写
    • = :显示匹配到的行号
  • 反向引用

    • 在使用替换字符的时候,修改内容使用&表示使用被替换的条件

      • # 在匹配到^la..jim的后面加shuai
        # &:全匹配,\1:其使用了正则的分组,所以前面需要使用小括号括起来
        sed -i 's/^la..jim/&shuai/g' sed.txt
  • 命令详解

awk工作模式【点击例子】

# 语法格式:
awk 'BEGIN{}pattern{commands}END{}' file_name

小例子

  • <span id="find_user_all">查询所有用户</span>
for user in `cat /etc/passwd | cut -d ":" -f 1`
do
    echo "$user"
done
  • <span id="start_nginx">启动nginx</span>
nginx_num_process=$(ps -ef | grep nginx | grep -v grep | wc -l)
if [ nginx_num_process -eq 0 ];then
    systemctl start nginx
fi
  • <span id="num_add">用户输入num,求1-num之和</span>
while true
do
    read -p "pls input a positive number: " num
    expr $num + 1 &> /dev/null
    if [ $? -eq 0 ];then
        if [ `expr $num \> 0` -eq 1 ];then
            for((i=1;i<=$num;i++))
            do
                sum=`expr $sum + $i`
            done    
            echo "1+2+3+....+$num = $sum"
            exit
        fi
    fi
    echo "error,input enlegal"
    continue
done
  • <span id="check_nginx">检查Nginx是否正常运行,宕机则启动它</span>
this_pid=$$

while true
do
ps -ef | grep nginx | grep -v grep | grep -v $this_pid &> /dev/null

if [ $? -eq 0 ];then
    echo "Nginx is running well"
    sleep 3
else
    systemctl start nginx
    echo "Nginx is down,Start it...."
fi
done
  • <span id="find_mysql">查找mysql配置文件中有几段</span>
FILE_NAME=/root/lesson/5.6/my.cnf
function get_all_segments
{
    echo "`sed -n '/\[.*\]/p' $FILE_NAME  | sed -e 's/\[//g' -e 's/\]//g'`"
}
function count_items_in_segment
{
    items=`sed -n '/\['$1'\]/,/\[.*\]/p' $FILE_NAME | grep -v "^#" | grep -v "^$" | grep -v "\[.*\]"`
    index=0
    for item in $items
    do
        index=`expr $index + 1`
    done
    echo $index
}
number=0
for segment in `get_all_segments`
do
    number=`expr $number + 1`
    items_count=`count_items_in_segment $segment`
    echo "$number: $segment  $items_count"
done
  • <span id="del_blank">删除配置文件中所有的注释行和空行</span>
sed -i '/[:blank:]*#/^$/d' config.cnf
  • <span id="add_not_ano">在非#注释行前加*</span>
sed -i 's/^[^#]/\*&/g' config.cnf
  • <span id="text_insert_mysql">文本格式化数据插入mysql</span>
user=""
password=""
host=""
mysql_conn="mysql -u"$user" -p"$password" -h"$host""
IFS=":" # 内置分隔符变量
cat data.txt | while read id name birth sex
do
  $mysql_conn -e "insert into school values('$id','$name','$birth','$sex')"
done
  • <span id="script_use_ftp">脚本使用ftp</span>
ftp -inv << EOF
open ftp_ip_addr
user user_name password

put file_name
bye
EOF #必须顶格写

小东东

  • nohub + & 后台启动 : nohub不间断的运行程序,关闭窗口也不会关闭进程,&用于后台运行
  • netstat -tnlp | grep port : 一般用于查看端口
  • &&当左侧的命令返回0(成功)才会执行右侧命令
  • cut -d ":"制定分隔符
  • free -m:内存使用情况
  • df -h:磁盘使用情况
  • n >& m:将输出文件 m 和 n 合并
  • n <& m:将输入文件 m 和 n 合并
  • << tag:将开始标记 tag 和结束标记 tag 之间的内容作为输入
  • grep -E等同于egrep,用于扩展支持正则表达式
  • cat -n file显示行号输出
  • /sbin/nologin 不可以登陆的用户
  • [:blank:]表示空格
  • ^$表示空行
  • sh -x可以查看执行过程
  • 根据其他表的结构创建新表

    • create table new_table like other_table
  • mysql -B不显示边框 -E表示垂直显示 -H输出html -X输出xml -N不显示列名
  • mysqldumps备份mysql

    • d :只导出表结构
    • t :只导出数据,不导出建表语句
    • A :导出所有数据库
    • B :导出一个或者多个数据库
  • crontab定时任务
查看原文

天翊 评论了文章 · 2019-03-27

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

天翊 评论了文章 · 2019-03-27

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

天翊 评论了文章 · 2019-03-27

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 评论了文章 · 2019-03-26

使用Supervisor管理进程

首发于 樊浩柏科学院

Supervisor 是一款使用 Python 开发的非常优秀的进程管理工具。它可以在类 UNIX 系统上让用户精确地监视与控制多组指定数量的服务进程。当监控的服务进程意外退出时,会尝试自动重启这些服务,以保持服务可用状态。

安装

Supervisor 官方 提供的安装方式较多,这里采用 pip 方式安装。

安装pip

$ yum install python-pip
# 升级pip
$ pip install --upgrade pip
$ pip -V
pip 9.0.1

安装Supervisor

通过 pip 安装 Supervisor:

$ pip install supervisor
Successfully installed supervisor-3.3.3

安装 Supervisor 后,会出现 supervisorctl 和 supervisord 两个程序,其中 supervisorctl 为服务监控终端,而 supervisord 才是所有监控服务的大脑。查看 supervisord 是否安装成功:

$ supervisord -v
3.3.3

开机启动

将 supervisord 配置成开机启动服务,下载官方 init 脚本

修改关键路径配置:

PIDFILE=/var/run/supervisord.pid
LOCKFILE=/var/lock/subsys/supervisord
OPTIONS="-c /etc/supervisord.conf"

移到该文件到/etc/init.d目录下,并重命名为 supervisor,添加可执行权限:

$ chmod 777 /etc/init.d/supervisor

配置成开机启动服务:

$ chkconfig --add supervisor
$ chkconfig supervisor on
$ chkconfig --list | grep "supervisor"
supervisor  0:off 1:off 2:on 3:on 4:on 5:on 6:off

配置

生成配置文件

Supervisord 安装后,需要使用如下命令生成配置文件。

$ mkdir /etc/supervisor
$ echo_supervisord_conf > /etc/supervisor/supervisord.conf

主配置部分

supervisord.conf的主配置部分说明:

[unix_http_server]
file=/tmp/supervisor.sock   ; socket文件的路径
;chmod=0700                 ; socket文件权限
;chown=nobody:nogroup       ; socket文件用户和用户组
;username=user              ; 连接时认证的用户名
;password=123               ; 连接时认证的密码

[inet_http_server]          ; 监听TCP
port=127.0.0.1:9001         ; 监听ip和端口
username=user               ; 连接时认证的用户名
password=123                ; 连接时认证的密码

[supervisord]
logfile=/var/log/supervisord.log ; log目录
logfile_maxbytes=50MB        ; log文件最大空间
logfile_backups=10           ; log文件保持的数量
loglevel=info                ; log级别
pidfile=/var/run/supervisord.pid
nodaemon=false               ; 是否非守护进程态运行
minfds=1024                  ; 系统空闲的最少文件描述符
minprocs=200                 ; 可用的最小进程描述符
;umask=022                   ; 进程创建文件的掩码
;identifier=supervisor       ; supervisord标识符
;directory=/tmp              ; 启动前切换到的目录
;nocleanup=true              ; 启动前是否清除子进程的日志文件
;childlogdir=/tmp            ; AUTO模式,子进程日志路径
;environment=KEY="value"     ; 设置环境变量

[rpcinterface:supervisor]    ; XML_RPC配置
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; 连接的socket路径
;username=chris               ; 用户名
;password=123                 ; 密码
prompt=mysupervisor           ; 输入用户名和密码时的提示符
;history_file=~/.sc_history   ; 历史操作记录存储路径

[include]                     ; 包含文件,将每个进程配置为一个文件并包含
files = /etc/supervisor/*.ini ; 多个进程的配置文件

这部分我们不需要做太多的配置修改,如果需要开启 WEB 终端监控,则需要配置并开启 inet_http_server 项。

进程配置部分

Supervisor 需管理的进程服务配置,示例如下:

[program:work]                      ; 服务名,例如work
command=php -r "sleep(10);exit(1);" ; 带有参数的可执行命令
process_name=%(process_num)s        ; 进程名,当numprocs>1时,需包含%(process_num)s
numprocs=2                          ; 启动进程的数目数
;directory=/tmp                     ; 运行前切换到该目录
;umask=022                          ; 进程掩码
;priority=999                       ; 子进程启动关闭优先级
autostart=true                      ; 子进程是否被自动启动
startsecs=1                         ; 成功启动几秒后则认为成功启动
;startretries=3                     ; 子进程启动失败后,最大尝试启动的次数
autorestart=unexpected            ; 子进程意外退出后自动重启的选项,false, unexpected, true。unexpected表示不在exitcodes列表时重启
exitcodes=0,2                     ; 期待的子程序退出码
;stopsignal=QUIT                  ; 进程停止信号,可以为TERM,HUP,INT,QUIT,KILL,USR1,or USR2等信号,默认为TERM
;stopwaitsecs=10                  ; 发送停止信号后等待的最大时间
;stopasgroup=false                ; 是否向子进程组发送停止信号
;killasgroup=false                ; 是否向子进程组发送kill信号
;redirect_stderr=true             ; 是否重定向日志到标准输出
stdout_logfile=/data/logs/work.log ; 进程的stdout的日志路径
;stdout_logfile_maxbytes=1MB      ; 日志文件最大大小
;stdout_logfile_backups=10
;stdout_capture_maxbytes=1MB
;stderr_logfile=/a/path           ; stderr的日志路径
;stderr_logfile_maxbytes=1MB
;stderr_logfile_backups=10
;stderr_capture_maxbytes=1MB
;environment=A="1",B="2"          ; 子进程的环境变量
;serverurl=AUTO                   ; 子进程的环境变量SUPERVISOR_SERVER_URL 
通常将每个进程的配置信息配置成独立文件,并通过 include 模块包含,这样方便修改和管理配置文件。

启动

配置完成后,启动 supervisord 守护服务:

$ supervisord -c /etc/supervisor/supervisord.conf

常用的命令参数说明:

  • -c:指定配置文件路径
  • -n:是否非守护态运行
  • -l:日志文件目录
  • -i:唯一标识

查看 supervisord 启动情况:

$ ps -ef | grep "supervisor"
root  24901  1  0 Sep23 ? 00:00:30 /usr/bin/python /usr/bin/supervisord -c /etc/supervisor/supervisord.conf
$ netstat -tunpl
tcp 0 0 127.0.0.1:9001  0.0.0.0:*  LISTEN  24901/python

监控进程

Supervisor 提供了多种监控服务的方式,包括 supervisorctl 命令行终端、Web 端、XML_RPC 接口多种方式。

命令终端

直接使用 supervisorctl 即可在命令行终端查看所有服务的情况,如下:

$ supervisorctl 
work:0      RUNNING   pid 31313, uptime 0:00:07
work:1      RUNNING   pid 31318, uptime 0:00:06
# -u 用户名 -p 密码

supervisorctl 常用命令列表如下;

  • status:查看服务状态
  • update:重新加载配置文件
  • restart:重新启动服务
  • stop:停止服务
  • pid:查看某服务的 pid
  • tail:输出最新的 log 信息
  • shutdown:关闭 supervisord 服务

Web

在配置中开启 inet_http_server 后,即可通过 Web 界面便捷地监控进程服务了。

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

认证与成就

  • 获得 332 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-11-11
个人主页被 1.4k 人浏览