天翊

天翊 查看完整档案

成都编辑北华航天工业学院  |  测控技术与仪器 编辑聚美  |  研发 编辑 www.fanhaobai.com 编辑
编辑

个人动态

天翊 回答了问题 · 2019-04-19

解决php 定时发送短信,短信量可能有几千,然而调用的是阿里的批量短信接口,批量短信一次只能发送100个手机号 , 求思路 !!

用一个队列,或者消息,然后启多个worker去消费撒。

关注 6 回答 4

天翊 评论了文章 · 2019-04-08

php 定时任务

google百度了下,PHP任务大体上可以分为三类

最近需要去定时请求数据,然后分析之后 指定相应的文本 通过socket广播给用户。

具体的分析 制定文本的业务 不复杂。 使用curl 请求数据 。但是对于定时任务这一块怎么使用都不行。

1、服务器计划任务

WIN服务器可以直接使用定时任务执行相关的应用程序,LINUX服务器则可以通过在/etc/crontab里添加定时任务来实现。

    php -f  文件所在位置

缺点:最低一分钟的任务计划。 很多请求需要一分钟以内。 需要储存上一次的数据,下次再去取。

2、通过死循环实现任务自动执行

建立一个程序文件页面,通过ignore_user_abort来设置关闭浏览器页面后仍可执行,那么,在运行该程序页面后,您所需要的任务计划便会一直自动执行。对于一般的PHP程序员来说,如果没有足够的把握,这种操作是不被允许的,因为死循环极容易使用服务器当机。

        set_time_limit(0);   // 取消超时
        // ignore_user_abort(true);  //  浏览器关闭 继续执行 
        while (true) {
            dongsomthing();
            sleep( 120); // 定时 120秒   usleep()  毫秒  1000毫秒=1秒
        }
        die; 

sleep 和 usleep 都不太精确

缺点:长时间占据内存,经常无缘无故 自己暂停。

3、通过workman等第三方框架。

···

<?php
    use \Workerman\Worker;
    use \Workerman\Lib\Timer;
    require_once __DIR__ . './Workerman/Autoloader.php';

$task = new Worker();
// 开启多少个进程运行定时任务,注意多进程并发问题
$task->count = 1;
$task->onWorkerStart = function($task)
{
    // 每2.5秒执行一次 支持小数,可以精确到0.001,即精确到毫秒级别
    $time_interval = 30;
    Timer::add($time_interval, function()
    {
           dosomthing() 
          echo "task run\n";
    });
};

// 运行worker
Worker::runAll();
    
?>

···

可以精确到秒, 还是会经常暂停! 别人说他能跑半年 不停。 我是不是开的任务比较多,因为我又8个任务 没30秒 请求一次。
参考workman手册 (https://www.kancloud.cn/walko...
使用方法很简单 配置好php环境,然后直接命令行执行改文件就行了

  php  -f 文件所在位置

4、使用GatewayWorker搭建socket 服务端

    https://www.workerman.net/workerman-chat

因为需要分析出文本之后广播到每个客户端,那如何在调用socket方法推送给每个客户呢?
参考其他项目中推送消息 (https://www.kancloud.cn/walko...

那如何存入数据库呢?
参考https://www.kancloud.cn/walko...

参考(https://www.kancloud.cn/walko...

查看原文

天翊 评论了文章 · 2019-04-08

你还不会shell脚本吗,我来教你

这几天也是刚学习了shell脚本的编写,虽然他不像Python、Java那样能够编写很大型的项目(当然我是这么认为),但是在操控Linux系统方面,还是有独特的优势的,当然在学习过程中我们也能更好的了解Linux。接下来,就开始学习吧。后面会有几个小例子,当然都是别的地方挪过来的,我就是代码的搬运工,嘿嘿。喜欢学习的同志们可以点击Python聚焦专栏,查看更多知识。

繁琐的括号总结

基本概念

解释器种类

  • Bourne Shell(/usr/bin/sh或/bin/sh)
  • Bourne Again Shell(/bin/bash)
  • C Shell(/usr/bin/csh)
  • K Shell(/usr/bin/ksh)
  • Shell for Root(/sbin/sh)

在一般情况下,人们并不区分 Bourne Shell 和 Bourne Again Shell,所以,像 #!/bin/sh,它同样也可以改为 #!/bin/bash。

脚本编写

  • 指定使用的解释器类型
#!/bin/bash 

运行方式

作为可执行程序

chmod +x ./test.sh # 修改权限
./test.sh   # 执行

运行时一定要写成./test.sh,因为直接写test.sh,linux会去PATH里寻找,而一般只有/bin,/sbin,/usr/bin,/usr/sbin等在PATH中,所以使用./test.sh告诉系统,就在本目录下找

作为解释器参数

  • 这种方式可以不用在sh文件中写明解释器类型,写了也是没有用的
/bin/sh test.sh

基本语法

变量

  • 定义方式和python类似,只不过定义过程中不允许使用空格,默认都是字符串类型

    • declare命令定义有类型的变量

      • -i : 定义整型变量
      • -a : 定义一个数组
      • -f : 查看系统中所有定义的函数
      • -F : 查看系统中所有定义的函数名称
      • -x : 声明一个环境变量--在脚本文件中可以直接使用的变量
      • 取消定义的变量:把命令减号改成加号再执行
  • 使用时只需要使用&dollar; / &dollar;{ }就可以了
  • 只读变量:在变量定义后,再使用readonly 修饰,再次赋值会报错
  • 删除变量:unset var_name,不能删除只读变量

变量数据修改

语法说明
${var_name#规则}从变量开头进行匹配,将符合最短的数据删除
${var_name##规则}从变量开头进行匹配,将符合最长的数据删除
${var_name%规则}从变量末尾进行匹配,将符合最短的数据删除
${var_name%%规则}从变量末尾进行匹配,将符合最长的数据删除
${变量名/旧字符串/新字符串}变量内容符合旧字符串,就将第一个替换
${变量名//旧字符串/新字符串}变量内容符合旧字符串,就全部替换

字符串

  • Shell字符串

    • 单引号:不能使用变量
    • 双引号:可以使用变量,并转义\n等字符
    • 反引号:用于保存要执行的命令,同:&dollar;() 【点击例子】
    • 在进行拼接的时候是可以出现以下形式的:

      • "Hello, "$world""
      • 'Hello, '$world''
  • expr命令是从1开始索引,而普通的提取都是从零开始索引的
  • 求长度

    • 获取字符串长度:$#var_name
    • ${#str}
    • expr lenth $str
  • 求字串索引

    • expr index $str substr_reg

      • substr其实索引的是其每个字符,返回最小索引的那个
  • 匹配的字串的长度

    • expr match $str substr_reg
  • 截取

    • ${str:start}
    • ${str:start:lenth}
    • ${str:(start)}/\${str: -start} start为负数,表示从尾部开始
    • expr substr $str $start $length

数组

  • 定义:

    • array_name=(value0 value1 value2 value3) 使用空格分隔元素
    • array_name[0]=value0 / array_name[1]=value1下标可以不连续
  • 读取:

    • 单个读取:${array_name[index]}
    • 全部读取:${array_name[@]}
    • 获取长度:

      • 数组长度:length=&dollar;{#array_name[@]}/length=${#array_name[*]}
      • 单个元素长度:lengthn=${#array_name[n]}

注释

  • 单行:#
  • 多行:

    • :<<EOF . . . EOF
    • 使用其他符号替代EOF,如:'

函数

  • 定义

    • function func_name { }
    • func_name ( ) { }
  • 返回值

    • return : 一般返回一个整数,用于做判断
    • echo : 用于返回数据
    • printf
  • 局部变量

    • local修饰,不进行修饰那么函数执行后,其他地方也可以使用。
  • 函数库

    • 文件后缀是任意的,一般为lib
    • 一般不会赋予可执行权限
    • 第一行一般使用#!/bin/echo输出警告信息,避免用户执行

获取命令行参数

  • $num:num为要获取的参数位置,从0开始,分别代表文件名,参数1,参数2
  • $#:传递到脚本的参数个数
  • &dollar;&dollar;:脚本运行的当前进程ID号
  • $!:后台运行的最后一个进程的ID号
  • $?:显示最后命令的退出状态。0表示没有错误,其他任何值表明有错误。
  • $-:显示Shell使用的当前选项,与set命令功能相同。
  • $* / $@:以一个单字符串显示所有向脚本传递的参数。

    • 相同点:都是引用所有参数。
    • 不同点:只有在双引号中体现出来。假设在脚本运行时写了三个参数 1、2、3,,则 " * " 等价于 "1 2 3"(传递了一个参数),而 "@" 等价于 "1" "2" "3"(传递了三个参数)。

中括号的使用(一般可以使用test命令替换)

  • 一个变量是否为0, [ $var -eq 0 ]

    • ne:不等于
    • lt/gt:小于/大于
    • le/ge:小于等于/大于等于
  • 一个文件是否存在,[ -e $var ], 是否是目录,[ -d $var ]
  • 两个字符串是否相同, [[ $var1 = $var2 ]]
  • -a/-o:and/or
  • -e : exist
  • -r : 是否可读
  • -w : 是否可写
  • -n :判断字符串长度是否非0
  • -z :判断字符串长度是否为0
  • $ :判断字符串是否非空

运算符:原生bash不支持简单的数学运算,需要使用awk和expr,expr最常用,并且只能进行整数运算

  • 表达式和运算符之间要有空格
  • 完整的表达式要被 包含
  • 带有转移的字符需要使用\修饰才能使用
  • 使用$(())中间的运算符不需要转义并且不要求有空格,不能进行相等和不等的判断
  •   echo `expr 2 + 2` # 输出4
  • 成立返回1,不成立返回0
  • 在 MAC 中 shell 的 expr 语法是:$((表达式)),此处表达式中的 "*" 不需要转义符号 ""
  • 浮点数计算:bc

    • echo "scale=2;1+1.2" | bc

几个常用命令

  • read命令

    • 从标准输入中接收一行,并对修饰的变量赋值
    • read -p "请输入一段文字:" -n 6 -t 5 -s password

      • -p 输入提示文字
      • -n 输入字符长度限制(达到6位,自动结束)
      • -t 输入限时
      • -s 隐藏输入内容
  • echo

    • -e 开启转义,对字符串中的转义字符进行转义操作,不区分单双引号
  • printf

    • printf "%s" jim
  • test:一般用于替换中括号

    • test $num = $num2

流程控制

  • if . . . else

    • if condition
      then
          command1 
      elif condition1
      then
          command2
      else
          commandN
      fi
  • for . . . in

    • for var in data
      do
          command1
      done
  • while

    • while condition
      do
          command
      done
  • until:与while相反操作,条件为true时退出循环
  • 死循环

    • while :
      do
          command
      done
      # 使用true
      while true
      do
          command
      done
      # 使用for
      for (( ; ; ))
  • case

    • case value in
      value1)
          command1
      ;;
      value2)
          command2
      ;;
      *)
          command2
      ;;
      esac

let执行一个或多个表达式

sed(Stream Editor)命令详解,对查找到的数据进行处理【点击例子】

# 语法格式:
sed [option] "pattern command" file_name
# 删除文件第一行
sed -i '1d' file_name
  • option选项

    • n:只输出匹配的行
    • e:需要匹配的条件,可以指定多个-e "pattern command"
    • f:指定sed文件,用于封装替换"pattern command"
    • r:用于支持正则表达式
    • 修改输出内容:sed -n 's/love/like/g;p' sed.txt
    • i:修改源文件
  • pattern

    • 可以使用正则表达式
    • 可以使用变量,只要按照脚本使用变量就可以:双引号,$var_name
    • 匹配/需要进行转义
    • 按行匹配的时候,行数在后面如果小于前一个匹配模式,那么久只显示满足前一个条件的行
    • =:显示行号
  • command

    • a : 在匹配到行的下一行添加字符串
    • i :在匹配到行的上一行添加字符串
    • r :在匹配到行的下一行添加file内容
    • w :将匹配到的行写入文件
    • d :删除数据
    • p :打印数据
    • g :修改数据时全部匹配,3g表示从第三个开始全部修改,ig忽略大小写
    • = :显示匹配到的行号
  • 反向引用

    • 在使用替换字符的时候,修改内容使用&表示使用被替换的条件

      • # 在匹配到^la..jim的后面加shuai
        # &:全匹配,\1:其使用了正则的分组,所以前面需要使用小括号括起来
        sed -i 's/^la..jim/&shuai/g' sed.txt
  • 命令详解

awk工作模式【点击例子】

# 语法格式:
awk 'BEGIN{}pattern{commands}END{}' file_name

小例子

  • <span id="find_user_all">查询所有用户</span>
for user in `cat /etc/passwd | cut -d ":" -f 1`
do
    echo "$user"
done
  • <span id="start_nginx">启动nginx</span>
nginx_num_process=$(ps -ef | grep nginx | grep -v grep | wc -l)
if [ nginx_num_process -eq 0 ];then
    systemctl start nginx
fi
  • <span id="num_add">用户输入num,求1-num之和</span>
while true
do
    read -p "pls input a positive number: " num
    expr $num + 1 &> /dev/null
    if [ $? -eq 0 ];then
        if [ `expr $num \> 0` -eq 1 ];then
            for((i=1;i<=$num;i++))
            do
                sum=`expr $sum + $i`
            done    
            echo "1+2+3+....+$num = $sum"
            exit
        fi
    fi
    echo "error,input enlegal"
    continue
done
  • <span id="check_nginx">检查Nginx是否正常运行,宕机则启动它</span>
this_pid=$$

while true
do
ps -ef | grep nginx | grep -v grep | grep -v $this_pid &> /dev/null

if [ $? -eq 0 ];then
    echo "Nginx is running well"
    sleep 3
else
    systemctl start nginx
    echo "Nginx is down,Start it...."
fi
done
  • <span id="find_mysql">查找mysql配置文件中有几段</span>
FILE_NAME=/root/lesson/5.6/my.cnf
function get_all_segments
{
    echo "`sed -n '/\[.*\]/p' $FILE_NAME  | sed -e 's/\[//g' -e 's/\]//g'`"
}
function count_items_in_segment
{
    items=`sed -n '/\['$1'\]/,/\[.*\]/p' $FILE_NAME | grep -v "^#" | grep -v "^$" | grep -v "\[.*\]"`
    index=0
    for item in $items
    do
        index=`expr $index + 1`
    done
    echo $index
}
number=0
for segment in `get_all_segments`
do
    number=`expr $number + 1`
    items_count=`count_items_in_segment $segment`
    echo "$number: $segment  $items_count"
done
  • <span id="del_blank">删除配置文件中所有的注释行和空行</span>
sed -i '/[:blank:]*#/^$/d' config.cnf
  • <span id="add_not_ano">在非#注释行前加*</span>
sed -i 's/^[^#]/\*&/g' config.cnf
  • <span id="text_insert_mysql">文本格式化数据插入mysql</span>
user=""
password=""
host=""
mysql_conn="mysql -u"$user" -p"$password" -h"$host""
IFS=":" # 内置分隔符变量
cat data.txt | while read id name birth sex
do
  $mysql_conn -e "insert into school values('$id','$name','$birth','$sex')"
done
  • <span id="script_use_ftp">脚本使用ftp</span>
ftp -inv << EOF
open ftp_ip_addr
user user_name password

put file_name
bye
EOF #必须顶格写

小东东

  • nohub + & 后台启动 : nohub不间断的运行程序,关闭窗口也不会关闭进程,&用于后台运行
  • netstat -tnlp | grep port : 一般用于查看端口
  • &&当左侧的命令返回0(成功)才会执行右侧命令
  • cut -d ":"制定分隔符
  • free -m:内存使用情况
  • df -h:磁盘使用情况
  • n >& m:将输出文件 m 和 n 合并
  • n <& m:将输入文件 m 和 n 合并
  • << tag:将开始标记 tag 和结束标记 tag 之间的内容作为输入
  • grep -E等同于egrep,用于扩展支持正则表达式
  • cat -n file显示行号输出
  • /sbin/nologin 不可以登陆的用户
  • [:blank:]表示空格
  • ^$表示空行
  • sh -x可以查看执行过程
  • 根据其他表的结构创建新表

    • create table new_table like other_table
  • mysql -B不显示边框 -E表示垂直显示 -H输出html -X输出xml -N不显示列名
  • mysqldumps备份mysql

    • d :只导出表结构
    • t :只导出数据,不导出建表语句
    • A :导出所有数据库
    • B :导出一个或者多个数据库
  • crontab定时任务
查看原文

天翊 评论了文章 · 2019-03-27

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

天翊 评论了文章 · 2019-03-27

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

天翊 评论了文章 · 2019-03-27

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 评论了文章 · 2019-03-26

使用Supervisor管理进程

首发于 樊浩柏科学院

Supervisor 是一款使用 Python 开发的非常优秀的进程管理工具。它可以在类 UNIX 系统上让用户精确地监视与控制多组指定数量的服务进程。当监控的服务进程意外退出时,会尝试自动重启这些服务,以保持服务可用状态。

安装

Supervisor 官方 提供的安装方式较多,这里采用 pip 方式安装。

安装pip

$ yum install python-pip
# 升级pip
$ pip install --upgrade pip
$ pip -V
pip 9.0.1

安装Supervisor

通过 pip 安装 Supervisor:

$ pip install supervisor
Successfully installed supervisor-3.3.3

安装 Supervisor 后,会出现 supervisorctl 和 supervisord 两个程序,其中 supervisorctl 为服务监控终端,而 supervisord 才是所有监控服务的大脑。查看 supervisord 是否安装成功:

$ supervisord -v
3.3.3

开机启动

将 supervisord 配置成开机启动服务,下载官方 init 脚本

修改关键路径配置:

PIDFILE=/var/run/supervisord.pid
LOCKFILE=/var/lock/subsys/supervisord
OPTIONS="-c /etc/supervisord.conf"

移到该文件到/etc/init.d目录下,并重命名为 supervisor,添加可执行权限:

$ chmod 777 /etc/init.d/supervisor

配置成开机启动服务:

$ chkconfig --add supervisor
$ chkconfig supervisor on
$ chkconfig --list | grep "supervisor"
supervisor  0:off 1:off 2:on 3:on 4:on 5:on 6:off

配置

生成配置文件

Supervisord 安装后,需要使用如下命令生成配置文件。

$ mkdir /etc/supervisor
$ echo_supervisord_conf > /etc/supervisor/supervisord.conf

主配置部分

supervisord.conf的主配置部分说明:

[unix_http_server]
file=/tmp/supervisor.sock   ; socket文件的路径
;chmod=0700                 ; socket文件权限
;chown=nobody:nogroup       ; socket文件用户和用户组
;username=user              ; 连接时认证的用户名
;password=123               ; 连接时认证的密码

[inet_http_server]          ; 监听TCP
port=127.0.0.1:9001         ; 监听ip和端口
username=user               ; 连接时认证的用户名
password=123                ; 连接时认证的密码

[supervisord]
logfile=/var/log/supervisord.log ; log目录
logfile_maxbytes=50MB        ; log文件最大空间
logfile_backups=10           ; log文件保持的数量
loglevel=info                ; log级别
pidfile=/var/run/supervisord.pid
nodaemon=false               ; 是否非守护进程态运行
minfds=1024                  ; 系统空闲的最少文件描述符
minprocs=200                 ; 可用的最小进程描述符
;umask=022                   ; 进程创建文件的掩码
;identifier=supervisor       ; supervisord标识符
;directory=/tmp              ; 启动前切换到的目录
;nocleanup=true              ; 启动前是否清除子进程的日志文件
;childlogdir=/tmp            ; AUTO模式,子进程日志路径
;environment=KEY="value"     ; 设置环境变量

[rpcinterface:supervisor]    ; XML_RPC配置
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; 连接的socket路径
;username=chris               ; 用户名
;password=123                 ; 密码
prompt=mysupervisor           ; 输入用户名和密码时的提示符
;history_file=~/.sc_history   ; 历史操作记录存储路径

[include]                     ; 包含文件,将每个进程配置为一个文件并包含
files = /etc/supervisor/*.ini ; 多个进程的配置文件

这部分我们不需要做太多的配置修改,如果需要开启 WEB 终端监控,则需要配置并开启 inet_http_server 项。

进程配置部分

Supervisor 需管理的进程服务配置,示例如下:

[program:work]                      ; 服务名,例如work
command=php -r "sleep(10);exit(1);" ; 带有参数的可执行命令
process_name=%(process_num)s        ; 进程名,当numprocs>1时,需包含%(process_num)s
numprocs=2                          ; 启动进程的数目数
;directory=/tmp                     ; 运行前切换到该目录
;umask=022                          ; 进程掩码
;priority=999                       ; 子进程启动关闭优先级
autostart=true                      ; 子进程是否被自动启动
startsecs=1                         ; 成功启动几秒后则认为成功启动
;startretries=3                     ; 子进程启动失败后,最大尝试启动的次数
autorestart=unexpected            ; 子进程意外退出后自动重启的选项,false, unexpected, true。unexpected表示不在exitcodes列表时重启
exitcodes=0,2                     ; 期待的子程序退出码
;stopsignal=QUIT                  ; 进程停止信号,可以为TERM,HUP,INT,QUIT,KILL,USR1,or USR2等信号,默认为TERM
;stopwaitsecs=10                  ; 发送停止信号后等待的最大时间
;stopasgroup=false                ; 是否向子进程组发送停止信号
;killasgroup=false                ; 是否向子进程组发送kill信号
;redirect_stderr=true             ; 是否重定向日志到标准输出
stdout_logfile=/data/logs/work.log ; 进程的stdout的日志路径
;stdout_logfile_maxbytes=1MB      ; 日志文件最大大小
;stdout_logfile_backups=10
;stdout_capture_maxbytes=1MB
;stderr_logfile=/a/path           ; stderr的日志路径
;stderr_logfile_maxbytes=1MB
;stderr_logfile_backups=10
;stderr_capture_maxbytes=1MB
;environment=A="1",B="2"          ; 子进程的环境变量
;serverurl=AUTO                   ; 子进程的环境变量SUPERVISOR_SERVER_URL 
通常将每个进程的配置信息配置成独立文件,并通过 include 模块包含,这样方便修改和管理配置文件。

启动

配置完成后,启动 supervisord 守护服务:

$ supervisord -c /etc/supervisor/supervisord.conf

常用的命令参数说明:

  • -c:指定配置文件路径
  • -n:是否非守护态运行
  • -l:日志文件目录
  • -i:唯一标识

查看 supervisord 启动情况:

$ ps -ef | grep "supervisor"
root  24901  1  0 Sep23 ? 00:00:30 /usr/bin/python /usr/bin/supervisord -c /etc/supervisor/supervisord.conf
$ netstat -tunpl
tcp 0 0 127.0.0.1:9001  0.0.0.0:*  LISTEN  24901/python

监控进程

Supervisor 提供了多种监控服务的方式,包括 supervisorctl 命令行终端、Web 端、XML_RPC 接口多种方式。

命令终端

直接使用 supervisorctl 即可在命令行终端查看所有服务的情况,如下:

$ supervisorctl 
work:0      RUNNING   pid 31313, uptime 0:00:07
work:1      RUNNING   pid 31318, uptime 0:00:06
# -u 用户名 -p 密码

supervisorctl 常用命令列表如下;

  • status:查看服务状态
  • update:重新加载配置文件
  • restart:重新启动服务
  • stop:停止服务
  • pid:查看某服务的 pid
  • tail:输出最新的 log 信息
  • shutdown:关闭 supervisord 服务

Web

在配置中开启 inet_http_server 后,即可通过 Web 界面便捷地监控进程服务了。

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 评论了文章 · 2019-03-26

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

天翊 发布了文章 · 2019-03-25

商品价格的多币种方案

首发于 樊浩柏科学院

假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。

预览图

需求

需求点,可以描述为:

  • 购买的用户,商品价格需要支持卢比;
  • 营运人员,商品管理系统依然使用人民币价格;

同样这个需求,定了以下两个硬指标:

  • 必须实现需求;
  • 必须快速上线;

问题

首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:

  • 涉及商品价格的系统众多;
  • 各上层系统调用商品价格接口繁多;
  • 商品价格相关字段较多;

为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。

可行性调研

改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:

1、数据表价格字段存卢比

将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。

2、接口输出数据时转化为卢比

原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。

针对以上方案,我们需要注意 2 个问题:

  • 汇率会每天变化,所以商品价格也会变化;
  • 后续商品价格,可能须支持多币种;

上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。

解决方案

最终,为了系统的可扩展性,我们选择了方案 ②。

解决方案

这里主要改造了商品中心,主要解决 透传地区标识支持多币种价格 这 2 个问题。

透传地区标识

我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。

机制原理

思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。

透传地区标识机制

代码实现

传递全局上下文

由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:

class Location
{
    public static function init()
    {
        global $context;

        if (empty($context['location'])) {
            return;
        }

        // API在这里直接获取X-Location头
        if (!empty($_SERVER['HTTP_X_LOCATION'])) {
            $context['location'] = $_SERVER['HTTP_X_LOCATION'];
        }
        // RPC Server会自动获取Context
    }
}
上述init()方法,需要在项目入口位置初始化。

其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context

RPC Client 传递 Context 实现如下:

protected function addGlobalContext($data)
{
    global $context;

    $context = !is_array($context) ? array() : $context;
    
    // data为待请求的RPC协议数据
    $data['Context'] = $context;
    return $data;
}

RPC Server 获取 Context 实现如下:

public function getGlobalContext($packet)
{
    global $context;
    
    $context = array();
    // packet为接收的RPC协议数据
    if(isset($packet['Context'])) {
        $context = $packet['Context'];
    }
}

当设置了 Context 后,RPC 通信时协议数据会携带location字段,内容如下:

RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
设置地区标识

到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:

class Location
{
    public static function set($location)
    {
        global $context;

        $context['location'] = $location;
        // API需要在这里单独设置X-Location头
        header('X-Location: ' . $context['location']);
    }
}
获取地区标识

设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:

class Location
{
    public static function get()
    {
        global $context;

        if (!isset($context['location'])) {
            return 'china';
        }

        return $context['location'];
    }
}

支持多币种价格

商品中心

有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。

改造获取数据方法

下述的ReadBase类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()getAll()方法,所以我们只需要改造这两个方法。

class ReadBase
{
    public function getOne(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
        
        return $this->getExchangePrice($data);
    }
    
    public function getAll(array $cond, $fields)
    {
        $data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
        
        if ($data) {
            foreach ($data as &$one) {
                 $this->getExchangePrice($one);
            }
        }
        
        return $data;
    }
}
后缀匹配价格字段

由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。

protected function isExchangeField($field)
{
    $priceSuffix = array('cost', '_price');
    $black = array();
    $len = strlen($field) ;

    foreach ($priceSuffix as $suffix) {
        $lastPos = $len - strlen($suffix);
        // 非黑名单且非is_
        if (!in_array($field, $black)
            && false === strpos($field, 'is_')
            && $lastPos === strpos($field, $suffix)
        ) {
            return true;
        }
    }

    return false;
}
前缀为is_的字段一般定义为标识字段,默认为非价格字段。
计算地区价格

上述getExchangePrice()方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin后缀的人民币价格字段。

public function getExchangePrice(&$data)
{
    if (empty($data)) {
        return $data;
    }

    $originPrice = array();
    foreach ($data as $field => &$value) {
        // 是否是价格字段
        if ($this->isExchangeField($field)) {
            $originField = $field . '_origin';
            $originPrice[$originField] = $value;
            // 获取对应地区的价格
            $value = $this->getExchangePrice($value);
        }
    }
    
    $data = array_merge($originPrice, $data);

    return $data;
}

public static function getExchangePrice($price)
{
    // 获取地区标识
    $location = Location::get();
    // 汇率
    $exchangeRateConfig = \Config::$exchangeRate;
    if ($location === 'china') {
        return $price;
    } else if (isset($exchangeRateConfig[$location])) {
        $exchangeRate = $exchangeRateConfig[$location];
    } else {
        throw new \BusinessException("not found $location exchange rate");
    }
    // 向上取值并保留两位小数
    $exchangePrice = bcmul($price, $exchangeRate, 3);

    return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}

其中,getExchangePrice()方法会调用Location::get()获取地区标识,并根据汇率计算实时价格。

最终,商品中心改造后,得到的部分商品价格信息,如下:

# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10

API系统

对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location头即可。

GET /product/detail/1 HTTP/1.1

Request Headers
  X-Location: india

API 项目需在入口文件处,初始化地区标识。如下:

Location::init();

商品管理系统

对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:

Location::init();
// 地区设置为中国
Location::set('china');

总结

为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!

查看原文

赞 10 收藏 5 评论 5

天翊 发布了文章 · 2019-03-25

负载均衡算法 — 平滑加权轮询

首发于 樊浩柏科学院

负载均衡算法 — 轮询 一文中,我们就指出了加权轮询算法一个明显的缺陷。即在某些特殊的权重下,加权轮询调度会生成不均匀的实例序列,这种不平滑的负载可能会使某些实例出现瞬时高负载的现象,导致系统存在宕机的风险。为了解决这个调度缺陷,就提出了 平滑加权轮询 调度算法。

预览图

待解决的问题

为了说明平滑加权轮询调度的平滑性,使用以下 3 个特殊的权重实例来演示调度过程。

服务实例权重值
192.168.10.1:22025
192.168.10.2:22021
192.168.10.3:22021

我们已经知道通过 加权轮询 算法调度后,会生成如下不均匀的调度序列。

请求选中的实例
1192.168.10.1:2202
2192.168.10.1:2202
3192.168.10.1:2202
4192.168.10.1:2202
5192.168.10.1:2202
6192.168.10.2:2202
7192.168.10.3:2202

接下来,我们就使用平滑加权轮询算法调度上述实例,看看生成的实例序列如何?

算法描述

假设有 N 台实例 S = {S1, S2, …, Sn},配置权重 W = {W1, W2, …, Wn},有效权重 CW = {CW1, CW2, …, CWn}。每个实例 i 除了存在一个配置权重 Wi 外,还存在一个当前有效权重 CWi,且 CWi 初始化为 Wi;指示变量 currentPos 表示当前选择的实例 ID,初始化为 -1;所有实例的配置权重和为 weightSum;

那么,调度算法可以描述为:
1、初始每个实例 i 的 当前有效权重 CWi 为 配置权重 Wi,并求得配置权重和 weightSum;
2、选出 当前有效权重最大 的实例,将 当前有效权重 CWi 减去所有实例的 权重和 weightSum,且变量 currentPos 指向此位置;
3、将每个实例 i 的 当前有效权重 CWi 都加上 配置权重 Wi;
4、此时变量 currentPos 指向的实例就是需调度的实例;
5、每次调度重复上述步骤 2、3、4;

上述 3 个服务,配置权重和 weightSum 为 7,其调度过程如下:

请求选中前的当前权重currentPos选中的实例选中后的当前权重
1{5, 1, 1}0192.168.10.1:2202{-2, 1, 1}
2{3, 2, 2}0192.168.10.1:2202{-4, 2, 2}
3{1, 3, 3}1192.168.10.2:2202{1, -4, 3}
4{6, -3, 4}0192.168.10.1:2202{-1, -3, 4}
5{4, -2, 5}2192.168.10.3:2202{4, -2, -2}
6{9, -1, -1}0192.168.10.1:2202{2, -1, -1}
7{7, 0, 0}0192.168.10.1:2202{0, 0, 0}
8{5, 1, 1}0192.168.10.1:2202{-2, 1, 1}

可以看出上述调度序列分散是非常均匀的,且第 8 次调度时当前有效权重值又回到 {0, 0, 0},实例的状态同初始状态一致,所以后续可以一直重复调度操作。

此轮询调度算法思路首先被 Nginx 开发者提出,见 phusion/nginx 部分。

代码实现

这里使用 PHP 来实现,源码见 fan-haobai/load-balance 部分。

class SmoothWeightedRobin implements RobinInterface
{
    private $services = array();

    private $total;

    private $currentPos = -1;

    public function init(array $services)
    {
        foreach ($services as $ip => $weight) {
            $this->services[] = [
                'ip'      => $ip,
                'weight'  => $weight,
                'current_weight' => $weight,
            ];
        }
        $this->total = count($this->services);
    }

    public function next()
    {
        // 获取最大当前有效权重实例的位置
        $this->currentPos = $this->getMaxCurrentWeightPos();

        // 当前权重减去权重和
        $currentWeight = $this->getCurrentWeight($this->currentPos) - $this->getSumWeight();
        $this->setCurrentWeight($this->currentPos, $currentWeight);

        // 每个实例的当前有效权重加上配置权重
        $this->recoverCurrentWeight();

        return $this->services[$this->currentPos]['ip'];
    }
}

其中,getSumWeight()为所有实例的配置权重和;getCurrentWeight()setCurrentWeight()分别用于获取和设置指定实例的当前有效权重;getMaxCurrentWeightPos()求得最大当前有效权重的实例位置,实现如下:

public function getMaxCurrentWeightPos()
{
    $currentWeight = $pos = 0;
    foreach ($this->services as $index => $service) {
        if ($service['current_weight'] > $currentWeight) {
            $currentWeight = $service['current_weight'];
            $pos = $index;
        }
    }

    return $pos;
}

recoverCurrentWeight()用于调整每个实例的当前有效权重,即加上配置权重,实现如下:

public function recoverCurrentWeight()
{
    foreach ($this->services as $index => &$service) {
        $service['current_weight'] += $service['weight'];
    }
}

需要注意的是,在配置services服务列表时,同样需要指定其权重:

$services = [
    '192.168.10.1:2202' => 5,
    '192.168.10.2:2202' => 1,
    '192.168.10.3:2202' => 1,
];

数学证明

可惜的是,关于此调度算法严谨的数学证明少之又少,不过网友 tenfy 给出的 安大神 证明过程,非常值得参考和学习。

证明权重合理性

假如有 n 个结点,记第 i 个结点的权重是 $x_i$,设总权重为 $S = x_1 + x_2 + … + x_n$。选择分两步:
1、为每个节点加上它的权重值;
2、选择最大的节点减去总的权重值;

n 个节点的初始化值为 [0, 0, …, 0],数组长度为 n,值都为 0。第一轮选择的第 1 步执行后,数组的值为 $[x_1, x_2, …, x_n]$。

假设第 1 步后,最大的节点为 j,则第 j 个节点减去 S。
所以第 2 步的数组为 $[x_1, x_2, …, x_j-S, …, x_n]$。 执行完第 2 步后,数组的和为:
$x_1 + x_2 + … + x_j-S + … + x_n => x_1 + x_2 + … + x_n - S = S - S = 0$

由此可见,每轮选择第 1 步操作都是数组的总和加上 S,第 2 步总和再减去 S,所以每轮选择完后的数组总和都为 0。

假设总共执行 S 轮选择,记第 i 个结点选择 $m_i$ 次。第 i 个结点的当前权重为 $w_i$。 假设节点 j 在第 t 轮(t < S)之前,已经被选择了 $x_j$ 次,记此时第 j 个结点的当前权重为 $w_j = t \* x_j - x_j \* S = (t - S) \* x_j < 0$, 因为 t 恒小于 S,所以 $w_j < 0$。

前面假设总共执行 S 轮选择,则剩下 S-t 轮 j 都不会被选中,上面的公式 $w_j = (t - S) \* x_j + (S - t) \* x_j = 0$。 所以在剩下的选择中,$w_j$ 永远小于等于 0,由于上面已经证明任何一轮选择后,数组总和都为 0,则必定存在一个节点 k 使得 $w_k > 0$,永远不会再选中节点 j。

由此可以得出,第 i 个结点最多被选中 $x_i$ 次,即 $m_i <= x_i$。
因为 $S = m_1 + m_2 + … + m_n$ 且 $S = x_1 + x_2 + … + x_n$。 所以,可以得出 $m_i == x_i$。

证明平滑性

证明平滑性,只要证明不要一直都是连续选择那一个节点即可。

跟上面一样,假设总权重为 S,假如某个节点 i 连续选择了 t($t < x_i$) 次,只要存在下一次选择的不是节点 i,即可证明是平滑的。

假设 $t = x_i - 1$,此时第 i 个结点的当前权重为 $w_i = t \* x_i - t \* S = (x_i - 1) \* x_i - (x_i - 1) \* S$。证明下一轮的第 1 步执行完的值 $w_i + x_i$ 不是最大的即可。

$w_i + x_i => (x_i - 1) \* x_i - (x_i - 1) \* S + x_i =>$
$x_i^2 - x_i \* S + S => (x_i - 1) \* (x_i - S) + x_i$

因为 $x_i$ 恒小于 S,所以 $x_i - S <= -1$。 所以上面:
$(x_i - 1) \* (x_i - S) + x_i <= (x_i - 1) \* -1 + x_i = -x_i + 1 + x_i = 1$

所以第 t 轮后,再执行完第 1 步的值 $w_i + x_i <= 1$。
如果这 t 轮刚好是最开始的 t 轮,则必定存在另一个结点 j 的值为 $x_j \* t$,所以有 $w_i + x_i <= 1 < 1 \* t < x_j \* t$。所以下一轮肯定不会选中 i。

总结

尽管,平滑加权轮询算法改善了加权轮询算法调度的缺陷,即调度序列分散的不均匀,避免了实例负载突然加重的可能,但是仍然不能动态感知每个实例的负载。

若由于实例权重配置不合理,或者一些其他原因加重系统负载的情况,平滑加权轮询都无法实现每个实例的负载均衡,这时就需要 有状态 的调度算法来完成。

相关文章 »

查看原文

赞 43 收藏 29 评论 2

天翊 发布了文章 · 2019-03-25

用PHP玩转进程之二 — 多进程PHPServer

首发于 樊浩柏科学院

经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。

预览图

PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。

总流程

该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:

控制流程

其中,主要涉及 3 个对象,分别为 入口脚本Master 进程Worker 进程。它们扮演的角色如下:

  • 入口脚本:主要实现 PHPServer 的启动、停止、重载功能,即触发 Master 进程startstopreload流程;
  • Master 进程:负责创建并监控 Worker 进程。在启动阶段,会注册信号处理器,然后创建 Worker;在运行阶段,会持续监控 Worker 进程健康状态,并接受来自入口脚本的控制信号并作出响应;在停止阶段,会停止掉所有 Worker 进程;
  • Worker 进程:负责执行业务逻辑。在被 Master 进程创建后,就处于持续运行阶段,会监听到来自 Master 进程的信号,以实现自我的停止;

整个过程,又包括 4 个流程

  • 流程 ① :以守护态启动 PHPServer 时的主要流程。入口脚本会进行 daemonize,也就是实现进程的守护态,此时会fork出一个 Master 进程;Master 进程先经过 保存 PID注册信号处理器 操作,然后 创建 Workerfork出多个 Worker 进程;
  • 流程 ② :为 Master 进程持续监控的流程,过程中会捕获入口脚本发送来的信号。主要监控 Worker 进程健康状态,当 Worker 进程异常退出时,会尝试创建新的 Worker 进程以维持 Worker 进程数量;
  • 流程 ③ :为 Worker 进程持续运行的流程,过程中会捕获 Master 进程发送来的信号。流程 ① 中 Worker 进程被创建后,就会持续执行业务逻辑,并阻塞于此;
  • 流程 ④ :停止 PHPServer 的主要流程。入口脚本首先会向 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号后,会向所有的 Worker 进程转发 SIGINT 信号(通知所有的 Worker 进程终止),等待所有 Worker 进程终止退出;
在流程 ② 中,Worker 进程被 Master 进程fork出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。

代码实现

启动

启动流程见 流程 ①,主要包括 守护进程保存 PID注册信号处理器创建多进程 Worker 这 4 部分。

守护进程

首先,在入口脚本中fork一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:

守护进程流程

这里使用了 2 次fork,所以最后fork的一个子进程才是 Master 进程,其实一次fork也是可以的。代码如下:

protected static function daemonize()
{
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif ($pid > 0) {
        exit(0);
    }

    // 将当前进程提升为会话leader
    if (-1 === posix_setsid()) {
        exit("process setsid fail\n");
    }

    // 再次fork以避免SVR4这种系统终端再一次获取到进程控制
    $pid = pcntl_fork();
    if (-1 === $pid) {
        exit("process fork fail\n");
    } elseif (0 !== $pid) {
        exit(0);
    }
}
通常在启动时增加-d参数,表示进程将运行于守护态模式。

当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:

protected static function resetStdFd()
{
    global $STDERR, $STDOUT;
    //重定向标准输出和错误输出
    @fclose(STDOUT);
    fclose(STDERR);
    $STDOUT = fopen(static::$stdoutFile, 'a');
    $STDERR = fopen(static::$stdoutFile, 'a');
}

保存PID

为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid文件。代码如下:

protected static function saveMasterPid()
{
    // 保存pid以实现重载和停止
    static::$_masterPid = posix_getpid();
    if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
        exit("can not save pid to" . static::$pidFile . "\n");
    }

    echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}

注册信号处理器

因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。

这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:

protected static function installSignal()
{
    pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);

    pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
    pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);

    // 忽略信号
    pcntl_signal(SIGUSR2, SIG_IGN, false);
    pcntl_signal(SIGHUP,  SIG_IGN, false);
}

protected static function signalHandler($signal)
{
    switch($signal) {
        case SIGINT:
        case SIGTERM:
            static::stop();
            break;
        case SIGQUIT:
        case SIGUSR1:
            static::reload();
            break;
        default: break;
    }
}

其中,SIGINT 和 SIGTERM 信号会触发stop操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill掉。

创建多进程Worker

Master 进程通过fork系统调用,就能创建多个 Worker 进程。实现代码,如下:

protected static function forkOneWorker()
{
    $pid = pcntl_fork();

    // 父进程
    if ($pid > 0) {
        static::$_workers[] = $pid;
    } else if ($pid === 0) { // 子进程
        static::setProcessTitle('PHPServer: worker');

        // 子进程会阻塞在这里
        static::run();

        // 子进程退出
        exit(0);
    } else {
        throw new \Exception("fork one worker fail");
    }
}

protected static function forkWorkers()
{
    while(count(static::$_workers) < static::$workerCount) {
        static::forkOneWorker();
    }
}

Worker进程的持续运行

Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:

Worker进程的持续运行

对于 Worker 进程,run()方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while来模拟调度,实际中应该使用事件(Select 等)驱动。

public static function run()
{
    // 模拟调度,实际用event实现
    while (1) {
        // 捕获信号
        pcntl_signal_dispatch();

        call_user_func(function() {
            // do something
            usleep(200);
        });
    }
}

其中,pcntl_signal_dispatch()会在每次调度过程中,捕获信号并执行注册的信号处理器。

Master进程的持续监控

调度流程

Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:

Master持续监控流程

对于 Master 进程的调度,这里也使用了while,但是引入了wait的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。

protected static function monitor()
{
    while (1) {
        // 这两处捕获触发信号,很重要
        pcntl_signal_dispatch();
        // 挂起当前进程的执行直到一个子进程退出或接收到一个信号
        $status = 0;
        $pid = pcntl_wait($status, WUNTRACED);
        pcntl_signal_dispatch();

        if ($pid >= 0) {
            // worker健康检查
            static::checkWorkerAlive();
        }
        // 其他你想监控的
    }
}
第两次的pcntl_signal_dispatch()捕获信号,是由于wait挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。

其中,PHPServer 的 停止重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。

Worker进程的健康检查

由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:

健康检查流程

代码实现,如下:

protected static function checkWorkerAlive()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $index => $pid) {
        if (!static::isAlive($pid)) {
            unset(static::$_workers[$index]);
        }
    }

    static::forkWorkers();
}

停止

Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:

停止流程

入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()方法。如下:

protected static function stop()
{
    // 主进程给所有子进程发送退出信号
    if (static::$_masterPid === posix_getpid()) {
        static::stopAllWorkers();

        if (is_file(static::$pidFile)) {
            @unlink(static::$pidFile);
        }
        exit(0);
    } else { // 子进程退出

        // 退出前可以做一些事
        exit(0);
    }
}

若是 Master 进程执行该方法,会先调用stopAllWorkers()方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;

由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。

protected static function stopAllWorkers()
{
    $allWorkerPid = static::getAllWorkerPid();
    foreach ($allWorkerPid as $workerPid) {
        posix_kill($workerPid, SIGINT);
    }

    // 子进程退出异常,强制kill
    usleep(1000);
    if (static::isAlive($allWorkerPid)) {
        foreach ($allWorkerPid as $workerPid) {
            static::forceKill($workerPid);
        }
    }

    // 清空worker实例
    static::$_workers = array();
}

重载

代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:

重载流程

整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()方法,reload()方法调用stopAllWorkers()方法。如下:

protected static function reload()
{
    // 停止所有worker即可,master会自动fork新worker
    static::stopAllWorkers();
}
reload()方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。

你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。

运行效果

到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:

$ php server.php 
Usage: Commands [mode] 

Commands:
start        Start worker.
stop        Stop worker.
reload        Reload codes.

Options:
-d        to start in DAEMON mode.

Use "--help" for more information about a command.

首先,我们启动它:

$ php server.php start -d
PHPServer start      [OK]

其次,查看进程树,如下:

$ pstree -p
init(1)-+-init(3)---bash(4)
        |-php(1286)-+-php(1287)
                    `-php(1288)

最后,我们把它停止:

$ php server.php stop
PHPServer stopping ...
PHPServer stop success

现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)

总结

我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:

首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。

相关文章 »

查看原文

赞 34 收藏 25 评论 10

天翊 评论了文章 · 2019-03-25

自如2018新年活动系统 — 抢红包

首发于 樊浩柏科学院

2017 年是自如快速增长的一年,自如客突破 100 万,管理资产达到 50 万间,在年底成功获得了 40 亿 A 轮融资,而这些都要感谢广大的自如客,公司为了回馈自如客,在六周年活动时就发放了 6000 万租住基金,当然年底散币活动也够疯狂。

2018口碑年

活动规模

既然公司对自如客这么阔,那对我们员工也得够意思,所以年底我们共准备了 3 个活动。

1、针对 自如客 的服务费减免活动;
2、针对 自如客 的 1000 万现金礼包;
3、25 万的 员工 红包活动;

员工红包活动

散币活动 2 和 3 是通过微信红包形式进行,想散币就散吧,可微信告诉我们,想散币还得交税(>﹏<)。员工红包来说,25 万要交掉 10 多万税,此时心疼我的钱。好了,下面开始说点正事。

技术方案

说到红包,我们肯定会想到红包拆分和抢红包两个场景。红包拆分是指将指定金额拆分为指定数目红包的过程,即是用来确定每个红包的金额数;而抢红包就是典型的高并发场景,需要避免红包超发的情况。

红包拆分

可选的方案

拆分方式

1、实时拆分
实时拆分,指的是在抢红包时实时计算每个红包的金额,以实现红包的拆分过程,对系统性能和拆分算法要求较高,例如拆分过程要一直保证后续待拆分红包的金额不能为空,不容易做到拆分红包的金额服从正态分布规律。

2、预先生成
预先生成,指的是在红包开抢之前已经完成了红包的拆分,抢红包时只是依次取出拆分好的红包金额,对拆分算法要求较低,可以拆分出随机性很好的红包金额,通常需要结合队列使用。

拆分算法

我并没有找到业界的通用算法,但红包拆分算法应该是拆分金额要看起来随机,最好能够服从正态分布,可以参考 微信@lcode 提供的红包拆分算法。

微信拆分算法的优点是算法较简单,拆分效率高,同时,由于该算法天然的特性,可以保证后续红包金额一定不为空,特别适合实时拆分场景,但缺点是会导致大额红包较大概率地在拆分的最后出现。 @lcode 拆分算法的优点是拆分金额基本符合正态分布,适合随机性要求较高的拆分场景。

我们的方案

我们这次的业务对红包金额的随机性要求不高,但是对系统可靠性要求较高,所以我们选用了预算生成方式,使用 二倍均值法 的红包拆分算法,作为我们的红包拆分方案。

采用预算生成方式,我们预先生成红包并放入 Redis 的 List 中,当抢红包时只是 Pop List 即可,具体实现将在 抢红包 部分介绍。

拆分算法可以描述为:假设剩余拆分金额为 M,剩余待拆分红包个数为 N,红包最小金额为 1 元,红包最小单位为元,那么定义当前红包的金额为:

$$m = rand(1, floor(M/N*2))$$

其中,floor 表示向下取整,rand(min, max) 表示从 [min, max] 区间随机一个值。$M/N \ast 2$ 表示剩余待拆分金额平均金额的 2 倍,因为 N >= 2,所以 $M/N \ast 2 <= M$,表示一定能保证后续红包能拆分到金额。

代码实现为:

for ($i = 0; $i < $N - 1; $i++) {
    $max = (int)floor($M / ($N - $i)) * 2;
    $m[$i] = $max ? mt_rand(1, $max) : 0;
    $M -= $m[$i];
}

$m[] = $M;

值得一提的是,我们为了保证红包金额差异尽量小,先将总金额平均拆分成 N+1 份,将第 N+1 份红包按照上述的红包拆分算法拆分成 N 份,这 N 份红包加上之前的平均金额才作为最终的红包金额。

抢红包

可选的方案

限流

1、前端限流
前端限制用户在 n 秒之内只能提交一次请求,虽然这种方式只能挡住小白,不过这是 99% 的用户哟,所以也必须得做。

2、后端限流
常用的后端限流方法有 漏桶算法令牌桶算法漏桶算法 主要目的是控制请求数据注入的速率,如果此时漏桶溢出,后续的请求数据会被丢弃。而 令牌桶算法 是以一个恒定的速度往桶里放入令牌,而如果请求数据需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌时,这些请求才被丢弃,令牌桶算法的一个好处是可以方便地改变应用接受请求的速率。

防超发

1、库存加锁
可以通过加锁的方式解决资源抢占问题,但是加锁会增加系统开销,大流量下更容易拖垮系统,不过可以尝试一下基于版本号的乐观锁。

2、通过高速队列串行化请求
之所会出现超发问题,是因为并发时会出现多个进程同时获取同一资源的现象,如果使用高速队列将并行请求串行化,那么问题就不存在了。高速队列可以使用 Redis 缓存服务器来实现,当然光使用队列还不够,必要保证整个流程调用链要短、要快,否则队列会积压严重,甚至会拖垮整个服务。

我们的方案

在限流方面,由于我们预估的请求量还在系统承受范围,所以没有考虑引入后端限流方案。我们的抢红包系统流程图如下:

抢红包流程图

我们将抢红包拆分为 红包占有(流程①,同步) 和 红包发放 (流程②,异步)这两个过程,首先采用高速队列串行化请求,红包发放逻辑由一组 Worker 异步去完成。高速队列只是完成红包占有的过程,实现库存的控制,Worker 则处理耗时较长的红包发放过程。

当然,在实际应用中,红包占用过程还需要加上一些前置规则校验,比如用户是否已经领取过,领取次数是否已经达到上限等?红包占有流程图如下:

红包占有流程图

其中,red::list为 List 结构,存放预先生成的红包金额(流程①中的红包队列);red::task 也为 List 结构,红包异步发放队列(流程②中的任务队列);red::draw为 Hash 结构,存放红包领取记录,field为用户的 openid,value为序列化的红包信息;red::draw_count:u:openid为 k-v 结构,用户领取红包计数器。

下面,我将以以下 3 个问题为中心,来说说我们设计出的抢红包系统。

1、怎么保证不超发
我们需要关注的是红包占有过程,从红包占有流程图可看出,这个过程是很多 Key 操作的组合,那怎么保证原子性?可以使用 Redis 事务,但我们选用了 Lua 方案,一方面是因为首先要保证性能,而 Lua 脚本嵌入 Redis 执行不存在性能瓶颈,另一方面 Lua 脚本执行时本身就是原子性的,满足需求。

红包占有的 Lua 脚本实现如下:

-- 领取人的openid为xxxxxxxxxxx
local openid = 'xxxxxxxxxxx'
local isDraw = redis.call('HEXISTS', 'red::draw', openid)
-- 已经领取
if isDraw ~= 0 then
    return true
end
-- 领取太多次了
local times = redis.call('INCR', 'red::draw_count:u:'..openid)
if times and tonumber(times) > 9 then
    return 0
end

local number = redis.call('RPOP', 'red::list')
-- 没有红包
if not number then
    return {}
end
-- 领取人昵称为Fhb,头像为https://xxxxxxx
local red = {money=number,name='Fhb',pic='https://xxxxxxx'}
-- 领取记录
redis.call('HSET', 'red::draw', openid, cjson.encode(red))
-- 处理队列
red['openid'] = openid
redis.call('RPUSH', 'red::task', cjson.encode(red))

return true
需要注意 Lua 脚本执行过程并不是事务的,脚本中的操作命令在执行时是有先后顺序的,当某个操作执行失败时不会回滚已经执行成功的操作,它的原子性是通过单线程模型实现。

2、怎么提高系统响应速度
如红包占有流程图所示,当用户发起抢红包请求时,若有红包则直接完成红包占有操作,同步告知用户是否抢到红包,这个过程要求快速响应。

但由于微信红包支付属于第三方调用,若抢到红包后同步调用红包支付,系统调用链又长又慢,所以红包占有和红包发放异步拆分是必然。拆分后,红包占有只需操作 Redis,响应性能已不是问题。

3、怎么提高系统处理能力
从上述分析可知,目前系统的压力都会集中在红包发放这个环节,因为用户抢到红包时,我们只是同步告知用户已抢到红包,然后异步去发放红包,因此用户并不会立即收到红包(受红包发放 Worker 处理能力和微信服务压力制约)。若红包发放的 Worker 处理能力较弱,那么红包发放的延迟就会很高,体验较差。

如抢红包流程图中所示,我们采用一组 Worker 去消费任务队列,并调用红包支付 API,以及数据持久化操作(后续对账)。尽管红包发放调用链又长又慢,但是注意到这些 Worker 是 无状态 的,所以可以通过增加 Worker 数量,以横向扩展提高系统的处理能力。

4、怎么保证数据一致性
其实,红包发放延时我们可以做到用户无感知,但是若红包发放(流程②)失败了,已经告知用户抢到红包,但是却木有发,估计他杀人的心都有了。根据 CAP 原理,我们无法同时满足数据一致性、数据可用性、分区耐受性,通常只需做到数据最终一致性。

为了达到数据最终一致性,我们就引入了重试机制,生成一个全局唯一的外部订单号,当某单红包发放失败,就会放回任务队列,使得有机会进行发放重试,当然这一切都需要 API 做幂等处理。

Worker可靠性保障

这里必须将 Worker 可靠性单独说,因为它实在太重要了。Worker 的实现如下:

$maxTask = 1000;
$sleepTime = 1000;

while (true) {
    while ($red = RedLogic::getTask()) {
        RedLogic::doTask($red);
        //处理多少个任务主动退出
        $maxTask--;
        if ($maxTask < 0) {
            return EXIT_CODE_NORMAL;
        }
    }
    //等待任务
    usleep($sleepTime);
}
这里使用 LPOP 命令获取任务,所以使用了 while 结构,并且无任务时需要等待,可以用阻塞命令 BLPOP 来改进。

由于 Worker 需要常驻内存运行,难免会出现异常退出的情况(也有主动退出), 所以需要保持 Worker 一直处于运行状态。我们使用进程管理工具 Supervisor 来监控 Worker 的运行状态,同时管理 Worker 的数量,当任务队列出现堆积时,增加 Worker 数量即可。Supervisor 的监控后台如下:

Supervisor进程管理

员工系统号散列

公司员工都用唯一一个系统号 emp_code(自增字段)标识,登录成功后返回 emp_code,系统后续所有交互流程都基于 emp_code,分享出去的红包也会携带 emp_code,为了保护员工敏感信息和防止恶意碰撞攻击,我们不能直接将 emp_code 暴露给前端,需要借助一个 token(无规律)的中间者来完成交互。

可选的方案

1、储存映射关系,时时查询
预先生成一个随机串 token,然后跟 emp_code 绑定,每次请求都根据 token 时时查询 emp_code。优点是可以定期更新,相对安全,缺点是性能不高。

2、建立映射关系函数,实时计算
建立一个映射关系函数,如 hash 散列或者加密解密算法,能够根据 emp_code 生成一个无规律的字符串 token,并且要能够根据 token 反映射出 emp_code。优点是需要存储介质存储关系,性能较高,缺点是很难做到定期失效并更新。

我们的方案

由于我们的红包活动只进行几天,所以我们选用了方案 2。对 emp_code 做了 hashids 散列算法,暴露的只是一串无规律的散列字符串。

hashids 是一个开源且轻量的唯一 id 生成器,支持 Java、PHP、C/C++、Python 等主流语言,PHP 想使用 hashids,只需composer require hashids/hashids命令安装即可。

然后,如下方式使用:

use Hashids\Hashids;

$hashids = new Hashids('salt', 6, 'abcdefghijk1234567890');

$hashids->encode(11002);    //994k2kk
$hashids->decode('994k2kk');  //[11002]

需要说明的是,其中salt是非常重要的散列加密盐串,6表示散列值最小长度,abcde...7890为散列字典,太长影响效率,太短不安全。由于默认的散列字典比较长,decode 效率并不高,所以这里移除了大写字母部分。

语音点赞

语音点赞就是用户以语音的形式助力好友,核心技术其实是语音识别,而我们一般都会使用第三方语音识别服务。

可选的方案

1、客户端调用第三方服务识别
客户端直接调用第三方语音识别服务,如微信提供了 JS-SDK 的语音识别 API ,返回识别的语音文本的信息,并且已经经过语义化。优点是识别较快,且不许关注语音存储问题,缺点是不安全,识别结果提交到服务端之前可能被恶意篡改。

2、服务端调用第三方服务识别
先将录制的语音上传至存储平台,然后服务端调用第三方语音识别服务,第三方语音识别服务去获取语音信息并识别,返回识别的语音文本的信息。优点是识别结果较安全,缺点是系统交互较多,识别效率不高。

我们的方案

我们业务场景的特殊性,存在用户可助力次数的限制,所以无需担心恶意刷赞的情况,因此可以选用方案 1,语音识别的交互流程如下:

语音识别交互图

此时,整个语音识别流程如下:

语音点赞流程图

当然中国文字博大精深,语音识别的文本在匹配时,需要考虑容错处理,可以将文本转化为拼音,然后匹配拼音,或者设置一个匹配百分比,达到匹配值则认为语音口令正确。

需要注意的是,微信只提供 3 天的语音存储服务,若语音播放周期较长,则要考虑实现语音的存储。

其他

红包发放测试

我们使用了线上公账号进行红包发放测试,为了让线上公众号能够授权到测试环境,在线上的微信授权回调地址新增一个参数,将带有to=feature参数的请求引流到测试环境,其他线上流量还是保持不变,匹配规则如下:

# Nginx不支持if嵌套,所以就这样变通实现
set $auth_redirect "";
if ($args ~* "r=auth/redirect") {
    set $auth_redirect "prod";
}
if ($args ~* "to=feature") {
    set $auth_redirect "feature";
}
if ($auth_redirect ~ "feature") {
    rewrite ^(.*)$ http://wx.t.ziroom.com/index.php last;
}
if ($auth_redirect ~ "prod") {
    rewrite ^(.*)$ http://wx.ziroom.com/index.php last;
}

CDN缓存

由于本次活动力度较大,预估流量会比以往增加不少(不能再出现机房带宽打满的情况了,不然 >﹏<),静态页面占流量的很大一部分,所以静态页面在发布时都会放置一份在 CDN 上,这样回源的流量就很小了。

灾备方案

尽管做了很多准备,还是无法确保万无一失,我们在每个关键节点都增加了开关,一点出现异常,通过配置中心可以人工介入做降级处理。

查看原文

天翊 评论了文章 · 2019-03-25

在Docker中使用Xdebug

首发于 樊浩柏科学院

我们经常会使用 PhpStorm 结合 Xdebug 进行代码断点调试,这样能追踪程序执行流程,方便调试代码和发现潜在问题。博主将开发环境迁入 Docker 后,Xdebug 调试遇到了些问题,在这里整理出 Docker 中使用 Xdebug 的方法和注意事项。

说明:开发和调试环境为本地 Docker 中的 LNMP,IDE 环境为本地 Win10 下的 PhpStorm。这种情况下 Xdebug 属于远程调试模式,IDE 和本地 IP 为 192.168.1.101,Docker 中 LNMP 容器 IP 为 172.17.0.2。

问题描述

在 Docker 中安装并配置完 Xdebug ,并设置 PhpStorm 中对应的 Debug 参数后,但是 Debug 并不能正常工作。

此时,php.ini中 Xdebug 配置如下:

xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_connect_back = on
xdebug.remote_port = 9001        //PhpStorm监听本地9001端口
xdebug.remote_handler = dbgp
xdebug.remote_log = /home/tmp/xdebug.log

开始收集问题详细表述。首先,观察到 PhpStorm 的 Debug 控制台出现状态:

Waiting for incoming connection with ide key ***

然后查看 Xdebug 调试日志xdebug.log,存在如下错误:

I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(

分析问题

查看这些问题表述,基本上可以定位为 Xdebug 和 PhpStorm 之间的 网络通信 问题,接下来一步步定位具体问题。

排查本地9001端口

Win 下执行 netstat -ant命令:

协议    本地地址       外部地址        状态           卸载状态
TCP  0.0.0.0:9001   0.0.0.0:0     LISTENING       InHost

端口 9001 监听正常,然后在容器中使用 telnet 尝试同本地 9001 端口建立 TCP 连接:

$ telnet 192.168.1.101 9001

Trying 192.168.1.101...
Connected to 192.168.1.101.
Escape character is '^]'.

说明容器同本地 9001 建立 TCP 连接正常,但是 Xdebug 为什么会报连接失败呢?此时,至少可以排除不会是因为 PhpStorm 端配置的问题。

排查Xdebug问题

回过头来看看 Xdebug 的错误日志,注意观察到失败时的连接信息:

I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(

此时,在容器中使用 tcpdump 截获的数据包如下:

$ tcpdump -nnA port 9001
# 尝试建立连接,但是失败了
12:20:34.318080 IP 172.17.0.2.40720 > 172.17.0.1.9001: Flags [S], seq 2365657644, win 29200, options [mss 1460,sackOK,TS val 833443 ecr 0,nop,wscale 7], length 0
E..<..@.@.=...........#)...,......r.XT.........
............
12:20:34.318123 IP 172.17.0.1.9001 > 172.17.0.2.40720: Flags [R.], seq 0, ack 2365657645, win 0, length 0
E..(.]@.@..M........#).........-P....B..

可以确定的是, Xdebug 是向 IP 为 172.17.0.1 且端口为 9001 的目标机器尝试建立 TCP 连接,而非正确的 192.168.1.101 本地 IP。到底发生了什么?

首先,为了搞懂 Xdebug 和 PhpStorm 的交互过程,查了 官方手册 得知,Xdebug 工作在远程调试模式时,有两种工作方式:

1、IDE 所在机器 IP 确定/单人开发

图中,由于 IDE 的 IP 和监听端口都已知,所以 Xdebug 端可以很明确知道 DBGP 交互时 IDE 目标机器信息,所以 Xdebug 只需配置 xdebug.remote_hostxdebug.remote_port 即可。

2、IDE 所在机器 IP 未知/团队开发

由于 IDE 的 IP 未知或者 IDE 存在多个 ,那么 Xdebug 无法提前预知 DBGP 交互时的目标 IP,所以不能直接配置 xdebug.remote_host 项(remote_port 项可以确定),必须设置 xdebug.remote_connect_back 为 On 标识(会忽略 xdebug.remote_host 项)。这时,Xdebug 会优先获取 HTTP_X_FORWARDED_FORREMOTE_ADDR 中的一个值作为通信时 IDE 端的目标 IP,通过Xdebug.log记录可以确认。

I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found

接下来,可以知道 Xdebug 端是工作在远程调试的模式 2 上,Xdebug 会通过 HTTP_X_FORWARDED_FOR 和 REMOTE_ADDR 项获取目标机 IP。Docker 启动容器时已经做了 80 端口映射,忽略宿主机同 Docker 容器复杂的数据包转发规则,先截取容器 80 端口数据包:

$ tcpdump -nnA port 80
# 请求信息
13:30:07.017770 IP 172.17.0.1.33976 > 172.17.0.2.80: Flags [P.], seq 1:208, ack 1, win 229, options [nop,nop,TS val 1250713 ecr 1250713], length 207
E....=@.@..............P..    .+.......Y......
........GET /v2/room/list.json HTTP/1.1
Accept: */*
Cache-Control: no-cache
Host: localhost
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_152-release)
Accept-Encoding: gzip,deflate

可以看出,数据包的源地址为 172.17.0.1,并非真正的源地址 192.168.1.101,HTTP 请求头中也无 HTTP_X_FORWARDED_FOR 项。

说明:172.17.0.1 实际为 Docker 创建的虚拟网桥 docker0 的地址 ,也是所有容器的默认网关。Docker 网络通信方式默认为 Bridge 模式,通信时宿主机会对数据包进行 SNAT 转换,进而源地址变为 docker0,那么,怎么在 Docker 里获取客户端真正 IP 呢?

定位根源

最后,可以确定由于 HTTP_X_FORWARDED_FOR 未定义,因此 Xdebug 会取 REMOTE_ADDR 为 IDE 的 IP,同时由于 Docker 特殊的网络转发规则,导致 REMOTE_ADDR 变更为网关 IP,所以 Xdebug 同 PhpStorm 进行 DBGP 交互会失败。

解决问题

由于 Docker 容器里获取真正客户端 IP 比较复杂,这里使用 Xdebug 的 远程模式 1 明确 IDE 端 IP 来规避源 IP 被修改的情况,最终解决 Xdebug 调试问题。

模式 1 的 Xdebug 主要配置为:

//并没有xdebug.remote_connect_back项
xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_host = 192.168.1.101
xdebug.remote_port = 9001
xdebug.remote_handler = dbgp

重启 php-fpm,使用php --ri xdebug确定无误,使用 PhpStorm 重新进行调试。

再次在容器中 tcpdump 抓取 9001 端口数据包:

# 连接的源地址已经正确
14:05:27.379783 IP 172.17.0.2.44668 > 192.168.1.101.9001: Flags [S], seq 3444466556, win 29200, options [mss 1460,sackOK,TS val 1462749 ecr 0,nop,wscale 7], length 0
E..<2.@.@..........e.|#).Nc|......r.nO.........
..Q.........

再次使用 PhpStorm 的 REST Client 断点调试 API 时, Debug 控制台如下:

所以,使用 Xdebug 进行远程调试时,需要选择合适的调试模式,在 Docker 下建议使用远程模式 1。

其他注意事项

  • Xdebug 版本和 PHP 版本一致

并不是每个 Xdebug 版本都适配 PHP 每个版本,可以直接使用 官方工具,选择合适的 Xdebug 版本。

  • 本地文件和远端文件映射关系

如上图,在使用 PhpStorm 时进行远程调试时,需要配置本地文件和远端文件的目录映射关系,这样 IDE 才能根据 Xdebug 传递的当前执行文件路径与本地文件做匹配,实现断点调试和单步调试等。

查看原文

天翊 评论了文章 · 2019-03-25

在Docker中使用Xdebug

首发于 樊浩柏科学院

我们经常会使用 PhpStorm 结合 Xdebug 进行代码断点调试,这样能追踪程序执行流程,方便调试代码和发现潜在问题。博主将开发环境迁入 Docker 后,Xdebug 调试遇到了些问题,在这里整理出 Docker 中使用 Xdebug 的方法和注意事项。

说明:开发和调试环境为本地 Docker 中的 LNMP,IDE 环境为本地 Win10 下的 PhpStorm。这种情况下 Xdebug 属于远程调试模式,IDE 和本地 IP 为 192.168.1.101,Docker 中 LNMP 容器 IP 为 172.17.0.2。

问题描述

在 Docker 中安装并配置完 Xdebug ,并设置 PhpStorm 中对应的 Debug 参数后,但是 Debug 并不能正常工作。

此时,php.ini中 Xdebug 配置如下:

xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_connect_back = on
xdebug.remote_port = 9001        //PhpStorm监听本地9001端口
xdebug.remote_handler = dbgp
xdebug.remote_log = /home/tmp/xdebug.log

开始收集问题详细表述。首先,观察到 PhpStorm 的 Debug 控制台出现状态:

Waiting for incoming connection with ide key ***

然后查看 Xdebug 调试日志xdebug.log,存在如下错误:

I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(

分析问题

查看这些问题表述,基本上可以定位为 Xdebug 和 PhpStorm 之间的 网络通信 问题,接下来一步步定位具体问题。

排查本地9001端口

Win 下执行 netstat -ant命令:

协议    本地地址       外部地址        状态           卸载状态
TCP  0.0.0.0:9001   0.0.0.0:0     LISTENING       InHost

端口 9001 监听正常,然后在容器中使用 telnet 尝试同本地 9001 端口建立 TCP 连接:

$ telnet 192.168.1.101 9001

Trying 192.168.1.101...
Connected to 192.168.1.101.
Escape character is '^]'.

说明容器同本地 9001 建立 TCP 连接正常,但是 Xdebug 为什么会报连接失败呢?此时,至少可以排除不会是因为 PhpStorm 端配置的问题。

排查Xdebug问题

回过头来看看 Xdebug 的错误日志,注意观察到失败时的连接信息:

I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(

此时,在容器中使用 tcpdump 截获的数据包如下:

$ tcpdump -nnA port 9001
# 尝试建立连接,但是失败了
12:20:34.318080 IP 172.17.0.2.40720 > 172.17.0.1.9001: Flags [S], seq 2365657644, win 29200, options [mss 1460,sackOK,TS val 833443 ecr 0,nop,wscale 7], length 0
E..<..@.@.=...........#)...,......r.XT.........
............
12:20:34.318123 IP 172.17.0.1.9001 > 172.17.0.2.40720: Flags [R.], seq 0, ack 2365657645, win 0, length 0
E..(.]@.@..M........#).........-P....B..

可以确定的是, Xdebug 是向 IP 为 172.17.0.1 且端口为 9001 的目标机器尝试建立 TCP 连接,而非正确的 192.168.1.101 本地 IP。到底发生了什么?

首先,为了搞懂 Xdebug 和 PhpStorm 的交互过程,查了 官方手册 得知,Xdebug 工作在远程调试模式时,有两种工作方式:

1、IDE 所在机器 IP 确定/单人开发

图中,由于 IDE 的 IP 和监听端口都已知,所以 Xdebug 端可以很明确知道 DBGP 交互时 IDE 目标机器信息,所以 Xdebug 只需配置 xdebug.remote_hostxdebug.remote_port 即可。

2、IDE 所在机器 IP 未知/团队开发

由于 IDE 的 IP 未知或者 IDE 存在多个 ,那么 Xdebug 无法提前预知 DBGP 交互时的目标 IP,所以不能直接配置 xdebug.remote_host 项(remote_port 项可以确定),必须设置 xdebug.remote_connect_back 为 On 标识(会忽略 xdebug.remote_host 项)。这时,Xdebug 会优先获取 HTTP_X_FORWARDED_FORREMOTE_ADDR 中的一个值作为通信时 IDE 端的目标 IP,通过Xdebug.log记录可以确认。

I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found

接下来,可以知道 Xdebug 端是工作在远程调试的模式 2 上,Xdebug 会通过 HTTP_X_FORWARDED_FOR 和 REMOTE_ADDR 项获取目标机 IP。Docker 启动容器时已经做了 80 端口映射,忽略宿主机同 Docker 容器复杂的数据包转发规则,先截取容器 80 端口数据包:

$ tcpdump -nnA port 80
# 请求信息
13:30:07.017770 IP 172.17.0.1.33976 > 172.17.0.2.80: Flags [P.], seq 1:208, ack 1, win 229, options [nop,nop,TS val 1250713 ecr 1250713], length 207
E....=@.@..............P..    .+.......Y......
........GET /v2/room/list.json HTTP/1.1
Accept: */*
Cache-Control: no-cache
Host: localhost
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_152-release)
Accept-Encoding: gzip,deflate

可以看出,数据包的源地址为 172.17.0.1,并非真正的源地址 192.168.1.101,HTTP 请求头中也无 HTTP_X_FORWARDED_FOR 项。

说明:172.17.0.1 实际为 Docker 创建的虚拟网桥 docker0 的地址 ,也是所有容器的默认网关。Docker 网络通信方式默认为 Bridge 模式,通信时宿主机会对数据包进行 SNAT 转换,进而源地址变为 docker0,那么,怎么在 Docker 里获取客户端真正 IP 呢?

定位根源

最后,可以确定由于 HTTP_X_FORWARDED_FOR 未定义,因此 Xdebug 会取 REMOTE_ADDR 为 IDE 的 IP,同时由于 Docker 特殊的网络转发规则,导致 REMOTE_ADDR 变更为网关 IP,所以 Xdebug 同 PhpStorm 进行 DBGP 交互会失败。

解决问题

由于 Docker 容器里获取真正客户端 IP 比较复杂,这里使用 Xdebug 的 远程模式 1 明确 IDE 端 IP 来规避源 IP 被修改的情况,最终解决 Xdebug 调试问题。

模式 1 的 Xdebug 主要配置为:

//并没有xdebug.remote_connect_back项
xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_host = 192.168.1.101
xdebug.remote_port = 9001
xdebug.remote_handler = dbgp

重启 php-fpm,使用php --ri xdebug确定无误,使用 PhpStorm 重新进行调试。

再次在容器中 tcpdump 抓取 9001 端口数据包:

# 连接的源地址已经正确
14:05:27.379783 IP 172.17.0.2.44668 > 192.168.1.101.9001: Flags [S], seq 3444466556, win 29200, options [mss 1460,sackOK,TS val 1462749 ecr 0,nop,wscale 7], length 0
E..<2.@.@..........e.|#).Nc|......r.nO.........
..Q.........

再次使用 PhpStorm 的 REST Client 断点调试 API 时, Debug 控制台如下:

所以,使用 Xdebug 进行远程调试时,需要选择合适的调试模式,在 Docker 下建议使用远程模式 1。

其他注意事项

  • Xdebug 版本和 PHP 版本一致

并不是每个 Xdebug 版本都适配 PHP 每个版本,可以直接使用 官方工具,选择合适的 Xdebug 版本。

  • 本地文件和远端文件映射关系

如上图,在使用 PhpStorm 时进行远程调试时,需要配置本地文件和远端文件的目录映射关系,这样 IDE 才能根据 Xdebug 传递的当前执行文件路径与本地文件做匹配,实现断点调试和单步调试等。

查看原文

天翊 发布了文章 · 2019-03-25

我的博客发布上线方案 — Hexo

首发于 樊浩柏科学院

之前一直在使用 Hexo 推荐的发布方案,缺点是本地依赖 Hexo 环境,无法随时随地地更新博客。为了摆脱 Hexo 环境约束进而高效写作,有了下述的发布方案。

预览图

本文的发布方案中,Git 仓库只是托管 md 文件,通过 Webhook 通知服务器拉取 md 文件,然后执行构建静态文件操作,完成一个发布过程。

我的写作环境为 Typora(Win10),博客发布在阿里云的 ECS(CentOS)上,文章托管在 GitHub

需求迭代

随着时间成本的增高,只能利用碎片时间来进行写作。因此,我的写作场景变成了这样:

  • 习惯使用 MarkDown 写原稿,有 MarkDown 编辑器就行;
  • 写作场地不限定,有电脑就行;
  • 写作时间不确定,有灵感就写;

新的问题

之前(包括 Hexo 推荐)的发布方案,都是先本地编写 MarkDown 源文件,然后本地构建静态文件,最后同步静态文件到服务器。发布流程图如下:

原来的发布流程

显而易见,若继续使用之前的发布方案,那么每当更换写作场地时都需要安装 Hexo 环境,写作场地和时间都受到限制,不满足需求。

新的方案

问题主要是,本地受制于构建静态文件时需要的 Hexo 环境,那么是否可以将构建静态文件操作放到服务器端?

发布流程

首先,看下新方案的发布流程图:

我的发布流程

如流程图所示,整个发布系统共涉及到 3 个环境,分别为本地(写作)、Git 仓库(托管 md 源文件)、服务器(Web 服务)环境。在服务器环境构建静态文件,因此只需要在服务器端安装 Hexo 环境。

一个完整的发布流程包含 3 个部分:

写作流程

采用按分支开发策略,当写作完成后,只需要 push 修改到对应分支即可。只要有 MarkDown 编辑器,以及任何文本编辑器,甚至 马克飞象 都可以随时随地写作。

写作流程

当然,你可能说还需要 Git 环境呀?好吧,如果你是一名合格的 Coder,竟然没有 Git,你知道该干嘛了!再说没有 Git 环境,还可以通过 GitHub 来完成写作。

发布流程

采用 master 发布策略,当需要发布时,需要将对应开发分支 merge 到 master 分支,然后push master分支,即可实现发布。

发布流程

构建流程

这里使用到 Webhook 机制,触发服务器执行构建操作,构建脚本见 Webhook 脚本 部分。

当流程 ① 和 ② 结束后,Git 仓库都会向服务器发起一次 HTTP 请求,记录如下:

Webhook请求

当收到构建请求后,执行构建操作。构建流程图如下:

构建流程图

首先检查当前变更分支,只有为 master 分支时,执行 pull 操作拉取 md 文件更新,然后再执行 hexo g完成静态文件的构建。

Webhook脚本

Webhook 脚本使用 PHP 实现,代码如下:

主流程方法如下:

public function run()
{
    //校验token
    if ($this->checkToken()) {
        echo 'ok';
    } else {
        echo 'error';
    }
    fastcgi_finish_request();       //返回响应
    if ($this->checkBranch()) {     //校验分支
        $this->exec();              //执行操作逻辑
    }
}

这里使用 shell 脚本实现构建所需的所有操作,方便扩展。执行操作方法如下:

public function exec()
{
    //shell文件
    $path = $this->config['bash_path'];
    $result = shell_exec("sh $path 2>&1");
    $this->accessLog($result);
    return $result;
}

构建 shell 脚本如下:

#!/usr/bin/env bash

export NODE_HOME=/usr/local/node
export PATH=$NODE_HOME/bin:$PATH

pwd='/data/html/hexo'
cd $pwd/source
git pull
cd $pwd
$pwd/node_modules/hexo/bin/hexo g

总结

新发布方案与之前方案的区别是:前者只需本地编写 md 文件,博客服务器构建静态文件;后者本地编写 md 文件后,需要本地构建静态文件,然后博客服务器只同步静态文件。

当然,有很多办法可以解决当前问题,比如可以使用 持续集成。本文只是提供一个发布思路,在项目的生成环境中,我们也很容易应用上这种发布思路,开发出自己的发布系统。

相关文章 »

查看原文

赞 8 收藏 4 评论 0

天翊 发布了文章 · 2019-03-25

自如2018新年活动系统 — 抢红包

首发于 樊浩柏科学院

2017 年是自如快速增长的一年,自如客突破 100 万,管理资产达到 50 万间,在年底成功获得了 40 亿 A 轮融资,而这些都要感谢广大的自如客,公司为了回馈自如客,在六周年活动时就发放了 6000 万租住基金,当然年底散币活动也够疯狂。

2018口碑年

活动规模

既然公司对自如客这么阔,那对我们员工也得够意思,所以年底我们共准备了 3 个活动。

1、针对 自如客 的服务费减免活动;
2、针对 自如客 的 1000 万现金礼包;
3、25 万的 员工 红包活动;

员工红包活动

散币活动 2 和 3 是通过微信红包形式进行,想散币就散吧,可微信告诉我们,想散币还得交税(>﹏<)。员工红包来说,25 万要交掉 10 多万税,此时心疼我的钱。好了,下面开始说点正事。

技术方案

说到红包,我们肯定会想到红包拆分和抢红包两个场景。红包拆分是指将指定金额拆分为指定数目红包的过程,即是用来确定每个红包的金额数;而抢红包就是典型的高并发场景,需要避免红包超发的情况。

红包拆分

可选的方案

拆分方式

1、实时拆分
实时拆分,指的是在抢红包时实时计算每个红包的金额,以实现红包的拆分过程,对系统性能和拆分算法要求较高,例如拆分过程要一直保证后续待拆分红包的金额不能为空,不容易做到拆分红包的金额服从正态分布规律。

2、预先生成
预先生成,指的是在红包开抢之前已经完成了红包的拆分,抢红包时只是依次取出拆分好的红包金额,对拆分算法要求较低,可以拆分出随机性很好的红包金额,通常需要结合队列使用。

拆分算法

我并没有找到业界的通用算法,但红包拆分算法应该是拆分金额要看起来随机,最好能够服从正态分布,可以参考 微信@lcode 提供的红包拆分算法。

微信拆分算法的优点是算法较简单,拆分效率高,同时,由于该算法天然的特性,可以保证后续红包金额一定不为空,特别适合实时拆分场景,但缺点是会导致大额红包较大概率地在拆分的最后出现。 @lcode 拆分算法的优点是拆分金额基本符合正态分布,适合随机性要求较高的拆分场景。

我们的方案

我们这次的业务对红包金额的随机性要求不高,但是对系统可靠性要求较高,所以我们选用了预算生成方式,使用 二倍均值法 的红包拆分算法,作为我们的红包拆分方案。

采用预算生成方式,我们预先生成红包并放入 Redis 的 List 中,当抢红包时只是 Pop List 即可,具体实现将在 抢红包 部分介绍。

拆分算法可以描述为:假设剩余拆分金额为 M,剩余待拆分红包个数为 N,红包最小金额为 1 元,红包最小单位为元,那么定义当前红包的金额为:

$$m = rand(1, floor(M/N*2))$$

其中,floor 表示向下取整,rand(min, max) 表示从 [min, max] 区间随机一个值。$M/N \ast 2$ 表示剩余待拆分金额平均金额的 2 倍,因为 N >= 2,所以 $M/N \ast 2 <= M$,表示一定能保证后续红包能拆分到金额。

代码实现为:

for ($i = 0; $i < $N - 1; $i++) {
    $max = (int)floor($M / ($N - $i)) * 2;
    $m[$i] = $max ? mt_rand(1, $max) : 0;
    $M -= $m[$i];
}

$m[] = $M;

值得一提的是,我们为了保证红包金额差异尽量小,先将总金额平均拆分成 N+1 份,将第 N+1 份红包按照上述的红包拆分算法拆分成 N 份,这 N 份红包加上之前的平均金额才作为最终的红包金额。

抢红包

可选的方案

限流

1、前端限流
前端限制用户在 n 秒之内只能提交一次请求,虽然这种方式只能挡住小白,不过这是 99% 的用户哟,所以也必须得做。

2、后端限流
常用的后端限流方法有 漏桶算法令牌桶算法漏桶算法 主要目的是控制请求数据注入的速率,如果此时漏桶溢出,后续的请求数据会被丢弃。而 令牌桶算法 是以一个恒定的速度往桶里放入令牌,而如果请求数据需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌时,这些请求才被丢弃,令牌桶算法的一个好处是可以方便地改变应用接受请求的速率。

防超发

1、库存加锁
可以通过加锁的方式解决资源抢占问题,但是加锁会增加系统开销,大流量下更容易拖垮系统,不过可以尝试一下基于版本号的乐观锁。

2、通过高速队列串行化请求
之所会出现超发问题,是因为并发时会出现多个进程同时获取同一资源的现象,如果使用高速队列将并行请求串行化,那么问题就不存在了。高速队列可以使用 Redis 缓存服务器来实现,当然光使用队列还不够,必要保证整个流程调用链要短、要快,否则队列会积压严重,甚至会拖垮整个服务。

我们的方案

在限流方面,由于我们预估的请求量还在系统承受范围,所以没有考虑引入后端限流方案。我们的抢红包系统流程图如下:

抢红包流程图

我们将抢红包拆分为 红包占有(流程①,同步) 和 红包发放 (流程②,异步)这两个过程,首先采用高速队列串行化请求,红包发放逻辑由一组 Worker 异步去完成。高速队列只是完成红包占有的过程,实现库存的控制,Worker 则处理耗时较长的红包发放过程。

当然,在实际应用中,红包占用过程还需要加上一些前置规则校验,比如用户是否已经领取过,领取次数是否已经达到上限等?红包占有流程图如下:

红包占有流程图

其中,red::list为 List 结构,存放预先生成的红包金额(流程①中的红包队列);red::task 也为 List 结构,红包异步发放队列(流程②中的任务队列);red::draw为 Hash 结构,存放红包领取记录,field为用户的 openid,value为序列化的红包信息;red::draw_count:u:openid为 k-v 结构,用户领取红包计数器。

下面,我将以以下 3 个问题为中心,来说说我们设计出的抢红包系统。

1、怎么保证不超发
我们需要关注的是红包占有过程,从红包占有流程图可看出,这个过程是很多 Key 操作的组合,那怎么保证原子性?可以使用 Redis 事务,但我们选用了 Lua 方案,一方面是因为首先要保证性能,而 Lua 脚本嵌入 Redis 执行不存在性能瓶颈,另一方面 Lua 脚本执行时本身就是原子性的,满足需求。

红包占有的 Lua 脚本实现如下:

-- 领取人的openid为xxxxxxxxxxx
local openid = 'xxxxxxxxxxx'
local isDraw = redis.call('HEXISTS', 'red::draw', openid)
-- 已经领取
if isDraw ~= 0 then
    return true
end
-- 领取太多次了
local times = redis.call('INCR', 'red::draw_count:u:'..openid)
if times and tonumber(times) > 9 then
    return 0
end

local number = redis.call('RPOP', 'red::list')
-- 没有红包
if not number then
    return {}
end
-- 领取人昵称为Fhb,头像为https://xxxxxxx
local red = {money=number,name='Fhb',pic='https://xxxxxxx'}
-- 领取记录
redis.call('HSET', 'red::draw', openid, cjson.encode(red))
-- 处理队列
red['openid'] = openid
redis.call('RPUSH', 'red::task', cjson.encode(red))

return true
需要注意 Lua 脚本执行过程并不是事务的,脚本中的操作命令在执行时是有先后顺序的,当某个操作执行失败时不会回滚已经执行成功的操作,它的原子性是通过单线程模型实现。

2、怎么提高系统响应速度
如红包占有流程图所示,当用户发起抢红包请求时,若有红包则直接完成红包占有操作,同步告知用户是否抢到红包,这个过程要求快速响应。

但由于微信红包支付属于第三方调用,若抢到红包后同步调用红包支付,系统调用链又长又慢,所以红包占有和红包发放异步拆分是必然。拆分后,红包占有只需操作 Redis,响应性能已不是问题。

3、怎么提高系统处理能力
从上述分析可知,目前系统的压力都会集中在红包发放这个环节,因为用户抢到红包时,我们只是同步告知用户已抢到红包,然后异步去发放红包,因此用户并不会立即收到红包(受红包发放 Worker 处理能力和微信服务压力制约)。若红包发放的 Worker 处理能力较弱,那么红包发放的延迟就会很高,体验较差。

如抢红包流程图中所示,我们采用一组 Worker 去消费任务队列,并调用红包支付 API,以及数据持久化操作(后续对账)。尽管红包发放调用链又长又慢,但是注意到这些 Worker 是 无状态 的,所以可以通过增加 Worker 数量,以横向扩展提高系统的处理能力。

4、怎么保证数据一致性
其实,红包发放延时我们可以做到用户无感知,但是若红包发放(流程②)失败了,已经告知用户抢到红包,但是却木有发,估计他杀人的心都有了。根据 CAP 原理,我们无法同时满足数据一致性、数据可用性、分区耐受性,通常只需做到数据最终一致性。

为了达到数据最终一致性,我们就引入了重试机制,生成一个全局唯一的外部订单号,当某单红包发放失败,就会放回任务队列,使得有机会进行发放重试,当然这一切都需要 API 做幂等处理。

Worker可靠性保障

这里必须将 Worker 可靠性单独说,因为它实在太重要了。Worker 的实现如下:

$maxTask = 1000;
$sleepTime = 1000;

while (true) {
    while ($red = RedLogic::getTask()) {
        RedLogic::doTask($red);
        //处理多少个任务主动退出
        $maxTask--;
        if ($maxTask < 0) {
            return EXIT_CODE_NORMAL;
        }
    }
    //等待任务
    usleep($sleepTime);
}
这里使用 LPOP 命令获取任务,所以使用了 while 结构,并且无任务时需要等待,可以用阻塞命令 BLPOP 来改进。

由于 Worker 需要常驻内存运行,难免会出现异常退出的情况(也有主动退出), 所以需要保持 Worker 一直处于运行状态。我们使用进程管理工具 Supervisor 来监控 Worker 的运行状态,同时管理 Worker 的数量,当任务队列出现堆积时,增加 Worker 数量即可。Supervisor 的监控后台如下:

Supervisor进程管理

员工系统号散列

公司员工都用唯一一个系统号 emp_code(自增字段)标识,登录成功后返回 emp_code,系统后续所有交互流程都基于 emp_code,分享出去的红包也会携带 emp_code,为了保护员工敏感信息和防止恶意碰撞攻击,我们不能直接将 emp_code 暴露给前端,需要借助一个 token(无规律)的中间者来完成交互。

可选的方案

1、储存映射关系,时时查询
预先生成一个随机串 token,然后跟 emp_code 绑定,每次请求都根据 token 时时查询 emp_code。优点是可以定期更新,相对安全,缺点是性能不高。

2、建立映射关系函数,实时计算
建立一个映射关系函数,如 hash 散列或者加密解密算法,能够根据 emp_code 生成一个无规律的字符串 token,并且要能够根据 token 反映射出 emp_code。优点是需要存储介质存储关系,性能较高,缺点是很难做到定期失效并更新。

我们的方案

由于我们的红包活动只进行几天,所以我们选用了方案 2。对 emp_code 做了 hashids 散列算法,暴露的只是一串无规律的散列字符串。

hashids 是一个开源且轻量的唯一 id 生成器,支持 Java、PHP、C/C++、Python 等主流语言,PHP 想使用 hashids,只需composer require hashids/hashids命令安装即可。

然后,如下方式使用:

use Hashids\Hashids;

$hashids = new Hashids('salt', 6, 'abcdefghijk1234567890');

$hashids->encode(11002);    //994k2kk
$hashids->decode('994k2kk');  //[11002]

需要说明的是,其中salt是非常重要的散列加密盐串,6表示散列值最小长度,abcde...7890为散列字典,太长影响效率,太短不安全。由于默认的散列字典比较长,decode 效率并不高,所以这里移除了大写字母部分。

语音点赞

语音点赞就是用户以语音的形式助力好友,核心技术其实是语音识别,而我们一般都会使用第三方语音识别服务。

可选的方案

1、客户端调用第三方服务识别
客户端直接调用第三方语音识别服务,如微信提供了 JS-SDK 的语音识别 API ,返回识别的语音文本的信息,并且已经经过语义化。优点是识别较快,且不许关注语音存储问题,缺点是不安全,识别结果提交到服务端之前可能被恶意篡改。

2、服务端调用第三方服务识别
先将录制的语音上传至存储平台,然后服务端调用第三方语音识别服务,第三方语音识别服务去获取语音信息并识别,返回识别的语音文本的信息。优点是识别结果较安全,缺点是系统交互较多,识别效率不高。

我们的方案

我们业务场景的特殊性,存在用户可助力次数的限制,所以无需担心恶意刷赞的情况,因此可以选用方案 1,语音识别的交互流程如下:

语音识别交互图

此时,整个语音识别流程如下:

语音点赞流程图

当然中国文字博大精深,语音识别的文本在匹配时,需要考虑容错处理,可以将文本转化为拼音,然后匹配拼音,或者设置一个匹配百分比,达到匹配值则认为语音口令正确。

需要注意的是,微信只提供 3 天的语音存储服务,若语音播放周期较长,则要考虑实现语音的存储。

其他

红包发放测试

我们使用了线上公账号进行红包发放测试,为了让线上公众号能够授权到测试环境,在线上的微信授权回调地址新增一个参数,将带有to=feature参数的请求引流到测试环境,其他线上流量还是保持不变,匹配规则如下:

# Nginx不支持if嵌套,所以就这样变通实现
set $auth_redirect "";
if ($args ~* "r=auth/redirect") {
    set $auth_redirect "prod";
}
if ($args ~* "to=feature") {
    set $auth_redirect "feature";
}
if ($auth_redirect ~ "feature") {
    rewrite ^(.*)$ http://wx.t.ziroom.com/index.php last;
}
if ($auth_redirect ~ "prod") {
    rewrite ^(.*)$ http://wx.ziroom.com/index.php last;
}

CDN缓存

由于本次活动力度较大,预估流量会比以往增加不少(不能再出现机房带宽打满的情况了,不然 >﹏<),静态页面占流量的很大一部分,所以静态页面在发布时都会放置一份在 CDN 上,这样回源的流量就很小了。

灾备方案

尽管做了很多准备,还是无法确保万无一失,我们在每个关键节点都增加了开关,一点出现异常,通过配置中心可以人工介入做降级处理。

查看原文

赞 13 收藏 12 评论 2