用一个队列,或者消息,然后启多个worker去消费撒。
天翊 回答了问题 · 2019-04-19
用一个队列,或者消息,然后启多个worker去消费撒。
关注 6 回答 4
天翊 评论了文章 · 2019-04-08
google百度了下,PHP任务大体上可以分为三类
最近需要去定时请求数据,然后分析之后 指定相应的文本 通过socket广播给用户。
具体的分析 制定文本的业务 不复杂。 使用curl 请求数据 。但是对于定时任务这一块怎么使用都不行。
WIN服务器可以直接使用定时任务执行相关的应用程序,LINUX服务器则可以通过在/etc/crontab里添加定时任务来实现。
php -f 文件所在位置
缺点:最低一分钟的任务计划。 很多请求需要一分钟以内。 需要储存上一次的数据,下次再去取。
建立一个程序文件页面,通过ignore_user_abort来设置关闭浏览器页面后仍可执行,那么,在运行该程序页面后,您所需要的任务计划便会一直自动执行。对于一般的PHP程序员来说,如果没有足够的把握,这种操作是不被允许的,因为死循环极容易使用服务器当机。
set_time_limit(0); // 取消超时
// ignore_user_abort(true); // 浏览器关闭 继续执行
while (true) {
dongsomthing();
sleep( 120); // 定时 120秒 usleep() 毫秒 1000毫秒=1秒
}
die;
sleep 和 usleep 都不太精确
缺点:长时间占据内存,经常无缘无故 自己暂停。
···
<?php
use \Workerman\Worker;
use \Workerman\Lib\Timer;
require_once __DIR__ . './Workerman/Autoloader.php';
$task = new Worker();
// 开启多少个进程运行定时任务,注意多进程并发问题
$task->count = 1;
$task->onWorkerStart = function($task)
{
// 每2.5秒执行一次 支持小数,可以精确到0.001,即精确到毫秒级别
$time_interval = 30;
Timer::add($time_interval, function()
{
dosomthing()
echo "task run\n";
});
};
// 运行worker
Worker::runAll();
?>
···
可以精确到秒, 还是会经常暂停! 别人说他能跑半年 不停。 我是不是开的任务比较多,因为我又8个任务 没30秒 请求一次。
参考workman手册 (https://www.kancloud.cn/walko...)
使用方法很简单 配置好php环境,然后直接命令行执行改文件就行了
php -f 文件所在位置
https://www.workerman.net/workerman-chat
因为需要分析出文本之后广播到每个客户端,那如何在调用socket方法推送给每个客户呢?
参考其他项目中推送消息 (https://www.kancloud.cn/walko...)
那如何存入数据库呢?
参考https://www.kancloud.cn/walko...
参考(https://www.kancloud.cn/walko...
查看原文天翊 评论了文章 · 2019-04-08
这几天也是刚学习了shell脚本的编写,虽然他不像Python、Java那样能够编写很大型的项目(当然我是这么认为),但是在操控Linux系统方面,还是有独特的优势的,当然在学习过程中我们也能更好的了解Linux。接下来,就开始学习吧。后面会有几个小例子,当然都是别的地方挪过来的,我就是代码的搬运工,嘿嘿。喜欢学习的同志们可以点击Python聚焦专栏,查看更多知识。
在一般情况下,人们并不区分 Bourne Shell 和 Bourne Again Shell,所以,像 #!/bin/sh,它同样也可以改为 #!/bin/bash。
#!/bin/bash
chmod +x ./test.sh # 修改权限
./test.sh # 执行
运行时一定要写成./test.sh,因为直接写test.sh,linux会去PATH里寻找,而一般只有/bin,/sbin,/usr/bin,/usr/sbin等在PATH中,所以使用./test.sh告诉系统,就在本目录下找
/bin/sh test.sh
定义方式和python类似,只不过定义过程中不允许使用空格,默认都是字符串类型
declare命令定义有类型的变量
语法 | 说明 |
---|---|
${var_name#规则} | 从变量开头进行匹配,将符合最短的数据删除 |
${var_name##规则} | 从变量开头进行匹配,将符合最长的数据删除 |
${var_name%规则} | 从变量末尾进行匹配,将符合最短的数据删除 |
${var_name%%规则} | 从变量末尾进行匹配,将符合最长的数据删除 |
${变量名/旧字符串/新字符串} | 变量内容符合旧字符串,就将第一个替换 |
${变量名//旧字符串/新字符串} | 变量内容符合旧字符串,就全部替换 |
Shell字符串
在进行拼接的时候是可以出现以下形式的:
求长度
求字串索引
expr index $str substr_reg
匹配的字串的长度
截取
定义:
读取:
获取长度:
多行:
定义
返回值
局部变量
函数库
$* / $@:以一个单字符串显示所有向脚本传递的参数。
一个变量是否为0, [ $var -eq 0 ]
包含 echo `expr 2 + 2` # 输出4
浮点数计算:bc
read命令
read -p "请输入一段文字:" -n 6 -t 5 -s password
echo
printf
printf "%s" jim
test:一般用于替换中括号
test $num = $num2
if . . . else
if condition
then
command1
elif condition1
then
command2
else
commandN
fi
for . . . in
for var in data
do
command1
done
while
while condition
do
command
done
死循环
while :
do
command
done
# 使用true
while true
do
command
done
# 使用for
for (( ; ; ))
case
case value in
value1)
command1
;;
value2)
command2
;;
*)
command2
;;
esac
# 语法格式:
sed [option] "pattern command" file_name
# 删除文件第一行
sed -i '1d' file_name
option选项
pattern
command
反向引用
在使用替换字符的时候,修改内容使用&表示使用被替换的条件
# 在匹配到^la..jim的后面加shuai
# &:全匹配,\1:其使用了正则的分组,所以前面需要使用小括号括起来
sed -i 's/^la..jim/&shuai/g' sed.txt
# 语法格式:
awk 'BEGIN{}pattern{commands}END{}' file_name
for user in `cat /etc/passwd | cut -d ":" -f 1`
do
echo "$user"
done
nginx_num_process=$(ps -ef | grep nginx | grep -v grep | wc -l)
if [ nginx_num_process -eq 0 ];then
systemctl start nginx
fi
while true
do
read -p "pls input a positive number: " num
expr $num + 1 &> /dev/null
if [ $? -eq 0 ];then
if [ `expr $num \> 0` -eq 1 ];then
for((i=1;i<=$num;i++))
do
sum=`expr $sum + $i`
done
echo "1+2+3+....+$num = $sum"
exit
fi
fi
echo "error,input enlegal"
continue
done
this_pid=$$
while true
do
ps -ef | grep nginx | grep -v grep | grep -v $this_pid &> /dev/null
if [ $? -eq 0 ];then
echo "Nginx is running well"
sleep 3
else
systemctl start nginx
echo "Nginx is down,Start it...."
fi
done
FILE_NAME=/root/lesson/5.6/my.cnf
function get_all_segments
{
echo "`sed -n '/\[.*\]/p' $FILE_NAME | sed -e 's/\[//g' -e 's/\]//g'`"
}
function count_items_in_segment
{
items=`sed -n '/\['$1'\]/,/\[.*\]/p' $FILE_NAME | grep -v "^#" | grep -v "^$" | grep -v "\[.*\]"`
index=0
for item in $items
do
index=`expr $index + 1`
done
echo $index
}
number=0
for segment in `get_all_segments`
do
number=`expr $number + 1`
items_count=`count_items_in_segment $segment`
echo "$number: $segment $items_count"
done
sed -i '/[:blank:]*#/^$/d' config.cnf
sed -i 's/^[^#]/\*&/g' config.cnf
user=""
password=""
host=""
mysql_conn="mysql -u"$user" -p"$password" -h"$host""
IFS=":" # 内置分隔符变量
cat data.txt | while read id name birth sex
do
$mysql_conn -e "insert into school values('$id','$name','$birth','$sex')"
done
ftp -inv << EOF
open ftp_ip_addr
user user_name password
put file_name
bye
EOF #必须顶格写
根据其他表的结构创建新表
mysqldumps备份mysql
天翊 评论了文章 · 2019-03-27
首发于 樊浩柏科学院
假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。
需求点,可以描述为:
同样这个需求,定了以下两个硬指标:
首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:
为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。
改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:
1、数据表价格字段存卢比
将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。
2、接口输出数据时转化为卢比
原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。
针对以上方案,我们需要注意 2 个问题:
上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。
最终,为了系统的可扩展性,我们选择了方案 ②。
这里主要改造了商品中心,主要解决 透传地区标识 和 支持多币种价格 这 2 个问题。
我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。
思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location
携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。
由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:
class Location
{
public static function init()
{
global $context;
if (empty($context['location'])) {
return;
}
// API在这里直接获取X-Location头
if (!empty($_SERVER['HTTP_X_LOCATION'])) {
$context['location'] = $_SERVER['HTTP_X_LOCATION'];
}
// RPC Server会自动获取Context
}
}
上述init()
方法,需要在项目入口位置初始化。
其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context
值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context
。
RPC Client 传递 Context 实现如下:
protected function addGlobalContext($data)
{
global $context;
$context = !is_array($context) ? array() : $context;
// data为待请求的RPC协议数据
$data['Context'] = $context;
return $data;
}
RPC Server 获取 Context 实现如下:
public function getGlobalContext($packet)
{
global $context;
$context = array();
// packet为接收的RPC协议数据
if(isset($packet['Context'])) {
$context = $packet['Context'];
}
}
当设置了 Context 后,RPC 通信时协议数据会携带location
字段,内容如下:
RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:
class Location
{
public static function set($location)
{
global $context;
$context['location'] = $location;
// API需要在这里单独设置X-Location头
header('X-Location: ' . $context['location']);
}
}
设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:
class Location
{
public static function get()
{
global $context;
if (!isset($context['location'])) {
return 'china';
}
return $context['location'];
}
}
有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。
下述的ReadBase
类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()
和getAll()
方法,所以我们只需要改造这两个方法。
class ReadBase
{
public function getOne(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
return $this->getExchangePrice($data);
}
public function getAll(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
if ($data) {
foreach ($data as &$one) {
$this->getExchangePrice($one);
}
}
return $data;
}
}
由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。
protected function isExchangeField($field)
{
$priceSuffix = array('cost', '_price');
$black = array();
$len = strlen($field) ;
foreach ($priceSuffix as $suffix) {
$lastPos = $len - strlen($suffix);
// 非黑名单且非is_
if (!in_array($field, $black)
&& false === strpos($field, 'is_')
&& $lastPos === strpos($field, $suffix)
) {
return true;
}
}
return false;
}
前缀为is_
的字段一般定义为标识字段,默认为非价格字段。
上述getExchangePrice()
方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin
后缀的人民币价格字段。
public function getExchangePrice(&$data)
{
if (empty($data)) {
return $data;
}
$originPrice = array();
foreach ($data as $field => &$value) {
// 是否是价格字段
if ($this->isExchangeField($field)) {
$originField = $field . '_origin';
$originPrice[$originField] = $value;
// 获取对应地区的价格
$value = $this->getExchangePrice($value);
}
}
$data = array_merge($originPrice, $data);
return $data;
}
public static function getExchangePrice($price)
{
// 获取地区标识
$location = Location::get();
// 汇率
$exchangeRateConfig = \Config::$exchangeRate;
if ($location === 'china') {
return $price;
} else if (isset($exchangeRateConfig[$location])) {
$exchangeRate = $exchangeRateConfig[$location];
} else {
throw new \BusinessException("not found $location exchange rate");
}
// 向上取值并保留两位小数
$exchangePrice = bcmul($price, $exchangeRate, 3);
return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}
其中,getExchangePrice()
方法会调用Location::get()
获取地区标识,并根据汇率计算实时价格。
最终,商品中心改造后,得到的部分商品价格信息,如下:
# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10
对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location
头即可。
GET /product/detail/1 HTTP/1.1
Request Headers
X-Location: india
API 项目需在入口文件处,初始化地区标识。如下:
Location::init();
对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:
Location::init();
// 地区设置为中国
Location::set('china');
为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!
查看原文天翊 评论了文章 · 2019-03-27
首发于 樊浩柏科学院
假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。
需求点,可以描述为:
同样这个需求,定了以下两个硬指标:
首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:
为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。
改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:
1、数据表价格字段存卢比
将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。
2、接口输出数据时转化为卢比
原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。
针对以上方案,我们需要注意 2 个问题:
上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。
最终,为了系统的可扩展性,我们选择了方案 ②。
这里主要改造了商品中心,主要解决 透传地区标识 和 支持多币种价格 这 2 个问题。
我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。
思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location
携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。
由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:
class Location
{
public static function init()
{
global $context;
if (empty($context['location'])) {
return;
}
// API在这里直接获取X-Location头
if (!empty($_SERVER['HTTP_X_LOCATION'])) {
$context['location'] = $_SERVER['HTTP_X_LOCATION'];
}
// RPC Server会自动获取Context
}
}
上述init()
方法,需要在项目入口位置初始化。
其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context
值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context
。
RPC Client 传递 Context 实现如下:
protected function addGlobalContext($data)
{
global $context;
$context = !is_array($context) ? array() : $context;
// data为待请求的RPC协议数据
$data['Context'] = $context;
return $data;
}
RPC Server 获取 Context 实现如下:
public function getGlobalContext($packet)
{
global $context;
$context = array();
// packet为接收的RPC协议数据
if(isset($packet['Context'])) {
$context = $packet['Context'];
}
}
当设置了 Context 后,RPC 通信时协议数据会携带location
字段,内容如下:
RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:
class Location
{
public static function set($location)
{
global $context;
$context['location'] = $location;
// API需要在这里单独设置X-Location头
header('X-Location: ' . $context['location']);
}
}
设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:
class Location
{
public static function get()
{
global $context;
if (!isset($context['location'])) {
return 'china';
}
return $context['location'];
}
}
有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。
下述的ReadBase
类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()
和getAll()
方法,所以我们只需要改造这两个方法。
class ReadBase
{
public function getOne(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
return $this->getExchangePrice($data);
}
public function getAll(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
if ($data) {
foreach ($data as &$one) {
$this->getExchangePrice($one);
}
}
return $data;
}
}
由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。
protected function isExchangeField($field)
{
$priceSuffix = array('cost', '_price');
$black = array();
$len = strlen($field) ;
foreach ($priceSuffix as $suffix) {
$lastPos = $len - strlen($suffix);
// 非黑名单且非is_
if (!in_array($field, $black)
&& false === strpos($field, 'is_')
&& $lastPos === strpos($field, $suffix)
) {
return true;
}
}
return false;
}
前缀为is_
的字段一般定义为标识字段,默认为非价格字段。
上述getExchangePrice()
方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin
后缀的人民币价格字段。
public function getExchangePrice(&$data)
{
if (empty($data)) {
return $data;
}
$originPrice = array();
foreach ($data as $field => &$value) {
// 是否是价格字段
if ($this->isExchangeField($field)) {
$originField = $field . '_origin';
$originPrice[$originField] = $value;
// 获取对应地区的价格
$value = $this->getExchangePrice($value);
}
}
$data = array_merge($originPrice, $data);
return $data;
}
public static function getExchangePrice($price)
{
// 获取地区标识
$location = Location::get();
// 汇率
$exchangeRateConfig = \Config::$exchangeRate;
if ($location === 'china') {
return $price;
} else if (isset($exchangeRateConfig[$location])) {
$exchangeRate = $exchangeRateConfig[$location];
} else {
throw new \BusinessException("not found $location exchange rate");
}
// 向上取值并保留两位小数
$exchangePrice = bcmul($price, $exchangeRate, 3);
return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}
其中,getExchangePrice()
方法会调用Location::get()
获取地区标识,并根据汇率计算实时价格。
最终,商品中心改造后,得到的部分商品价格信息,如下:
# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10
对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location
头即可。
GET /product/detail/1 HTTP/1.1
Request Headers
X-Location: india
API 项目需在入口文件处,初始化地区标识。如下:
Location::init();
对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:
Location::init();
// 地区设置为中国
Location::set('china');
为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!
查看原文天翊 评论了文章 · 2019-03-27
首发于 樊浩柏科学院
假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。
需求点,可以描述为:
同样这个需求,定了以下两个硬指标:
首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:
为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。
改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:
1、数据表价格字段存卢比
将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。
2、接口输出数据时转化为卢比
原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。
针对以上方案,我们需要注意 2 个问题:
上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。
最终,为了系统的可扩展性,我们选择了方案 ②。
这里主要改造了商品中心,主要解决 透传地区标识 和 支持多币种价格 这 2 个问题。
我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。
思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location
携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。
由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:
class Location
{
public static function init()
{
global $context;
if (empty($context['location'])) {
return;
}
// API在这里直接获取X-Location头
if (!empty($_SERVER['HTTP_X_LOCATION'])) {
$context['location'] = $_SERVER['HTTP_X_LOCATION'];
}
// RPC Server会自动获取Context
}
}
上述init()
方法,需要在项目入口位置初始化。
其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context
值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context
。
RPC Client 传递 Context 实现如下:
protected function addGlobalContext($data)
{
global $context;
$context = !is_array($context) ? array() : $context;
// data为待请求的RPC协议数据
$data['Context'] = $context;
return $data;
}
RPC Server 获取 Context 实现如下:
public function getGlobalContext($packet)
{
global $context;
$context = array();
// packet为接收的RPC协议数据
if(isset($packet['Context'])) {
$context = $packet['Context'];
}
}
当设置了 Context 后,RPC 通信时协议数据会携带location
字段,内容如下:
RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:
class Location
{
public static function set($location)
{
global $context;
$context['location'] = $location;
// API需要在这里单独设置X-Location头
header('X-Location: ' . $context['location']);
}
}
设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:
class Location
{
public static function get()
{
global $context;
if (!isset($context['location'])) {
return 'china';
}
return $context['location'];
}
}
有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。
下述的ReadBase
类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()
和getAll()
方法,所以我们只需要改造这两个方法。
class ReadBase
{
public function getOne(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
return $this->getExchangePrice($data);
}
public function getAll(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
if ($data) {
foreach ($data as &$one) {
$this->getExchangePrice($one);
}
}
return $data;
}
}
由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。
protected function isExchangeField($field)
{
$priceSuffix = array('cost', '_price');
$black = array();
$len = strlen($field) ;
foreach ($priceSuffix as $suffix) {
$lastPos = $len - strlen($suffix);
// 非黑名单且非is_
if (!in_array($field, $black)
&& false === strpos($field, 'is_')
&& $lastPos === strpos($field, $suffix)
) {
return true;
}
}
return false;
}
前缀为is_
的字段一般定义为标识字段,默认为非价格字段。
上述getExchangePrice()
方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin
后缀的人民币价格字段。
public function getExchangePrice(&$data)
{
if (empty($data)) {
return $data;
}
$originPrice = array();
foreach ($data as $field => &$value) {
// 是否是价格字段
if ($this->isExchangeField($field)) {
$originField = $field . '_origin';
$originPrice[$originField] = $value;
// 获取对应地区的价格
$value = $this->getExchangePrice($value);
}
}
$data = array_merge($originPrice, $data);
return $data;
}
public static function getExchangePrice($price)
{
// 获取地区标识
$location = Location::get();
// 汇率
$exchangeRateConfig = \Config::$exchangeRate;
if ($location === 'china') {
return $price;
} else if (isset($exchangeRateConfig[$location])) {
$exchangeRate = $exchangeRateConfig[$location];
} else {
throw new \BusinessException("not found $location exchange rate");
}
// 向上取值并保留两位小数
$exchangePrice = bcmul($price, $exchangeRate, 3);
return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}
其中,getExchangePrice()
方法会调用Location::get()
获取地区标识,并根据汇率计算实时价格。
最终,商品中心改造后,得到的部分商品价格信息,如下:
# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10
对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location
头即可。
GET /product/detail/1 HTTP/1.1
Request Headers
X-Location: india
API 项目需在入口文件处,初始化地区标识。如下:
Location::init();
对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:
Location::init();
// 地区设置为中国
Location::set('china');
为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!
查看原文天翊 评论了文章 · 2019-03-26
首发于 樊浩柏科学院
经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。
PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。
该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:
其中,主要涉及 3 个对象,分别为 入口脚本、Master 进程、Worker 进程。它们扮演的角色如下:
start
、stop
、reload
流程;整个过程,又包括 4 个流程:
fork
出一个 Master 进程;Master 进程先经过 保存 PID、注册信号处理器 操作,然后 创建 Worker 会fork
出多个 Worker 进程;在流程 ② 中,Worker 进程被 Master 进程fork
出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。
启动流程见 流程 ①,主要包括 守护进程、保存 PID、注册信号处理器、创建多进程 Worker 这 4 部分。
首先,在入口脚本中fork
一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:
这里使用了 2 次fork
,所以最后fork
的一个子进程才是 Master 进程,其实一次fork
也是可以的。代码如下:
protected static function daemonize()
{
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif ($pid > 0) {
exit(0);
}
// 将当前进程提升为会话leader
if (-1 === posix_setsid()) {
exit("process setsid fail\n");
}
// 再次fork以避免SVR4这种系统终端再一次获取到进程控制
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif (0 !== $pid) {
exit(0);
}
}
通常在启动时增加-d
参数,表示进程将运行于守护态模式。
当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:
protected static function resetStdFd()
{
global $STDERR, $STDOUT;
//重定向标准输出和错误输出
@fclose(STDOUT);
fclose(STDERR);
$STDOUT = fopen(static::$stdoutFile, 'a');
$STDERR = fopen(static::$stdoutFile, 'a');
}
为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid
文件。代码如下:
protected static function saveMasterPid()
{
// 保存pid以实现重载和停止
static::$_masterPid = posix_getpid();
if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
exit("can not save pid to" . static::$pidFile . "\n");
}
echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}
因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。
这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:
protected static function installSignal()
{
pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);
// 忽略信号
pcntl_signal(SIGUSR2, SIG_IGN, false);
pcntl_signal(SIGHUP, SIG_IGN, false);
}
protected static function signalHandler($signal)
{
switch($signal) {
case SIGINT:
case SIGTERM:
static::stop();
break;
case SIGQUIT:
case SIGUSR1:
static::reload();
break;
default: break;
}
}
其中,SIGINT 和 SIGTERM 信号会触发stop
操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload
操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill
掉。
Master 进程通过fork
系统调用,就能创建多个 Worker 进程。实现代码,如下:
protected static function forkOneWorker()
{
$pid = pcntl_fork();
// 父进程
if ($pid > 0) {
static::$_workers[] = $pid;
} else if ($pid === 0) { // 子进程
static::setProcessTitle('PHPServer: worker');
// 子进程会阻塞在这里
static::run();
// 子进程退出
exit(0);
} else {
throw new \Exception("fork one worker fail");
}
}
protected static function forkWorkers()
{
while(count(static::$_workers) < static::$workerCount) {
static::forkOneWorker();
}
}
Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:
对于 Worker 进程,run()
方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while
来模拟调度,实际中应该使用事件(Select 等)驱动。
public static function run()
{
// 模拟调度,实际用event实现
while (1) {
// 捕获信号
pcntl_signal_dispatch();
call_user_func(function() {
// do something
usleep(200);
});
}
}
其中,pcntl_signal_dispatch()
会在每次调度过程中,捕获信号并执行注册的信号处理器。
Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:
对于 Master 进程的调度,这里也使用了while
,但是引入了wait
的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。
protected static function monitor()
{
while (1) {
// 这两处捕获触发信号,很重要
pcntl_signal_dispatch();
// 挂起当前进程的执行直到一个子进程退出或接收到一个信号
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
pcntl_signal_dispatch();
if ($pid >= 0) {
// worker健康检查
static::checkWorkerAlive();
}
// 其他你想监控的
}
}
第两次的pcntl_signal_dispatch()
捕获信号,是由于wait
挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。
其中,PHPServer 的 停止 和 重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。
由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:
代码实现,如下:
protected static function checkWorkerAlive()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $index => $pid) {
if (!static::isAlive($pid)) {
unset(static::$_workers[$index]);
}
}
static::forkWorkers();
}
Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:
入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()
方法。如下:
protected static function stop()
{
// 主进程给所有子进程发送退出信号
if (static::$_masterPid === posix_getpid()) {
static::stopAllWorkers();
if (is_file(static::$pidFile)) {
@unlink(static::$pidFile);
}
exit(0);
} else { // 子进程退出
// 退出前可以做一些事
exit(0);
}
}
若是 Master 进程执行该方法,会先调用stopAllWorkers()
方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;
由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。
protected static function stopAllWorkers()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $workerPid) {
posix_kill($workerPid, SIGINT);
}
// 子进程退出异常,强制kill
usleep(1000);
if (static::isAlive($allWorkerPid)) {
foreach ($allWorkerPid as $workerPid) {
static::forceKill($workerPid);
}
}
// 清空worker实例
static::$_workers = array();
}
代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:
整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()
方法,reload()
方法调用stopAllWorkers()
方法。如下:
protected static function reload()
{
// 停止所有worker即可,master会自动fork新worker
static::stopAllWorkers();
}
reload()
方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。
你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。
到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:
$ php server.php
Usage: Commands [mode]
Commands:
start Start worker.
stop Stop worker.
reload Reload codes.
Options:
-d to start in DAEMON mode.
Use "--help" for more information about a command.
首先,我们启动它:
$ php server.php start -d
PHPServer start [OK]
其次,查看进程树,如下:
$ pstree -p
init(1)-+-init(3)---bash(4)
|-php(1286)-+-php(1287)
`-php(1288)
最后,我们把它停止:
$ php server.php stop
PHPServer stopping ...
PHPServer stop success
现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)
我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:
首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。
相关文章 »
天翊 评论了文章 · 2019-03-26
首发于 樊浩柏科学院
经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。
PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。
该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:
其中,主要涉及 3 个对象,分别为 入口脚本、Master 进程、Worker 进程。它们扮演的角色如下:
start
、stop
、reload
流程;整个过程,又包括 4 个流程:
fork
出一个 Master 进程;Master 进程先经过 保存 PID、注册信号处理器 操作,然后 创建 Worker 会fork
出多个 Worker 进程;在流程 ② 中,Worker 进程被 Master 进程fork
出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。
启动流程见 流程 ①,主要包括 守护进程、保存 PID、注册信号处理器、创建多进程 Worker 这 4 部分。
首先,在入口脚本中fork
一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:
这里使用了 2 次fork
,所以最后fork
的一个子进程才是 Master 进程,其实一次fork
也是可以的。代码如下:
protected static function daemonize()
{
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif ($pid > 0) {
exit(0);
}
// 将当前进程提升为会话leader
if (-1 === posix_setsid()) {
exit("process setsid fail\n");
}
// 再次fork以避免SVR4这种系统终端再一次获取到进程控制
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif (0 !== $pid) {
exit(0);
}
}
通常在启动时增加-d
参数,表示进程将运行于守护态模式。
当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:
protected static function resetStdFd()
{
global $STDERR, $STDOUT;
//重定向标准输出和错误输出
@fclose(STDOUT);
fclose(STDERR);
$STDOUT = fopen(static::$stdoutFile, 'a');
$STDERR = fopen(static::$stdoutFile, 'a');
}
为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid
文件。代码如下:
protected static function saveMasterPid()
{
// 保存pid以实现重载和停止
static::$_masterPid = posix_getpid();
if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
exit("can not save pid to" . static::$pidFile . "\n");
}
echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}
因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。
这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:
protected static function installSignal()
{
pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);
// 忽略信号
pcntl_signal(SIGUSR2, SIG_IGN, false);
pcntl_signal(SIGHUP, SIG_IGN, false);
}
protected static function signalHandler($signal)
{
switch($signal) {
case SIGINT:
case SIGTERM:
static::stop();
break;
case SIGQUIT:
case SIGUSR1:
static::reload();
break;
default: break;
}
}
其中,SIGINT 和 SIGTERM 信号会触发stop
操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload
操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill
掉。
Master 进程通过fork
系统调用,就能创建多个 Worker 进程。实现代码,如下:
protected static function forkOneWorker()
{
$pid = pcntl_fork();
// 父进程
if ($pid > 0) {
static::$_workers[] = $pid;
} else if ($pid === 0) { // 子进程
static::setProcessTitle('PHPServer: worker');
// 子进程会阻塞在这里
static::run();
// 子进程退出
exit(0);
} else {
throw new \Exception("fork one worker fail");
}
}
protected static function forkWorkers()
{
while(count(static::$_workers) < static::$workerCount) {
static::forkOneWorker();
}
}
Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:
对于 Worker 进程,run()
方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while
来模拟调度,实际中应该使用事件(Select 等)驱动。
public static function run()
{
// 模拟调度,实际用event实现
while (1) {
// 捕获信号
pcntl_signal_dispatch();
call_user_func(function() {
// do something
usleep(200);
});
}
}
其中,pcntl_signal_dispatch()
会在每次调度过程中,捕获信号并执行注册的信号处理器。
Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:
对于 Master 进程的调度,这里也使用了while
,但是引入了wait
的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。
protected static function monitor()
{
while (1) {
// 这两处捕获触发信号,很重要
pcntl_signal_dispatch();
// 挂起当前进程的执行直到一个子进程退出或接收到一个信号
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
pcntl_signal_dispatch();
if ($pid >= 0) {
// worker健康检查
static::checkWorkerAlive();
}
// 其他你想监控的
}
}
第两次的pcntl_signal_dispatch()
捕获信号,是由于wait
挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。
其中,PHPServer 的 停止 和 重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。
由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:
代码实现,如下:
protected static function checkWorkerAlive()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $index => $pid) {
if (!static::isAlive($pid)) {
unset(static::$_workers[$index]);
}
}
static::forkWorkers();
}
Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:
入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()
方法。如下:
protected static function stop()
{
// 主进程给所有子进程发送退出信号
if (static::$_masterPid === posix_getpid()) {
static::stopAllWorkers();
if (is_file(static::$pidFile)) {
@unlink(static::$pidFile);
}
exit(0);
} else { // 子进程退出
// 退出前可以做一些事
exit(0);
}
}
若是 Master 进程执行该方法,会先调用stopAllWorkers()
方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;
由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。
protected static function stopAllWorkers()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $workerPid) {
posix_kill($workerPid, SIGINT);
}
// 子进程退出异常,强制kill
usleep(1000);
if (static::isAlive($allWorkerPid)) {
foreach ($allWorkerPid as $workerPid) {
static::forceKill($workerPid);
}
}
// 清空worker实例
static::$_workers = array();
}
代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:
整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()
方法,reload()
方法调用stopAllWorkers()
方法。如下:
protected static function reload()
{
// 停止所有worker即可,master会自动fork新worker
static::stopAllWorkers();
}
reload()
方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。
你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。
到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:
$ php server.php
Usage: Commands [mode]
Commands:
start Start worker.
stop Stop worker.
reload Reload codes.
Options:
-d to start in DAEMON mode.
Use "--help" for more information about a command.
首先,我们启动它:
$ php server.php start -d
PHPServer start [OK]
其次,查看进程树,如下:
$ pstree -p
init(1)-+-init(3)---bash(4)
|-php(1286)-+-php(1287)
`-php(1288)
最后,我们把它停止:
$ php server.php stop
PHPServer stopping ...
PHPServer stop success
现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)
我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:
首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。
相关文章 »
天翊 评论了文章 · 2019-03-26
首发于 樊浩柏科学院
Supervisor 是一款使用 Python 开发的非常优秀的进程管理工具。它可以在类 UNIX 系统上让用户精确地监视与控制多组指定数量的服务进程。当监控的服务进程意外退出时,会尝试自动重启这些服务,以保持服务可用状态。
Supervisor 官方 提供的安装方式较多,这里采用 pip 方式安装。
$ yum install python-pip
# 升级pip
$ pip install --upgrade pip
$ pip -V
pip 9.0.1
通过 pip 安装 Supervisor:
$ pip install supervisor
Successfully installed supervisor-3.3.3
安装 Supervisor 后,会出现 supervisorctl 和 supervisord 两个程序,其中 supervisorctl 为服务监控终端,而 supervisord 才是所有监控服务的大脑。查看 supervisord 是否安装成功:
$ supervisord -v
3.3.3
将 supervisord 配置成开机启动服务,下载官方 init 脚本。
修改关键路径配置:
PIDFILE=/var/run/supervisord.pid
LOCKFILE=/var/lock/subsys/supervisord
OPTIONS="-c /etc/supervisord.conf"
移到该文件到/etc/init.d
目录下,并重命名为 supervisor,添加可执行权限:
$ chmod 777 /etc/init.d/supervisor
配置成开机启动服务:
$ chkconfig --add supervisor
$ chkconfig supervisor on
$ chkconfig --list | grep "supervisor"
supervisor 0:off 1:off 2:on 3:on 4:on 5:on 6:off
Supervisord 安装后,需要使用如下命令生成配置文件。
$ mkdir /etc/supervisor
$ echo_supervisord_conf > /etc/supervisor/supervisord.conf
supervisord.conf
的主配置部分说明:
[unix_http_server]
file=/tmp/supervisor.sock ; socket文件的路径
;chmod=0700 ; socket文件权限
;chown=nobody:nogroup ; socket文件用户和用户组
;username=user ; 连接时认证的用户名
;password=123 ; 连接时认证的密码
[inet_http_server] ; 监听TCP
port=127.0.0.1:9001 ; 监听ip和端口
username=user ; 连接时认证的用户名
password=123 ; 连接时认证的密码
[supervisord]
logfile=/var/log/supervisord.log ; log目录
logfile_maxbytes=50MB ; log文件最大空间
logfile_backups=10 ; log文件保持的数量
loglevel=info ; log级别
pidfile=/var/run/supervisord.pid
nodaemon=false ; 是否非守护进程态运行
minfds=1024 ; 系统空闲的最少文件描述符
minprocs=200 ; 可用的最小进程描述符
;umask=022 ; 进程创建文件的掩码
;identifier=supervisor ; supervisord标识符
;directory=/tmp ; 启动前切换到的目录
;nocleanup=true ; 启动前是否清除子进程的日志文件
;childlogdir=/tmp ; AUTO模式,子进程日志路径
;environment=KEY="value" ; 设置环境变量
[rpcinterface:supervisor] ; XML_RPC配置
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; 连接的socket路径
;username=chris ; 用户名
;password=123 ; 密码
prompt=mysupervisor ; 输入用户名和密码时的提示符
;history_file=~/.sc_history ; 历史操作记录存储路径
[include] ; 包含文件,将每个进程配置为一个文件并包含
files = /etc/supervisor/*.ini ; 多个进程的配置文件
这部分我们不需要做太多的配置修改,如果需要开启 WEB 终端监控,则需要配置并开启 inet_http_server 项。
Supervisor 需管理的进程服务配置,示例如下:
[program:work] ; 服务名,例如work
command=php -r "sleep(10);exit(1);" ; 带有参数的可执行命令
process_name=%(process_num)s ; 进程名,当numprocs>1时,需包含%(process_num)s
numprocs=2 ; 启动进程的数目数
;directory=/tmp ; 运行前切换到该目录
;umask=022 ; 进程掩码
;priority=999 ; 子进程启动关闭优先级
autostart=true ; 子进程是否被自动启动
startsecs=1 ; 成功启动几秒后则认为成功启动
;startretries=3 ; 子进程启动失败后,最大尝试启动的次数
autorestart=unexpected ; 子进程意外退出后自动重启的选项,false, unexpected, true。unexpected表示不在exitcodes列表时重启
exitcodes=0,2 ; 期待的子程序退出码
;stopsignal=QUIT ; 进程停止信号,可以为TERM,HUP,INT,QUIT,KILL,USR1,or USR2等信号,默认为TERM
;stopwaitsecs=10 ; 发送停止信号后等待的最大时间
;stopasgroup=false ; 是否向子进程组发送停止信号
;killasgroup=false ; 是否向子进程组发送kill信号
;redirect_stderr=true ; 是否重定向日志到标准输出
stdout_logfile=/data/logs/work.log ; 进程的stdout的日志路径
;stdout_logfile_maxbytes=1MB ; 日志文件最大大小
;stdout_logfile_backups=10
;stdout_capture_maxbytes=1MB
;stderr_logfile=/a/path ; stderr的日志路径
;stderr_logfile_maxbytes=1MB
;stderr_logfile_backups=10
;stderr_capture_maxbytes=1MB
;environment=A="1",B="2" ; 子进程的环境变量
;serverurl=AUTO ; 子进程的环境变量SUPERVISOR_SERVER_URL
通常将每个进程的配置信息配置成独立文件,并通过 include 模块包含,这样方便修改和管理配置文件。
配置完成后,启动 supervisord 守护服务:
$ supervisord -c /etc/supervisor/supervisord.conf
常用的命令参数说明:
查看 supervisord 启动情况:
$ ps -ef | grep "supervisor"
root 24901 1 0 Sep23 ? 00:00:30 /usr/bin/python /usr/bin/supervisord -c /etc/supervisor/supervisord.conf
$ netstat -tunpl
tcp 0 0 127.0.0.1:9001 0.0.0.0:* LISTEN 24901/python
Supervisor 提供了多种监控服务的方式,包括 supervisorctl 命令行终端、Web 端、XML_RPC 接口多种方式。
直接使用 supervisorctl 即可在命令行终端查看所有服务的情况,如下:
$ supervisorctl
work:0 RUNNING pid 31313, uptime 0:00:07
work:1 RUNNING pid 31318, uptime 0:00:06
# -u 用户名 -p 密码
supervisorctl 常用命令列表如下;
在配置中开启 inet_http_server 后,即可通过 Web 界面便捷地监控进程服务了。
天翊 评论了文章 · 2019-03-26
首发于 樊浩柏科学院
经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。
PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。
该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:
其中,主要涉及 3 个对象,分别为 入口脚本、Master 进程、Worker 进程。它们扮演的角色如下:
start
、stop
、reload
流程;整个过程,又包括 4 个流程:
fork
出一个 Master 进程;Master 进程先经过 保存 PID、注册信号处理器 操作,然后 创建 Worker 会fork
出多个 Worker 进程;在流程 ② 中,Worker 进程被 Master 进程fork
出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。
启动流程见 流程 ①,主要包括 守护进程、保存 PID、注册信号处理器、创建多进程 Worker 这 4 部分。
首先,在入口脚本中fork
一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:
这里使用了 2 次fork
,所以最后fork
的一个子进程才是 Master 进程,其实一次fork
也是可以的。代码如下:
protected static function daemonize()
{
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif ($pid > 0) {
exit(0);
}
// 将当前进程提升为会话leader
if (-1 === posix_setsid()) {
exit("process setsid fail\n");
}
// 再次fork以避免SVR4这种系统终端再一次获取到进程控制
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif (0 !== $pid) {
exit(0);
}
}
通常在启动时增加-d
参数,表示进程将运行于守护态模式。
当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:
protected static function resetStdFd()
{
global $STDERR, $STDOUT;
//重定向标准输出和错误输出
@fclose(STDOUT);
fclose(STDERR);
$STDOUT = fopen(static::$stdoutFile, 'a');
$STDERR = fopen(static::$stdoutFile, 'a');
}
为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid
文件。代码如下:
protected static function saveMasterPid()
{
// 保存pid以实现重载和停止
static::$_masterPid = posix_getpid();
if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
exit("can not save pid to" . static::$pidFile . "\n");
}
echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}
因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。
这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:
protected static function installSignal()
{
pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);
// 忽略信号
pcntl_signal(SIGUSR2, SIG_IGN, false);
pcntl_signal(SIGHUP, SIG_IGN, false);
}
protected static function signalHandler($signal)
{
switch($signal) {
case SIGINT:
case SIGTERM:
static::stop();
break;
case SIGQUIT:
case SIGUSR1:
static::reload();
break;
default: break;
}
}
其中,SIGINT 和 SIGTERM 信号会触发stop
操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload
操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill
掉。
Master 进程通过fork
系统调用,就能创建多个 Worker 进程。实现代码,如下:
protected static function forkOneWorker()
{
$pid = pcntl_fork();
// 父进程
if ($pid > 0) {
static::$_workers[] = $pid;
} else if ($pid === 0) { // 子进程
static::setProcessTitle('PHPServer: worker');
// 子进程会阻塞在这里
static::run();
// 子进程退出
exit(0);
} else {
throw new \Exception("fork one worker fail");
}
}
protected static function forkWorkers()
{
while(count(static::$_workers) < static::$workerCount) {
static::forkOneWorker();
}
}
Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:
对于 Worker 进程,run()
方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while
来模拟调度,实际中应该使用事件(Select 等)驱动。
public static function run()
{
// 模拟调度,实际用event实现
while (1) {
// 捕获信号
pcntl_signal_dispatch();
call_user_func(function() {
// do something
usleep(200);
});
}
}
其中,pcntl_signal_dispatch()
会在每次调度过程中,捕获信号并执行注册的信号处理器。
Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:
对于 Master 进程的调度,这里也使用了while
,但是引入了wait
的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。
protected static function monitor()
{
while (1) {
// 这两处捕获触发信号,很重要
pcntl_signal_dispatch();
// 挂起当前进程的执行直到一个子进程退出或接收到一个信号
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
pcntl_signal_dispatch();
if ($pid >= 0) {
// worker健康检查
static::checkWorkerAlive();
}
// 其他你想监控的
}
}
第两次的pcntl_signal_dispatch()
捕获信号,是由于wait
挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。
其中,PHPServer 的 停止 和 重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。
由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:
代码实现,如下:
protected static function checkWorkerAlive()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $index => $pid) {
if (!static::isAlive($pid)) {
unset(static::$_workers[$index]);
}
}
static::forkWorkers();
}
Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:
入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()
方法。如下:
protected static function stop()
{
// 主进程给所有子进程发送退出信号
if (static::$_masterPid === posix_getpid()) {
static::stopAllWorkers();
if (is_file(static::$pidFile)) {
@unlink(static::$pidFile);
}
exit(0);
} else { // 子进程退出
// 退出前可以做一些事
exit(0);
}
}
若是 Master 进程执行该方法,会先调用stopAllWorkers()
方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;
由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。
protected static function stopAllWorkers()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $workerPid) {
posix_kill($workerPid, SIGINT);
}
// 子进程退出异常,强制kill
usleep(1000);
if (static::isAlive($allWorkerPid)) {
foreach ($allWorkerPid as $workerPid) {
static::forceKill($workerPid);
}
}
// 清空worker实例
static::$_workers = array();
}
代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:
整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()
方法,reload()
方法调用stopAllWorkers()
方法。如下:
protected static function reload()
{
// 停止所有worker即可,master会自动fork新worker
static::stopAllWorkers();
}
reload()
方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。
你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。
到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:
$ php server.php
Usage: Commands [mode]
Commands:
start Start worker.
stop Stop worker.
reload Reload codes.
Options:
-d to start in DAEMON mode.
Use "--help" for more information about a command.
首先,我们启动它:
$ php server.php start -d
PHPServer start [OK]
其次,查看进程树,如下:
$ pstree -p
init(1)-+-init(3)---bash(4)
|-php(1286)-+-php(1287)
`-php(1288)
最后,我们把它停止:
$ php server.php stop
PHPServer stopping ...
PHPServer stop success
现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)
我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:
首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。
相关文章 »
天翊 评论了文章 · 2019-03-26
首发于 樊浩柏科学院
经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。
PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。
该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:
其中,主要涉及 3 个对象,分别为 入口脚本、Master 进程、Worker 进程。它们扮演的角色如下:
start
、stop
、reload
流程;整个过程,又包括 4 个流程:
fork
出一个 Master 进程;Master 进程先经过 保存 PID、注册信号处理器 操作,然后 创建 Worker 会fork
出多个 Worker 进程;在流程 ② 中,Worker 进程被 Master 进程fork
出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。
启动流程见 流程 ①,主要包括 守护进程、保存 PID、注册信号处理器、创建多进程 Worker 这 4 部分。
首先,在入口脚本中fork
一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:
这里使用了 2 次fork
,所以最后fork
的一个子进程才是 Master 进程,其实一次fork
也是可以的。代码如下:
protected static function daemonize()
{
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif ($pid > 0) {
exit(0);
}
// 将当前进程提升为会话leader
if (-1 === posix_setsid()) {
exit("process setsid fail\n");
}
// 再次fork以避免SVR4这种系统终端再一次获取到进程控制
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif (0 !== $pid) {
exit(0);
}
}
通常在启动时增加-d
参数,表示进程将运行于守护态模式。
当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:
protected static function resetStdFd()
{
global $STDERR, $STDOUT;
//重定向标准输出和错误输出
@fclose(STDOUT);
fclose(STDERR);
$STDOUT = fopen(static::$stdoutFile, 'a');
$STDERR = fopen(static::$stdoutFile, 'a');
}
为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid
文件。代码如下:
protected static function saveMasterPid()
{
// 保存pid以实现重载和停止
static::$_masterPid = posix_getpid();
if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
exit("can not save pid to" . static::$pidFile . "\n");
}
echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}
因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。
这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:
protected static function installSignal()
{
pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);
// 忽略信号
pcntl_signal(SIGUSR2, SIG_IGN, false);
pcntl_signal(SIGHUP, SIG_IGN, false);
}
protected static function signalHandler($signal)
{
switch($signal) {
case SIGINT:
case SIGTERM:
static::stop();
break;
case SIGQUIT:
case SIGUSR1:
static::reload();
break;
default: break;
}
}
其中,SIGINT 和 SIGTERM 信号会触发stop
操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload
操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill
掉。
Master 进程通过fork
系统调用,就能创建多个 Worker 进程。实现代码,如下:
protected static function forkOneWorker()
{
$pid = pcntl_fork();
// 父进程
if ($pid > 0) {
static::$_workers[] = $pid;
} else if ($pid === 0) { // 子进程
static::setProcessTitle('PHPServer: worker');
// 子进程会阻塞在这里
static::run();
// 子进程退出
exit(0);
} else {
throw new \Exception("fork one worker fail");
}
}
protected static function forkWorkers()
{
while(count(static::$_workers) < static::$workerCount) {
static::forkOneWorker();
}
}
Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:
对于 Worker 进程,run()
方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while
来模拟调度,实际中应该使用事件(Select 等)驱动。
public static function run()
{
// 模拟调度,实际用event实现
while (1) {
// 捕获信号
pcntl_signal_dispatch();
call_user_func(function() {
// do something
usleep(200);
});
}
}
其中,pcntl_signal_dispatch()
会在每次调度过程中,捕获信号并执行注册的信号处理器。
Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:
对于 Master 进程的调度,这里也使用了while
,但是引入了wait
的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。
protected static function monitor()
{
while (1) {
// 这两处捕获触发信号,很重要
pcntl_signal_dispatch();
// 挂起当前进程的执行直到一个子进程退出或接收到一个信号
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
pcntl_signal_dispatch();
if ($pid >= 0) {
// worker健康检查
static::checkWorkerAlive();
}
// 其他你想监控的
}
}
第两次的pcntl_signal_dispatch()
捕获信号,是由于wait
挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。
其中,PHPServer 的 停止 和 重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。
由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:
代码实现,如下:
protected static function checkWorkerAlive()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $index => $pid) {
if (!static::isAlive($pid)) {
unset(static::$_workers[$index]);
}
}
static::forkWorkers();
}
Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:
入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()
方法。如下:
protected static function stop()
{
// 主进程给所有子进程发送退出信号
if (static::$_masterPid === posix_getpid()) {
static::stopAllWorkers();
if (is_file(static::$pidFile)) {
@unlink(static::$pidFile);
}
exit(0);
} else { // 子进程退出
// 退出前可以做一些事
exit(0);
}
}
若是 Master 进程执行该方法,会先调用stopAllWorkers()
方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;
由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。
protected static function stopAllWorkers()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $workerPid) {
posix_kill($workerPid, SIGINT);
}
// 子进程退出异常,强制kill
usleep(1000);
if (static::isAlive($allWorkerPid)) {
foreach ($allWorkerPid as $workerPid) {
static::forceKill($workerPid);
}
}
// 清空worker实例
static::$_workers = array();
}
代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:
整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()
方法,reload()
方法调用stopAllWorkers()
方法。如下:
protected static function reload()
{
// 停止所有worker即可,master会自动fork新worker
static::stopAllWorkers();
}
reload()
方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。
你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。
到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:
$ php server.php
Usage: Commands [mode]
Commands:
start Start worker.
stop Stop worker.
reload Reload codes.
Options:
-d to start in DAEMON mode.
Use "--help" for more information about a command.
首先,我们启动它:
$ php server.php start -d
PHPServer start [OK]
其次,查看进程树,如下:
$ pstree -p
init(1)-+-init(3)---bash(4)
|-php(1286)-+-php(1287)
`-php(1288)
最后,我们把它停止:
$ php server.php stop
PHPServer stopping ...
PHPServer stop success
现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)
我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:
首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。
相关文章 »
天翊 评论了文章 · 2019-03-26
首发于 樊浩柏科学院
经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。
PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。
该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:
其中,主要涉及 3 个对象,分别为 入口脚本、Master 进程、Worker 进程。它们扮演的角色如下:
start
、stop
、reload
流程;整个过程,又包括 4 个流程:
fork
出一个 Master 进程;Master 进程先经过 保存 PID、注册信号处理器 操作,然后 创建 Worker 会fork
出多个 Worker 进程;在流程 ② 中,Worker 进程被 Master 进程fork
出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。
启动流程见 流程 ①,主要包括 守护进程、保存 PID、注册信号处理器、创建多进程 Worker 这 4 部分。
首先,在入口脚本中fork
一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:
这里使用了 2 次fork
,所以最后fork
的一个子进程才是 Master 进程,其实一次fork
也是可以的。代码如下:
protected static function daemonize()
{
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif ($pid > 0) {
exit(0);
}
// 将当前进程提升为会话leader
if (-1 === posix_setsid()) {
exit("process setsid fail\n");
}
// 再次fork以避免SVR4这种系统终端再一次获取到进程控制
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif (0 !== $pid) {
exit(0);
}
}
通常在启动时增加-d
参数,表示进程将运行于守护态模式。
当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:
protected static function resetStdFd()
{
global $STDERR, $STDOUT;
//重定向标准输出和错误输出
@fclose(STDOUT);
fclose(STDERR);
$STDOUT = fopen(static::$stdoutFile, 'a');
$STDERR = fopen(static::$stdoutFile, 'a');
}
为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid
文件。代码如下:
protected static function saveMasterPid()
{
// 保存pid以实现重载和停止
static::$_masterPid = posix_getpid();
if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
exit("can not save pid to" . static::$pidFile . "\n");
}
echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}
因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。
这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:
protected static function installSignal()
{
pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);
// 忽略信号
pcntl_signal(SIGUSR2, SIG_IGN, false);
pcntl_signal(SIGHUP, SIG_IGN, false);
}
protected static function signalHandler($signal)
{
switch($signal) {
case SIGINT:
case SIGTERM:
static::stop();
break;
case SIGQUIT:
case SIGUSR1:
static::reload();
break;
default: break;
}
}
其中,SIGINT 和 SIGTERM 信号会触发stop
操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload
操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill
掉。
Master 进程通过fork
系统调用,就能创建多个 Worker 进程。实现代码,如下:
protected static function forkOneWorker()
{
$pid = pcntl_fork();
// 父进程
if ($pid > 0) {
static::$_workers[] = $pid;
} else if ($pid === 0) { // 子进程
static::setProcessTitle('PHPServer: worker');
// 子进程会阻塞在这里
static::run();
// 子进程退出
exit(0);
} else {
throw new \Exception("fork one worker fail");
}
}
protected static function forkWorkers()
{
while(count(static::$_workers) < static::$workerCount) {
static::forkOneWorker();
}
}
Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:
对于 Worker 进程,run()
方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while
来模拟调度,实际中应该使用事件(Select 等)驱动。
public static function run()
{
// 模拟调度,实际用event实现
while (1) {
// 捕获信号
pcntl_signal_dispatch();
call_user_func(function() {
// do something
usleep(200);
});
}
}
其中,pcntl_signal_dispatch()
会在每次调度过程中,捕获信号并执行注册的信号处理器。
Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:
对于 Master 进程的调度,这里也使用了while
,但是引入了wait
的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。
protected static function monitor()
{
while (1) {
// 这两处捕获触发信号,很重要
pcntl_signal_dispatch();
// 挂起当前进程的执行直到一个子进程退出或接收到一个信号
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
pcntl_signal_dispatch();
if ($pid >= 0) {
// worker健康检查
static::checkWorkerAlive();
}
// 其他你想监控的
}
}
第两次的pcntl_signal_dispatch()
捕获信号,是由于wait
挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。
其中,PHPServer 的 停止 和 重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。
由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:
代码实现,如下:
protected static function checkWorkerAlive()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $index => $pid) {
if (!static::isAlive($pid)) {
unset(static::$_workers[$index]);
}
}
static::forkWorkers();
}
Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:
入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()
方法。如下:
protected static function stop()
{
// 主进程给所有子进程发送退出信号
if (static::$_masterPid === posix_getpid()) {
static::stopAllWorkers();
if (is_file(static::$pidFile)) {
@unlink(static::$pidFile);
}
exit(0);
} else { // 子进程退出
// 退出前可以做一些事
exit(0);
}
}
若是 Master 进程执行该方法,会先调用stopAllWorkers()
方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;
由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。
protected static function stopAllWorkers()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $workerPid) {
posix_kill($workerPid, SIGINT);
}
// 子进程退出异常,强制kill
usleep(1000);
if (static::isAlive($allWorkerPid)) {
foreach ($allWorkerPid as $workerPid) {
static::forceKill($workerPid);
}
}
// 清空worker实例
static::$_workers = array();
}
代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:
整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()
方法,reload()
方法调用stopAllWorkers()
方法。如下:
protected static function reload()
{
// 停止所有worker即可,master会自动fork新worker
static::stopAllWorkers();
}
reload()
方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。
你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。
到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:
$ php server.php
Usage: Commands [mode]
Commands:
start Start worker.
stop Stop worker.
reload Reload codes.
Options:
-d to start in DAEMON mode.
Use "--help" for more information about a command.
首先,我们启动它:
$ php server.php start -d
PHPServer start [OK]
其次,查看进程树,如下:
$ pstree -p
init(1)-+-init(3)---bash(4)
|-php(1286)-+-php(1287)
`-php(1288)
最后,我们把它停止:
$ php server.php stop
PHPServer stopping ...
PHPServer stop success
现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)
我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:
首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。
相关文章 »
天翊 发布了文章 · 2019-03-25
首发于 樊浩柏科学院
假若,你是某个国内电商平台的商品中心项目负责人。突然今天,接到了一个这样的需求:商品在原人民币价格的基础架构上,须支持卢比(印度)价格。
需求点,可以描述为:
同样这个需求,定了以下两个硬指标:
首先,我们必须承认的是,这确实是个简单的需求,但这也是个够坑爹的需求。主要遇到的问题如下:
为了实现快速上线,我们在原人民币的商品价格基础架构上,只能进行少量且合适的改造。所以,最后我们的改造方向为:尽量只改造商品价格源头系统,即商品中心,其他上层系统尽量不改动。
改造商品中心,商品价格支持卢比。可行的改造方案有 2 种:
1、数据表价格字段存卢比
将原人名币价格相关的数据表字段,存卢比值,数据表并新增人名币字段。
2、接口输出数据时转化为卢比
原人名币相关的数据表字段依然存人民币值,在接口输出数据时,将价格相关字段值转化为卢比。
针对以上方案,我们需要注意 2 个问题:
上述 方案 ①,商品中心只需改造数据表。然后每天根据汇率刷新商品价格,原价格字段就都变成了卢比。方案相对简单,也容易操作,但缺点是:对任然需要人民币价格的系统,即商品管理系统须改造。
方案 ②,需要改造商品中心业务逻辑。由于涉及的价格字段较多,改造较复杂,主要优点是:汇率变动对商品价格影响较小,且可拓展支持多币种价格(可以根据地区标识,获取相应的商品价格)。
最终,为了系统的可扩展性,我们选择了方案 ②。
这里主要改造了商品中心,主要解决 透传地区标识 和 支持多币种价格 这 2 个问题。
我们的业务系统主要分为 API 和 Service 项目,API 暴露出 HTTP 接口,API 与 Service 和 Service 与 Service 之前使用 RPC 接口通信。由于商品中心涉及到价格的接口繁多,不可能对每个接口都增加地区标识的参数。所以我们弄了一套调用链路透传地区标识的机制。
思路就是,先将地区标识放在全局上下文中,API 接口通过 Header 头X-Location
携带地区标识;而对于 RPC 接口,我们的 RPC 框架已支持了 Context,不需要改造。
由于 RPC 框架已支持了 Context,所以 API 和 RPC 接口透传全局上下文略有不同。实现如下:
class Location
{
public static function init()
{
global $context;
if (empty($context['location'])) {
return;
}
// API在这里直接获取X-Location头
if (!empty($_SERVER['HTTP_X_LOCATION'])) {
$context['location'] = $_SERVER['HTTP_X_LOCATION'];
}
// RPC Server会自动获取Context
}
}
上述init()
方法,需要在项目入口位置初始化。
其中,RPC 接口不需要操作全局上下文。因为 RPC Client 在调用时会自动获取全局变量$context
值并在 RPC 协议数据中追加 Context,同时 RPC Server 在收到请求时会自动获取 RPC 协议数据中的 Context 值并设置全局变量$context
。
RPC Client 传递 Context 实现如下:
protected function addGlobalContext($data)
{
global $context;
$context = !is_array($context) ? array() : $context;
// data为待请求的RPC协议数据
$data['Context'] = $context;
return $data;
}
RPC Server 获取 Context 实现如下:
public function getGlobalContext($packet)
{
global $context;
$context = array();
// packet为接收的RPC协议数据
if(isset($packet['Context'])) {
$context = $packet['Context'];
}
}
当设置了 Context 后,RPC 通信时协议数据会携带location
字段,内容如下:
RPC
325
{"data":"{\"version\":\"1.0\",\"user\":\"xxx\",\"password\":\"xxx\",\"timestamp\":1553225486.5455,\"class\":\"xxx\",\"method\":\"xxx\",\"params\":[1]}","signature":"xxx","Context":{"location":"india"}}
到这里,我们只需要在全局上下文设置地区标识即可。一旦我们设置了地区标识,所有业务系统就会在本次的调用链路中透传这个地区标识。实现如下:
class Location
{
public static function set($location)
{
global $context;
$context['location'] = $location;
// API需要在这里单独设置X-Location头
header('X-Location: ' . $context['location']);
}
}
设置了地区标识后,就可以在本次调用链路的所有业务系统中直接获取。实现如下:
class Location
{
public static function get()
{
global $context;
if (!isset($context['location'])) {
return 'china';
}
return $context['location'];
}
}
有了地区标识后,商品中心服务就可以根据地区标识对价格字段进行转化了。因为设计到价格的数据表和价格字段较多,这里直接从数据层(Model)进行改造。
下述的ReadBase
类是所有数据表 Model 的基类,所有获取数据表数据的方法都继承或调用自getOne()
和getAll()
方法,所以我们只需要改造这两个方法。
class ReadBase
{
public function getOne(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryRow();
return $this->getExchangePrice($data);
}
public function getAll(array $cond, $fields)
{
$data = $this->getReader()->select($this->getFields($fields))->from($this->getTableName())->where($cond)->queryAll();
if ($data) {
foreach ($data as &$one) {
$this->getExchangePrice($one);
}
}
return $data;
}
}
由于涉及到价格字段名字较多,且具有不确定性,所以这里使用后缀方式匹配。为了防止一些字段命名不规范,这里引入了黑名单机制。
protected function isExchangeField($field)
{
$priceSuffix = array('cost', '_price');
$black = array();
$len = strlen($field) ;
foreach ($priceSuffix as $suffix) {
$lastPos = $len - strlen($suffix);
// 非黑名单且非is_
if (!in_array($field, $black)
&& false === strpos($field, 'is_')
&& $lastPos === strpos($field, $suffix)
) {
return true;
}
}
return false;
}
前缀为is_
的字段一般定义为标识字段,默认为非价格字段。
上述getExchangePrice()
方法,用来根据地区标识转化价格覆盖到原价格字段,并自增以_origin
后缀的人民币价格字段。
public function getExchangePrice(&$data)
{
if (empty($data)) {
return $data;
}
$originPrice = array();
foreach ($data as $field => &$value) {
// 是否是价格字段
if ($this->isExchangeField($field)) {
$originField = $field . '_origin';
$originPrice[$originField] = $value;
// 获取对应地区的价格
$value = $this->getExchangePrice($value);
}
}
$data = array_merge($originPrice, $data);
return $data;
}
public static function getExchangePrice($price)
{
// 获取地区标识
$location = Location::get();
// 汇率
$exchangeRateConfig = \Config::$exchangeRate;
if ($location === 'china') {
return $price;
} else if (isset($exchangeRateConfig[$location])) {
$exchangeRate = $exchangeRateConfig[$location];
} else {
throw new \BusinessException("not found $location exchange rate");
}
// 向上取值并保留两位小数
$exchangePrice = bcmul($price, $exchangeRate, 3);
return number_format(ceil($exchangePrice * 100) / 100, 2, '.', '');
}
其中,getExchangePrice()
方法会调用Location::get()
获取地区标识,并根据汇率计算实时价格。
最终,商品中心改造后,得到的部分商品价格信息,如下:
# 人民币价格10,汇率10.87
market_price: 108.7
market_price_origin: 10
对于所有 API 的项目,我们只需要让客户端在所有的请求中增加X-Location
头即可。
GET /product/detail/1 HTTP/1.1
Request Headers
X-Location: india
API 项目需在入口文件处,初始化地区标识。如下:
Location::init();
对于商品管理系统,我们为了方便运营操作,所有商品价格都应以人民币。因此,我们只需要初始化地区标识为中国,如下:
Location::init();
// 地区设置为中国
Location::set('china');
为了实现需求很容易,但是要做到合理且快速却不简单。本文的实现的方案,避免了很多坑,但同时也可能又埋下了一些坑。没有一套方案是万能的,慢慢去优化吧!
查看原文赞 10 收藏 5 评论 5
天翊 发布了文章 · 2019-03-25
首发于 樊浩柏科学院
在 负载均衡算法 — 轮询 一文中,我们就指出了加权轮询算法一个明显的缺陷。即在某些特殊的权重下,加权轮询调度会生成不均匀的实例序列,这种不平滑的负载可能会使某些实例出现瞬时高负载的现象,导致系统存在宕机的风险。为了解决这个调度缺陷,就提出了 平滑加权轮询 调度算法。
为了说明平滑加权轮询调度的平滑性,使用以下 3 个特殊的权重实例来演示调度过程。
服务实例 | 权重值 |
---|---|
192.168.10.1:2202 | 5 |
192.168.10.2:2202 | 1 |
192.168.10.3:2202 | 1 |
我们已经知道通过 加权轮询 算法调度后,会生成如下不均匀的调度序列。
请求 | 选中的实例 |
---|---|
1 | 192.168.10.1:2202 |
2 | 192.168.10.1:2202 |
3 | 192.168.10.1:2202 |
4 | 192.168.10.1:2202 |
5 | 192.168.10.1:2202 |
6 | 192.168.10.2:2202 |
7 | 192.168.10.3:2202 |
接下来,我们就使用平滑加权轮询算法调度上述实例,看看生成的实例序列如何?
假设有 N 台实例 S = {S1, S2, …, Sn},配置权重 W = {W1, W2, …, Wn},有效权重 CW = {CW1, CW2, …, CWn}。每个实例 i 除了存在一个配置权重 Wi 外,还存在一个当前有效权重 CWi,且 CWi 初始化为 Wi;指示变量 currentPos 表示当前选择的实例 ID,初始化为 -1;所有实例的配置权重和为 weightSum;
那么,调度算法可以描述为:
1、初始每个实例 i 的 当前有效权重 CWi 为 配置权重 Wi,并求得配置权重和 weightSum;
2、选出 当前有效权重最大 的实例,将 当前有效权重 CWi 减去所有实例的 权重和 weightSum,且变量 currentPos 指向此位置;
3、将每个实例 i 的 当前有效权重 CWi 都加上 配置权重 Wi;
4、此时变量 currentPos 指向的实例就是需调度的实例;
5、每次调度重复上述步骤 2、3、4;
上述 3 个服务,配置权重和 weightSum 为 7,其调度过程如下:
请求 | 选中前的当前权重 | currentPos | 选中的实例 | 选中后的当前权重 |
---|---|---|---|---|
1 | {5, 1, 1} | 0 | 192.168.10.1:2202 | {-2, 1, 1} |
2 | {3, 2, 2} | 0 | 192.168.10.1:2202 | {-4, 2, 2} |
3 | {1, 3, 3} | 1 | 192.168.10.2:2202 | {1, -4, 3} |
4 | {6, -3, 4} | 0 | 192.168.10.1:2202 | {-1, -3, 4} |
5 | {4, -2, 5} | 2 | 192.168.10.3:2202 | {4, -2, -2} |
6 | {9, -1, -1} | 0 | 192.168.10.1:2202 | {2, -1, -1} |
7 | {7, 0, 0} | 0 | 192.168.10.1:2202 | {0, 0, 0} |
8 | {5, 1, 1} | 0 | 192.168.10.1:2202 | {-2, 1, 1} |
可以看出上述调度序列分散是非常均匀的,且第 8 次调度时当前有效权重值又回到 {0, 0, 0},实例的状态同初始状态一致,所以后续可以一直重复调度操作。
此轮询调度算法思路首先被 Nginx 开发者提出,见 phusion/nginx 部分。
这里使用 PHP 来实现,源码见 fan-haobai/load-balance 部分。
class SmoothWeightedRobin implements RobinInterface
{
private $services = array();
private $total;
private $currentPos = -1;
public function init(array $services)
{
foreach ($services as $ip => $weight) {
$this->services[] = [
'ip' => $ip,
'weight' => $weight,
'current_weight' => $weight,
];
}
$this->total = count($this->services);
}
public function next()
{
// 获取最大当前有效权重实例的位置
$this->currentPos = $this->getMaxCurrentWeightPos();
// 当前权重减去权重和
$currentWeight = $this->getCurrentWeight($this->currentPos) - $this->getSumWeight();
$this->setCurrentWeight($this->currentPos, $currentWeight);
// 每个实例的当前有效权重加上配置权重
$this->recoverCurrentWeight();
return $this->services[$this->currentPos]['ip'];
}
}
其中,getSumWeight()
为所有实例的配置权重和;getCurrentWeight()
和 setCurrentWeight()
分别用于获取和设置指定实例的当前有效权重;getMaxCurrentWeightPos()
求得最大当前有效权重的实例位置,实现如下:
public function getMaxCurrentWeightPos()
{
$currentWeight = $pos = 0;
foreach ($this->services as $index => $service) {
if ($service['current_weight'] > $currentWeight) {
$currentWeight = $service['current_weight'];
$pos = $index;
}
}
return $pos;
}
recoverCurrentWeight()
用于调整每个实例的当前有效权重,即加上配置权重,实现如下:
public function recoverCurrentWeight()
{
foreach ($this->services as $index => &$service) {
$service['current_weight'] += $service['weight'];
}
}
需要注意的是,在配置services
服务列表时,同样需要指定其权重:
$services = [
'192.168.10.1:2202' => 5,
'192.168.10.2:2202' => 1,
'192.168.10.3:2202' => 1,
];
可惜的是,关于此调度算法严谨的数学证明少之又少,不过网友 tenfy 给出的 安大神 证明过程,非常值得参考和学习。
假如有 n 个结点,记第 i 个结点的权重是 $x_i$,设总权重为 $S = x_1 + x_2 + … + x_n$。选择分两步:
1、为每个节点加上它的权重值;
2、选择最大的节点减去总的权重值;
n 个节点的初始化值为 [0, 0, …, 0],数组长度为 n,值都为 0。第一轮选择的第 1 步执行后,数组的值为 $[x_1, x_2, …, x_n]$。
假设第 1 步后,最大的节点为 j,则第 j 个节点减去 S。
所以第 2 步的数组为 $[x_1, x_2, …, x_j-S, …, x_n]$。 执行完第 2 步后,数组的和为:
$x_1 + x_2 + … + x_j-S + … + x_n => x_1 + x_2 + … + x_n - S = S - S = 0$
由此可见,每轮选择第 1 步操作都是数组的总和加上 S,第 2 步总和再减去 S,所以每轮选择完后的数组总和都为 0。
假设总共执行 S 轮选择,记第 i 个结点选择 $m_i$ 次。第 i 个结点的当前权重为 $w_i$。 假设节点 j 在第 t 轮(t < S)之前,已经被选择了 $x_j$ 次,记此时第 j 个结点的当前权重为 $w_j = t \* x_j - x_j \* S = (t - S) \* x_j < 0$, 因为 t 恒小于 S,所以 $w_j < 0$。
前面假设总共执行 S 轮选择,则剩下 S-t 轮 j 都不会被选中,上面的公式 $w_j = (t - S) \* x_j + (S - t) \* x_j = 0$。 所以在剩下的选择中,$w_j$ 永远小于等于 0,由于上面已经证明任何一轮选择后,数组总和都为 0,则必定存在一个节点 k 使得 $w_k > 0$,永远不会再选中节点 j。
由此可以得出,第 i 个结点最多被选中 $x_i$ 次,即 $m_i <= x_i$。
因为 $S = m_1 + m_2 + … + m_n$ 且 $S = x_1 + x_2 + … + x_n$。 所以,可以得出 $m_i == x_i$。
证明平滑性,只要证明不要一直都是连续选择那一个节点即可。
跟上面一样,假设总权重为 S,假如某个节点 i 连续选择了 t($t < x_i$) 次,只要存在下一次选择的不是节点 i,即可证明是平滑的。
假设 $t = x_i - 1$,此时第 i 个结点的当前权重为 $w_i = t \* x_i - t \* S = (x_i - 1) \* x_i - (x_i - 1) \* S$。证明下一轮的第 1 步执行完的值 $w_i + x_i$ 不是最大的即可。
$w_i + x_i => (x_i - 1) \* x_i - (x_i - 1) \* S + x_i =>$
$x_i^2 - x_i \* S + S => (x_i - 1) \* (x_i - S) + x_i$
因为 $x_i$ 恒小于 S,所以 $x_i - S <= -1$。 所以上面:
$(x_i - 1) \* (x_i - S) + x_i <= (x_i - 1) \* -1 + x_i = -x_i + 1 + x_i = 1$
所以第 t 轮后,再执行完第 1 步的值 $w_i + x_i <= 1$。
如果这 t 轮刚好是最开始的 t 轮,则必定存在另一个结点 j 的值为 $x_j \* t$,所以有 $w_i + x_i <= 1 < 1 \* t < x_j \* t$。所以下一轮肯定不会选中 i。
尽管,平滑加权轮询算法改善了加权轮询算法调度的缺陷,即调度序列分散的不均匀,避免了实例负载突然加重的可能,但是仍然不能动态感知每个实例的负载。
若由于实例权重配置不合理,或者一些其他原因加重系统负载的情况,平滑加权轮询都无法实现每个实例的负载均衡,这时就需要 有状态 的调度算法来完成。
相关文章 »
赞 43 收藏 29 评论 2
天翊 发布了文章 · 2019-03-25
首发于 樊浩柏科学院
经过 用 PHP 玩转进程之一 — 基础 的回顾复习,我们已经掌握了进程的基础知识,现在可以尝试用 PHP 做一些简单的进程控制和管理,来加深我们对进程的理解。接下来,我将用多进程模型实现一个简单的 PHPServer,基于它你可以做任何事。
PHPServer 完整的源代码,可前往 fan-haobai/php-server 获取。
该 PHPServer 的 Master 和 Worker 进程主要控制流程,如下图所示:
其中,主要涉及 3 个对象,分别为 入口脚本、Master 进程、Worker 进程。它们扮演的角色如下:
start
、stop
、reload
流程;整个过程,又包括 4 个流程:
fork
出一个 Master 进程;Master 进程先经过 保存 PID、注册信号处理器 操作,然后 创建 Worker 会fork
出多个 Worker 进程;在流程 ② 中,Worker 进程被 Master 进程fork
出来后,就会 持续运行 并阻塞于此,只有 Master 进程才会继续后续的流程。
启动流程见 流程 ①,主要包括 守护进程、保存 PID、注册信号处理器、创建多进程 Worker 这 4 部分。
首先,在入口脚本中fork
一个子进程,然后该进程退出,并设置新的子进程为会话组长,此时的这个子进程就会脱离当前终端的控制。如下图所示:
这里使用了 2 次fork
,所以最后fork
的一个子进程才是 Master 进程,其实一次fork
也是可以的。代码如下:
protected static function daemonize()
{
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif ($pid > 0) {
exit(0);
}
// 将当前进程提升为会话leader
if (-1 === posix_setsid()) {
exit("process setsid fail\n");
}
// 再次fork以避免SVR4这种系统终端再一次获取到进程控制
$pid = pcntl_fork();
if (-1 === $pid) {
exit("process fork fail\n");
} elseif (0 !== $pid) {
exit(0);
}
}
通常在启动时增加-d
参数,表示进程将运行于守护态模式。
当顺利成为一个守护进程后,Master 进程已经脱离了终端控制,所以有必要关闭标准输出和标准错误输出。如下:
protected static function resetStdFd()
{
global $STDERR, $STDOUT;
//重定向标准输出和错误输出
@fclose(STDOUT);
fclose(STDERR);
$STDOUT = fopen(static::$stdoutFile, 'a');
$STDERR = fopen(static::$stdoutFile, 'a');
}
为了实现 PHPServer 的重载或停止,我们需要将 Master 进程的 PID 保存于 PID 文件中,如php-server.pid
文件。代码如下:
protected static function saveMasterPid()
{
// 保存pid以实现重载和停止
static::$_masterPid = posix_getpid();
if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
exit("can not save pid to" . static::$pidFile . "\n");
}
echo "PHPServer start\t \033[32m [OK] \033[0m\n";
}
因为守护进程一旦脱离了终端控制,就犹如一匹脱缰的野马,任由其奔腾可能会为所欲为,所以我们需要去驯服它。
这里使用信号来实现进程间通信并控制进程的行为,注册信号处理器如下:
protected static function installSignal()
{
pcntl_signal(SIGINT, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGTERM, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGUSR1, array('\PHPServer\Worker', 'signalHandler'), false);
pcntl_signal(SIGQUIT, array('\PHPServer\Worker', 'signalHandler'), false);
// 忽略信号
pcntl_signal(SIGUSR2, SIG_IGN, false);
pcntl_signal(SIGHUP, SIG_IGN, false);
}
protected static function signalHandler($signal)
{
switch($signal) {
case SIGINT:
case SIGTERM:
static::stop();
break;
case SIGQUIT:
case SIGUSR1:
static::reload();
break;
default: break;
}
}
其中,SIGINT 和 SIGTERM 信号会触发stop
操作,即终止所有进程;SIGQUIT 和 SIGUSR1 信号会触发reload
操作,即重新加载所有 Worker 进程;此处忽略了 SIGUSR2 和 SIGHUP 信号,但是并未忽略 SIGKILL 信号,即所有进程都可以被强制kill
掉。
Master 进程通过fork
系统调用,就能创建多个 Worker 进程。实现代码,如下:
protected static function forkOneWorker()
{
$pid = pcntl_fork();
// 父进程
if ($pid > 0) {
static::$_workers[] = $pid;
} else if ($pid === 0) { // 子进程
static::setProcessTitle('PHPServer: worker');
// 子进程会阻塞在这里
static::run();
// 子进程退出
exit(0);
} else {
throw new \Exception("fork one worker fail");
}
}
protected static function forkWorkers()
{
while(count(static::$_workers) < static::$workerCount) {
static::forkOneWorker();
}
}
Worker 进程的持续运行,见 流程 ③ 。其内部调度流程,如下图:
对于 Worker 进程,run()
方法主要执行具体业务逻辑,当然 Worker 进程会被阻塞于此。对于 任务 ① 这里简单地使用while
来模拟调度,实际中应该使用事件(Select 等)驱动。
public static function run()
{
// 模拟调度,实际用event实现
while (1) {
// 捕获信号
pcntl_signal_dispatch();
call_user_func(function() {
// do something
usleep(200);
});
}
}
其中,pcntl_signal_dispatch()
会在每次调度过程中,捕获信号并执行注册的信号处理器。
Master 进程的持续监控,见 流程 ② 。其内部调度流程,如下图:
对于 Master 进程的调度,这里也使用了while
,但是引入了wait
的系统调用,它会挂起当前进程,直到一个子进程退出或接收到一个信号。
protected static function monitor()
{
while (1) {
// 这两处捕获触发信号,很重要
pcntl_signal_dispatch();
// 挂起当前进程的执行直到一个子进程退出或接收到一个信号
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
pcntl_signal_dispatch();
if ($pid >= 0) {
// worker健康检查
static::checkWorkerAlive();
}
// 其他你想监控的
}
}
第两次的pcntl_signal_dispatch()
捕获信号,是由于wait
挂起时间可能会很长,而这段时间可能恰恰会有信号,所以需要再次进行捕获。
其中,PHPServer 的 停止 和 重载 操作是由信号触发,在信号处理器中完成具体操作;Worker 进程的健康检查 会在每一次的调度过程中触发。
由于 Worker 进程执行繁重的业务逻辑,所以可能会异常崩溃。因此 Master 进程需要监控 Worker 进程健康状态,并尝试维持一定数量的 Worker 进程。健康检查流程,如下图:
代码实现,如下:
protected static function checkWorkerAlive()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $index => $pid) {
if (!static::isAlive($pid)) {
unset(static::$_workers[$index]);
}
}
static::forkWorkers();
}
Master 进程的持续监控,见 流程 ④ 。其详细流程,如下图:
入口脚本给 Master 进程发送 SIGINT 信号,Master 进程捕获到该信号并执行 信号处理器,调用stop()
方法。如下:
protected static function stop()
{
// 主进程给所有子进程发送退出信号
if (static::$_masterPid === posix_getpid()) {
static::stopAllWorkers();
if (is_file(static::$pidFile)) {
@unlink(static::$pidFile);
}
exit(0);
} else { // 子进程退出
// 退出前可以做一些事
exit(0);
}
}
若是 Master 进程执行该方法,会先调用stopAllWorkers()
方法,向所有的 Worker 进程发送 SIGINT 信号并等待所有 Worker 进程终止退出,再清除 PID 文件并退出。有一种特殊情况,Worker 进程退出超时时,Master 进程则会再次发送 SIGKILL 信号强制杀死所有 Worker 进程;
由于 Master 进程会发送 SIGINT 信号给 Worker 进程,所以 Worker 进程也会执行该方法,并会直接退出。
protected static function stopAllWorkers()
{
$allWorkerPid = static::getAllWorkerPid();
foreach ($allWorkerPid as $workerPid) {
posix_kill($workerPid, SIGINT);
}
// 子进程退出异常,强制kill
usleep(1000);
if (static::isAlive($allWorkerPid)) {
foreach ($allWorkerPid as $workerPid) {
static::forceKill($workerPid);
}
}
// 清空worker实例
static::$_workers = array();
}
代码发布后,往往都需要进行重新加载。其实,重载过程只需要重启所有 Worker 进程即可。流程如下图:
整个过程共有 2 个流程,流程 ① 终止所有的 Worker 进程,流程 ② 为 Worker 进程的健康检查 。其中流程 ① ,入口脚本给 Master 进程发送 SIGUSR1 信号,Master 进程捕获到该信号,执行信号处理器调用reload()
方法,reload()
方法调用stopAllWorkers()
方法。如下:
protected static function reload()
{
// 停止所有worker即可,master会自动fork新worker
static::stopAllWorkers();
}
reload()
方法只会在 Master 进程中执行,因为 SIGQUIT 和 SIGUSR1 信号不会发送给 Worker 进程。
你可能会纳闷,为什么我们需要重启所有的 Worker 进程,而这里只是停止了所有的 Worker 进程?这是因为,在 Worker 进程终止退出后,由于 Master 进程对 Worker 进程的健康检查 作用,会自动重新创建所有 Worker 进程。
到这里,我们已经完成了一个多进程 PHPServer。我们来体验一下:
$ php server.php
Usage: Commands [mode]
Commands:
start Start worker.
stop Stop worker.
reload Reload codes.
Options:
-d to start in DAEMON mode.
Use "--help" for more information about a command.
首先,我们启动它:
$ php server.php start -d
PHPServer start [OK]
其次,查看进程树,如下:
$ pstree -p
init(1)-+-init(3)---bash(4)
|-php(1286)-+-php(1287)
`-php(1288)
最后,我们把它停止:
$ php server.php stop
PHPServer stopping ...
PHPServer stop success
现在,你是不是感觉进程控制其实很简单,并没有我们想象的那么复杂。( ̄┰ ̄*)
我们已经实现了一个简易的多进程 PHPServer,模拟了进程的管理与控制。需要说明的是,Master 进程可能偶尔也会异常地崩溃,为了避免这种情况的发生:
首先,我们不应该给 Master 进程分配繁重的任务,它更适合做一些类似于调度和管理性质的工作;
其次,可以使用 Supervisor 等工具来管理我们的程序,当 Master 进程异常崩溃时,可以再次尝试被拉起,避免 Master 进程异常退出的情况发生。
相关文章 »
赞 34 收藏 25 评论 10
天翊 评论了文章 · 2019-03-25
首发于 樊浩柏科学院
2017 年是自如快速增长的一年,自如客突破 100 万,管理资产达到 50 万间,在年底成功获得了 40 亿 A 轮融资,而这些都要感谢广大的自如客,公司为了回馈自如客,在六周年活动时就发放了 6000 万租住基金,当然年底散币活动也够疯狂。
既然公司对自如客这么阔,那对我们员工也得够意思,所以年底我们共准备了 3 个活动。
1、针对 自如客 的服务费减免活动;
2、针对 自如客 的 1000 万现金礼包;
3、25 万的 员工 红包活动;
散币活动 2 和 3 是通过微信红包形式进行,想散币就散吧,可微信告诉我们,想散币还得交税(>﹏<)。员工红包来说,25 万要交掉 10 多万税,此时心疼我的钱。好了,下面开始说点正事。
说到红包,我们肯定会想到红包拆分和抢红包两个场景。红包拆分是指将指定金额拆分为指定数目红包的过程,即是用来确定每个红包的金额数;而抢红包就是典型的高并发场景,需要避免红包超发的情况。
拆分方式
1、实时拆分
实时拆分,指的是在抢红包时实时计算每个红包的金额,以实现红包的拆分过程,对系统性能和拆分算法要求较高,例如拆分过程要一直保证后续待拆分红包的金额不能为空,不容易做到拆分红包的金额服从正态分布规律。
2、预先生成
预先生成,指的是在红包开抢之前已经完成了红包的拆分,抢红包时只是依次取出拆分好的红包金额,对拆分算法要求较低,可以拆分出随机性很好的红包金额,通常需要结合队列使用。
拆分算法
我并没有找到业界的通用算法,但红包拆分算法应该是拆分金额要看起来随机,最好能够服从正态分布,可以参考 微信 和 @lcode 提供的红包拆分算法。
微信拆分算法的优点是算法较简单,拆分效率高,同时,由于该算法天然的特性,可以保证后续红包金额一定不为空,特别适合实时拆分场景,但缺点是会导致大额红包较大概率地在拆分的最后出现。 @lcode 拆分算法的优点是拆分金额基本符合正态分布,适合随机性要求较高的拆分场景。
我们这次的业务对红包金额的随机性要求不高,但是对系统可靠性要求较高,所以我们选用了预算生成方式,使用 二倍均值法 的红包拆分算法,作为我们的红包拆分方案。
采用预算生成方式,我们预先生成红包并放入 Redis 的 List 中,当抢红包时只是 Pop List 即可,具体实现将在 抢红包 部分介绍。
拆分算法可以描述为:假设剩余拆分金额为 M,剩余待拆分红包个数为 N,红包最小金额为 1 元,红包最小单位为元,那么定义当前红包的金额为:
$$m = rand(1, floor(M/N*2))$$
其中,floor 表示向下取整,rand(min, max) 表示从 [min, max] 区间随机一个值。$M/N \ast 2$ 表示剩余待拆分金额平均金额的 2 倍,因为 N >= 2,所以 $M/N \ast 2 <= M$,表示一定能保证后续红包能拆分到金额。
代码实现为:
for ($i = 0; $i < $N - 1; $i++) {
$max = (int)floor($M / ($N - $i)) * 2;
$m[$i] = $max ? mt_rand(1, $max) : 0;
$M -= $m[$i];
}
$m[] = $M;
值得一提的是,我们为了保证红包金额差异尽量小,先将总金额平均拆分成 N+1 份,将第 N+1 份红包按照上述的红包拆分算法拆分成 N 份,这 N 份红包加上之前的平均金额才作为最终的红包金额。
限流
1、前端限流
前端限制用户在 n 秒之内只能提交一次请求,虽然这种方式只能挡住小白,不过这是 99% 的用户哟,所以也必须得做。
2、后端限流
常用的后端限流方法有 漏桶算法 和 令牌桶算法。漏桶算法 主要目的是控制请求数据注入的速率,如果此时漏桶溢出,后续的请求数据会被丢弃。而 令牌桶算法 是以一个恒定的速度往桶里放入令牌,而如果请求数据需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌时,这些请求才被丢弃,令牌桶算法的一个好处是可以方便地改变应用接受请求的速率。
防超发
1、库存加锁
可以通过加锁的方式解决资源抢占问题,但是加锁会增加系统开销,大流量下更容易拖垮系统,不过可以尝试一下基于版本号的乐观锁。
2、通过高速队列串行化请求
之所会出现超发问题,是因为并发时会出现多个进程同时获取同一资源的现象,如果使用高速队列将并行请求串行化,那么问题就不存在了。高速队列可以使用 Redis 缓存服务器来实现,当然光使用队列还不够,必要保证整个流程调用链要短、要快,否则队列会积压严重,甚至会拖垮整个服务。
在限流方面,由于我们预估的请求量还在系统承受范围,所以没有考虑引入后端限流方案。我们的抢红包系统流程图如下:
我们将抢红包拆分为 红包占有(流程①,同步) 和 红包发放 (流程②,异步)这两个过程,首先采用高速队列串行化请求,红包发放逻辑由一组 Worker 异步去完成。高速队列只是完成红包占有的过程,实现库存的控制,Worker 则处理耗时较长的红包发放过程。
当然,在实际应用中,红包占用过程还需要加上一些前置规则校验,比如用户是否已经领取过,领取次数是否已经达到上限等?红包占有流程图如下:
其中,red::list
为 List 结构,存放预先生成的红包金额(流程①中的红包队列);red::task
也为 List 结构,红包异步发放队列(流程②中的任务队列);red::draw
为 Hash 结构,存放红包领取记录,field
为用户的 openid,value
为序列化的红包信息;red::draw_count:u:openid
为 k-v 结构,用户领取红包计数器。
下面,我将以以下 3 个问题为中心,来说说我们设计出的抢红包系统。
1、怎么保证不超发
我们需要关注的是红包占有过程,从红包占有流程图可看出,这个过程是很多 Key 操作的组合,那怎么保证原子性?可以使用 Redis 事务,但我们选用了 Lua 方案,一方面是因为首先要保证性能,而 Lua 脚本嵌入 Redis 执行不存在性能瓶颈,另一方面 Lua 脚本执行时本身就是原子性的,满足需求。
红包占有的 Lua 脚本实现如下:
-- 领取人的openid为xxxxxxxxxxx
local openid = 'xxxxxxxxxxx'
local isDraw = redis.call('HEXISTS', 'red::draw', openid)
-- 已经领取
if isDraw ~= 0 then
return true
end
-- 领取太多次了
local times = redis.call('INCR', 'red::draw_count:u:'..openid)
if times and tonumber(times) > 9 then
return 0
end
local number = redis.call('RPOP', 'red::list')
-- 没有红包
if not number then
return {}
end
-- 领取人昵称为Fhb,头像为https://xxxxxxx
local red = {money=number,name='Fhb',pic='https://xxxxxxx'}
-- 领取记录
redis.call('HSET', 'red::draw', openid, cjson.encode(red))
-- 处理队列
red['openid'] = openid
redis.call('RPUSH', 'red::task', cjson.encode(red))
return true
需要注意 Lua 脚本执行过程并不是事务的,脚本中的操作命令在执行时是有先后顺序的,当某个操作执行失败时不会回滚已经执行成功的操作,它的原子性是通过单线程模型实现。
2、怎么提高系统响应速度
如红包占有流程图所示,当用户发起抢红包请求时,若有红包则直接完成红包占有操作,同步告知用户是否抢到红包,这个过程要求快速响应。
但由于微信红包支付属于第三方调用,若抢到红包后同步调用红包支付,系统调用链又长又慢,所以红包占有和红包发放异步拆分是必然。拆分后,红包占有只需操作 Redis,响应性能已不是问题。
3、怎么提高系统处理能力
从上述分析可知,目前系统的压力都会集中在红包发放这个环节,因为用户抢到红包时,我们只是同步告知用户已抢到红包,然后异步去发放红包,因此用户并不会立即收到红包(受红包发放 Worker 处理能力和微信服务压力制约)。若红包发放的 Worker 处理能力较弱,那么红包发放的延迟就会很高,体验较差。
如抢红包流程图中所示,我们采用一组 Worker 去消费任务队列,并调用红包支付 API,以及数据持久化操作(后续对账)。尽管红包发放调用链又长又慢,但是注意到这些 Worker 是 无状态 的,所以可以通过增加 Worker 数量,以横向扩展提高系统的处理能力。
4、怎么保证数据一致性
其实,红包发放延时我们可以做到用户无感知,但是若红包发放(流程②)失败了,已经告知用户抢到红包,但是却木有发,估计他杀人的心都有了。根据 CAP 原理,我们无法同时满足数据一致性、数据可用性、分区耐受性,通常只需做到数据最终一致性。
为了达到数据最终一致性,我们就引入了重试机制,生成一个全局唯一的外部订单号,当某单红包发放失败,就会放回任务队列,使得有机会进行发放重试,当然这一切都需要 API 做幂等处理。
这里必须将 Worker 可靠性单独说,因为它实在太重要了。Worker 的实现如下:
$maxTask = 1000;
$sleepTime = 1000;
while (true) {
while ($red = RedLogic::getTask()) {
RedLogic::doTask($red);
//处理多少个任务主动退出
$maxTask--;
if ($maxTask < 0) {
return EXIT_CODE_NORMAL;
}
}
//等待任务
usleep($sleepTime);
}
这里使用 LPOP 命令获取任务,所以使用了 while 结构,并且无任务时需要等待,可以用阻塞命令 BLPOP 来改进。
由于 Worker 需要常驻内存运行,难免会出现异常退出的情况(也有主动退出), 所以需要保持 Worker 一直处于运行状态。我们使用进程管理工具 Supervisor 来监控 Worker 的运行状态,同时管理 Worker 的数量,当任务队列出现堆积时,增加 Worker 数量即可。Supervisor 的监控后台如下:
公司员工都用唯一一个系统号 emp_code(自增字段)标识,登录成功后返回 emp_code,系统后续所有交互流程都基于 emp_code,分享出去的红包也会携带 emp_code,为了保护员工敏感信息和防止恶意碰撞攻击,我们不能直接将 emp_code 暴露给前端,需要借助一个 token(无规律)的中间者来完成交互。
1、储存映射关系,时时查询
预先生成一个随机串 token,然后跟 emp_code 绑定,每次请求都根据 token 时时查询 emp_code。优点是可以定期更新,相对安全,缺点是性能不高。
2、建立映射关系函数,实时计算
建立一个映射关系函数,如 hash 散列或者加密解密算法,能够根据 emp_code 生成一个无规律的字符串 token,并且要能够根据 token 反映射出 emp_code。优点是需要存储介质存储关系,性能较高,缺点是很难做到定期失效并更新。
由于我们的红包活动只进行几天,所以我们选用了方案 2。对 emp_code 做了 hashids 散列算法,暴露的只是一串无规律的散列字符串。
hashids 是一个开源且轻量的唯一 id 生成器,支持 Java、PHP、C/C++、Python 等主流语言,PHP 想使用 hashids,只需composer require hashids/hashids
命令安装即可。
然后,如下方式使用:
use Hashids\Hashids;
$hashids = new Hashids('salt', 6, 'abcdefghijk1234567890');
$hashids->encode(11002); //994k2kk
$hashids->decode('994k2kk'); //[11002]
需要说明的是,其中salt
是非常重要的散列加密盐串,6
表示散列值最小长度,abcde...7890
为散列字典,太长影响效率,太短不安全。由于默认的散列字典比较长,decode 效率并不高,所以这里移除了大写字母部分。
语音点赞就是用户以语音的形式助力好友,核心技术其实是语音识别,而我们一般都会使用第三方语音识别服务。
1、客户端调用第三方服务识别
客户端直接调用第三方语音识别服务,如微信提供了 JS-SDK 的语音识别 API ,返回识别的语音文本的信息,并且已经经过语义化。优点是识别较快,且不许关注语音存储问题,缺点是不安全,识别结果提交到服务端之前可能被恶意篡改。
2、服务端调用第三方服务识别
先将录制的语音上传至存储平台,然后服务端调用第三方语音识别服务,第三方语音识别服务去获取语音信息并识别,返回识别的语音文本的信息。优点是识别结果较安全,缺点是系统交互较多,识别效率不高。
我们业务场景的特殊性,存在用户可助力次数的限制,所以无需担心恶意刷赞的情况,因此可以选用方案 1,语音识别的交互流程如下:
此时,整个语音识别流程如下:
当然中国文字博大精深,语音识别的文本在匹配时,需要考虑容错处理,可以将文本转化为拼音,然后匹配拼音,或者设置一个匹配百分比,达到匹配值则认为语音口令正确。
需要注意的是,微信只提供 3 天的语音存储服务,若语音播放周期较长,则要考虑实现语音的存储。
我们使用了线上公账号进行红包发放测试,为了让线上公众号能够授权到测试环境,在线上的微信授权回调地址新增一个参数,将带有to=feature
参数的请求引流到测试环境,其他线上流量还是保持不变,匹配规则如下:
# Nginx不支持if嵌套,所以就这样变通实现
set $auth_redirect "";
if ($args ~* "r=auth/redirect") {
set $auth_redirect "prod";
}
if ($args ~* "to=feature") {
set $auth_redirect "feature";
}
if ($auth_redirect ~ "feature") {
rewrite ^(.*)$ http://wx.t.ziroom.com/index.php last;
}
if ($auth_redirect ~ "prod") {
rewrite ^(.*)$ http://wx.ziroom.com/index.php last;
}
由于本次活动力度较大,预估流量会比以往增加不少(不能再出现机房带宽打满的情况了,不然 >﹏<),静态页面占流量的很大一部分,所以静态页面在发布时都会放置一份在 CDN 上,这样回源的流量就很小了。
尽管做了很多准备,还是无法确保万无一失,我们在每个关键节点都增加了开关,一点出现异常,通过配置中心可以人工介入做降级处理。
查看原文天翊 评论了文章 · 2019-03-25
首发于 樊浩柏科学院
我们经常会使用 PhpStorm 结合 Xdebug 进行代码断点调试,这样能追踪程序执行流程,方便调试代码和发现潜在问题。博主将开发环境迁入 Docker 后,Xdebug 调试遇到了些问题,在这里整理出 Docker 中使用 Xdebug 的方法和注意事项。
说明:开发和调试环境为本地 Docker 中的 LNMP,IDE 环境为本地 Win10 下的 PhpStorm。这种情况下 Xdebug 属于远程调试模式,IDE 和本地 IP 为 192.168.1.101,Docker 中 LNMP 容器 IP 为 172.17.0.2。
在 Docker 中安装并配置完 Xdebug ,并设置 PhpStorm 中对应的 Debug 参数后,但是 Debug 并不能正常工作。
此时,php.ini
中 Xdebug 配置如下:
xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_connect_back = on
xdebug.remote_port = 9001 //PhpStorm监听本地9001端口
xdebug.remote_handler = dbgp
xdebug.remote_log = /home/tmp/xdebug.log
开始收集问题详细表述。首先,观察到 PhpStorm 的 Debug 控制台出现状态:
Waiting for incoming connection with ide key ***
然后查看 Xdebug 调试日志xdebug.log
,存在如下错误:
I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(
查看这些问题表述,基本上可以定位为 Xdebug 和 PhpStorm 之间的 网络通信 问题,接下来一步步定位具体问题。
Win 下执行 netstat -ant
命令:
协议 本地地址 外部地址 状态 卸载状态
TCP 0.0.0.0:9001 0.0.0.0:0 LISTENING InHost
端口 9001 监听正常,然后在容器中使用 telnet 尝试同本地 9001 端口建立 TCP 连接:
$ telnet 192.168.1.101 9001
Trying 192.168.1.101...
Connected to 192.168.1.101.
Escape character is '^]'.
说明容器同本地 9001 建立 TCP 连接正常,但是 Xdebug 为什么会报连接失败呢?此时,至少可以排除不会是因为 PhpStorm 端配置的问题。
回过头来看看 Xdebug 的错误日志,注意观察到失败时的连接信息:
I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(
此时,在容器中使用 tcpdump 截获的数据包如下:
$ tcpdump -nnA port 9001
# 尝试建立连接,但是失败了
12:20:34.318080 IP 172.17.0.2.40720 > 172.17.0.1.9001: Flags [S], seq 2365657644, win 29200, options [mss 1460,sackOK,TS val 833443 ecr 0,nop,wscale 7], length 0
E..<..@.@.=...........#)...,......r.XT.........
............
12:20:34.318123 IP 172.17.0.1.9001 > 172.17.0.2.40720: Flags [R.], seq 0, ack 2365657645, win 0, length 0
E..(.]@.@..M........#).........-P....B..
可以确定的是, Xdebug 是向 IP 为 172.17.0.1 且端口为 9001 的目标机器尝试建立 TCP 连接,而非正确的 192.168.1.101 本地 IP。到底发生了什么?
首先,为了搞懂 Xdebug 和 PhpStorm 的交互过程,查了 官方手册 得知,Xdebug 工作在远程调试模式时,有两种工作方式:
1、IDE 所在机器 IP 确定/单人开发
图中,由于 IDE 的 IP 和监听端口都已知,所以 Xdebug 端可以很明确知道 DBGP 交互时 IDE 目标机器信息,所以 Xdebug 只需配置 xdebug.remote_host、xdebug.remote_port 即可。
2、IDE 所在机器 IP 未知/团队开发
由于 IDE 的 IP 未知或者 IDE 存在多个 ,那么 Xdebug 无法提前预知 DBGP 交互时的目标 IP,所以不能直接配置 xdebug.remote_host 项(remote_port 项可以确定),必须设置 xdebug.remote_connect_back 为 On 标识(会忽略 xdebug.remote_host 项)。这时,Xdebug 会优先获取 HTTP_X_FORWARDED_FOR 和 REMOTE_ADDR 中的一个值作为通信时 IDE 端的目标 IP,通过Xdebug.log
记录可以确认。
I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found
接下来,可以知道 Xdebug 端是工作在远程调试的模式 2 上,Xdebug 会通过 HTTP_X_FORWARDED_FOR 和 REMOTE_ADDR 项获取目标机 IP。Docker 启动容器时已经做了 80 端口映射,忽略宿主机同 Docker 容器复杂的数据包转发规则,先截取容器 80 端口数据包:
$ tcpdump -nnA port 80
# 请求信息
13:30:07.017770 IP 172.17.0.1.33976 > 172.17.0.2.80: Flags [P.], seq 1:208, ack 1, win 229, options [nop,nop,TS val 1250713 ecr 1250713], length 207
E....=@.@..............P.. .+.......Y......
........GET /v2/room/list.json HTTP/1.1
Accept: */*
Cache-Control: no-cache
Host: localhost
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_152-release)
Accept-Encoding: gzip,deflate
可以看出,数据包的源地址为 172.17.0.1,并非真正的源地址 192.168.1.101,HTTP 请求头中也无 HTTP_X_FORWARDED_FOR 项。
说明:172.17.0.1 实际为 Docker 创建的虚拟网桥 docker0 的地址 ,也是所有容器的默认网关。Docker 网络通信方式默认为 Bridge 模式,通信时宿主机会对数据包进行 SNAT 转换,进而源地址变为 docker0,那么,怎么在 Docker 里获取客户端真正 IP 呢?。
最后,可以确定由于 HTTP_X_FORWARDED_FOR 未定义,因此 Xdebug 会取 REMOTE_ADDR 为 IDE 的 IP,同时由于 Docker 特殊的网络转发规则,导致 REMOTE_ADDR 变更为网关 IP,所以 Xdebug 同 PhpStorm 进行 DBGP 交互会失败。
由于 Docker 容器里获取真正客户端 IP 比较复杂,这里使用 Xdebug 的 远程模式 1 明确 IDE 端 IP 来规避源 IP 被修改的情况,最终解决 Xdebug 调试问题。
模式 1 的 Xdebug 主要配置为:
//并没有xdebug.remote_connect_back项
xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_host = 192.168.1.101
xdebug.remote_port = 9001
xdebug.remote_handler = dbgp
重启 php-fpm,使用php --ri xdebug
确定无误,使用 PhpStorm 重新进行调试。
再次在容器中 tcpdump 抓取 9001 端口数据包:
# 连接的源地址已经正确
14:05:27.379783 IP 172.17.0.2.44668 > 192.168.1.101.9001: Flags [S], seq 3444466556, win 29200, options [mss 1460,sackOK,TS val 1462749 ecr 0,nop,wscale 7], length 0
E..<2.@.@..........e.|#).Nc|......r.nO.........
..Q.........
再次使用 PhpStorm 的 REST Client 断点调试 API 时, Debug 控制台如下:
所以,使用 Xdebug 进行远程调试时,需要选择合适的调试模式,在 Docker 下建议使用远程模式 1。
并不是每个 Xdebug 版本都适配 PHP 每个版本,可以直接使用 官方工具,选择合适的 Xdebug 版本。
如上图,在使用 PhpStorm 时进行远程调试时,需要配置本地文件和远端文件的目录映射关系,这样 IDE 才能根据 Xdebug 传递的当前执行文件路径与本地文件做匹配,实现断点调试和单步调试等。
查看原文天翊 评论了文章 · 2019-03-25
首发于 樊浩柏科学院
我们经常会使用 PhpStorm 结合 Xdebug 进行代码断点调试,这样能追踪程序执行流程,方便调试代码和发现潜在问题。博主将开发环境迁入 Docker 后,Xdebug 调试遇到了些问题,在这里整理出 Docker 中使用 Xdebug 的方法和注意事项。
说明:开发和调试环境为本地 Docker 中的 LNMP,IDE 环境为本地 Win10 下的 PhpStorm。这种情况下 Xdebug 属于远程调试模式,IDE 和本地 IP 为 192.168.1.101,Docker 中 LNMP 容器 IP 为 172.17.0.2。
在 Docker 中安装并配置完 Xdebug ,并设置 PhpStorm 中对应的 Debug 参数后,但是 Debug 并不能正常工作。
此时,php.ini
中 Xdebug 配置如下:
xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_connect_back = on
xdebug.remote_port = 9001 //PhpStorm监听本地9001端口
xdebug.remote_handler = dbgp
xdebug.remote_log = /home/tmp/xdebug.log
开始收集问题详细表述。首先,观察到 PhpStorm 的 Debug 控制台出现状态:
Waiting for incoming connection with ide key ***
然后查看 Xdebug 调试日志xdebug.log
,存在如下错误:
I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(
查看这些问题表述,基本上可以定位为 Xdebug 和 PhpStorm 之间的 网络通信 问题,接下来一步步定位具体问题。
Win 下执行 netstat -ant
命令:
协议 本地地址 外部地址 状态 卸载状态
TCP 0.0.0.0:9001 0.0.0.0:0 LISTENING InHost
端口 9001 监听正常,然后在容器中使用 telnet 尝试同本地 9001 端口建立 TCP 连接:
$ telnet 192.168.1.101 9001
Trying 192.168.1.101...
Connected to 192.168.1.101.
Escape character is '^]'.
说明容器同本地 9001 建立 TCP 连接正常,但是 Xdebug 为什么会报连接失败呢?此时,至少可以排除不会是因为 PhpStorm 端配置的问题。
回过头来看看 Xdebug 的错误日志,注意观察到失败时的连接信息:
I: Remote address found, connecting to 172.17.0.1:9001.
W: Creating socket for '172.17.0.1:9001', poll success, but error: Operation now in progress (29).
E: Could not connect to client. :-(
此时,在容器中使用 tcpdump 截获的数据包如下:
$ tcpdump -nnA port 9001
# 尝试建立连接,但是失败了
12:20:34.318080 IP 172.17.0.2.40720 > 172.17.0.1.9001: Flags [S], seq 2365657644, win 29200, options [mss 1460,sackOK,TS val 833443 ecr 0,nop,wscale 7], length 0
E..<..@.@.=...........#)...,......r.XT.........
............
12:20:34.318123 IP 172.17.0.1.9001 > 172.17.0.2.40720: Flags [R.], seq 0, ack 2365657645, win 0, length 0
E..(.]@.@..M........#).........-P....B..
可以确定的是, Xdebug 是向 IP 为 172.17.0.1 且端口为 9001 的目标机器尝试建立 TCP 连接,而非正确的 192.168.1.101 本地 IP。到底发生了什么?
首先,为了搞懂 Xdebug 和 PhpStorm 的交互过程,查了 官方手册 得知,Xdebug 工作在远程调试模式时,有两种工作方式:
1、IDE 所在机器 IP 确定/单人开发
图中,由于 IDE 的 IP 和监听端口都已知,所以 Xdebug 端可以很明确知道 DBGP 交互时 IDE 目标机器信息,所以 Xdebug 只需配置 xdebug.remote_host、xdebug.remote_port 即可。
2、IDE 所在机器 IP 未知/团队开发
由于 IDE 的 IP 未知或者 IDE 存在多个 ,那么 Xdebug 无法提前预知 DBGP 交互时的目标 IP,所以不能直接配置 xdebug.remote_host 项(remote_port 项可以确定),必须设置 xdebug.remote_connect_back 为 On 标识(会忽略 xdebug.remote_host 项)。这时,Xdebug 会优先获取 HTTP_X_FORWARDED_FOR 和 REMOTE_ADDR 中的一个值作为通信时 IDE 端的目标 IP,通过Xdebug.log
记录可以确认。
I: Checking remote connect back address.
I: Checking header 'HTTP_X_FORWARDED_FOR'.
I: Checking header 'REMOTE_ADDR'.
I: Remote address found
接下来,可以知道 Xdebug 端是工作在远程调试的模式 2 上,Xdebug 会通过 HTTP_X_FORWARDED_FOR 和 REMOTE_ADDR 项获取目标机 IP。Docker 启动容器时已经做了 80 端口映射,忽略宿主机同 Docker 容器复杂的数据包转发规则,先截取容器 80 端口数据包:
$ tcpdump -nnA port 80
# 请求信息
13:30:07.017770 IP 172.17.0.1.33976 > 172.17.0.2.80: Flags [P.], seq 1:208, ack 1, win 229, options [nop,nop,TS val 1250713 ecr 1250713], length 207
E....=@.@..............P.. .+.......Y......
........GET /v2/room/list.json HTTP/1.1
Accept: */*
Cache-Control: no-cache
Host: localhost
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_152-release)
Accept-Encoding: gzip,deflate
可以看出,数据包的源地址为 172.17.0.1,并非真正的源地址 192.168.1.101,HTTP 请求头中也无 HTTP_X_FORWARDED_FOR 项。
说明:172.17.0.1 实际为 Docker 创建的虚拟网桥 docker0 的地址 ,也是所有容器的默认网关。Docker 网络通信方式默认为 Bridge 模式,通信时宿主机会对数据包进行 SNAT 转换,进而源地址变为 docker0,那么,怎么在 Docker 里获取客户端真正 IP 呢?。
最后,可以确定由于 HTTP_X_FORWARDED_FOR 未定义,因此 Xdebug 会取 REMOTE_ADDR 为 IDE 的 IP,同时由于 Docker 特殊的网络转发规则,导致 REMOTE_ADDR 变更为网关 IP,所以 Xdebug 同 PhpStorm 进行 DBGP 交互会失败。
由于 Docker 容器里获取真正客户端 IP 比较复杂,这里使用 Xdebug 的 远程模式 1 明确 IDE 端 IP 来规避源 IP 被修改的情况,最终解决 Xdebug 调试问题。
模式 1 的 Xdebug 主要配置为:
//并没有xdebug.remote_connect_back项
xdebug.idekey = phpstorm
xdebug.remote_enable = on
xdebug.remote_host = 192.168.1.101
xdebug.remote_port = 9001
xdebug.remote_handler = dbgp
重启 php-fpm,使用php --ri xdebug
确定无误,使用 PhpStorm 重新进行调试。
再次在容器中 tcpdump 抓取 9001 端口数据包:
# 连接的源地址已经正确
14:05:27.379783 IP 172.17.0.2.44668 > 192.168.1.101.9001: Flags [S], seq 3444466556, win 29200, options [mss 1460,sackOK,TS val 1462749 ecr 0,nop,wscale 7], length 0
E..<2.@.@..........e.|#).Nc|......r.nO.........
..Q.........
再次使用 PhpStorm 的 REST Client 断点调试 API 时, Debug 控制台如下:
所以,使用 Xdebug 进行远程调试时,需要选择合适的调试模式,在 Docker 下建议使用远程模式 1。
并不是每个 Xdebug 版本都适配 PHP 每个版本,可以直接使用 官方工具,选择合适的 Xdebug 版本。
如上图,在使用 PhpStorm 时进行远程调试时,需要配置本地文件和远端文件的目录映射关系,这样 IDE 才能根据 Xdebug 传递的当前执行文件路径与本地文件做匹配,实现断点调试和单步调试等。
查看原文天翊 发布了文章 · 2019-03-25
首发于 樊浩柏科学院
之前一直在使用 Hexo 推荐的发布方案,缺点是本地依赖 Hexo 环境,无法随时随地地更新博客。为了摆脱 Hexo 环境约束进而高效写作,有了下述的发布方案。
本文的发布方案中,Git 仓库只是托管 md 文件,通过 Webhook 通知服务器拉取 md 文件,然后执行构建静态文件操作,完成一个发布过程。
我的写作环境为 Typora(Win10),博客发布在阿里云的 ECS(CentOS)上,文章托管在 GitHub。
随着时间成本的增高,只能利用碎片时间来进行写作。因此,我的写作场景变成了这样:
之前(包括 Hexo 推荐)的发布方案,都是先本地编写 MarkDown 源文件,然后本地构建静态文件,最后同步静态文件到服务器。发布流程图如下:
显而易见,若继续使用之前的发布方案,那么每当更换写作场地时都需要安装 Hexo 环境,写作场地和时间都受到限制,不满足需求。
问题主要是,本地受制于构建静态文件时需要的 Hexo 环境,那么是否可以将构建静态文件操作放到服务器端?
首先,看下新方案的发布流程图:
如流程图所示,整个发布系统共涉及到 3 个环境,分别为本地(写作)、Git 仓库(托管 md 源文件)、服务器(Web 服务)环境。在服务器环境构建静态文件,因此只需要在服务器端安装 Hexo 环境。
一个完整的发布流程包含 3 个部分:
采用按分支开发策略,当写作完成后,只需要 push 修改到对应分支即可。只要有 MarkDown 编辑器,以及任何文本编辑器,甚至 马克飞象 都可以随时随地写作。
当然,你可能说还需要 Git 环境呀?好吧,如果你是一名合格的 Coder,竟然没有 Git,你知道该干嘛了!再说没有 Git 环境,还可以通过 GitHub 来完成写作。
采用 master 发布策略,当需要发布时,需要将对应开发分支 merge 到 master 分支,然后push master
分支,即可实现发布。
这里使用到 Webhook 机制,触发服务器执行构建操作,构建脚本见 Webhook 脚本 部分。
当流程 ① 和 ② 结束后,Git 仓库都会向服务器发起一次 HTTP 请求,记录如下:
当收到构建请求后,执行构建操作。构建流程图如下:
首先检查当前变更分支,只有为 master 分支时,执行 pull 操作拉取 md 文件更新,然后再执行 hexo g
完成静态文件的构建。
Webhook 脚本使用 PHP 实现,代码如下:
主流程方法如下:
public function run()
{
//校验token
if ($this->checkToken()) {
echo 'ok';
} else {
echo 'error';
}
fastcgi_finish_request(); //返回响应
if ($this->checkBranch()) { //校验分支
$this->exec(); //执行操作逻辑
}
}
这里使用 shell 脚本实现构建所需的所有操作,方便扩展。执行操作方法如下:
public function exec()
{
//shell文件
$path = $this->config['bash_path'];
$result = shell_exec("sh $path 2>&1");
$this->accessLog($result);
return $result;
}
构建 shell 脚本如下:
#!/usr/bin/env bash
export NODE_HOME=/usr/local/node
export PATH=$NODE_HOME/bin:$PATH
pwd='/data/html/hexo'
cd $pwd/source
git pull
cd $pwd
$pwd/node_modules/hexo/bin/hexo g
新发布方案与之前方案的区别是:前者只需本地编写 md 文件,博客服务器构建静态文件;后者本地编写 md 文件后,需要本地构建静态文件,然后博客服务器只同步静态文件。
当然,有很多办法可以解决当前问题,比如可以使用 持续集成。本文只是提供一个发布思路,在项目的生成环境中,我们也很容易应用上这种发布思路,开发出自己的发布系统。
相关文章 »
赞 8 收藏 4 评论 0
天翊 发布了文章 · 2019-03-25
首发于 樊浩柏科学院
2017 年是自如快速增长的一年,自如客突破 100 万,管理资产达到 50 万间,在年底成功获得了 40 亿 A 轮融资,而这些都要感谢广大的自如客,公司为了回馈自如客,在六周年活动时就发放了 6000 万租住基金,当然年底散币活动也够疯狂。
既然公司对自如客这么阔,那对我们员工也得够意思,所以年底我们共准备了 3 个活动。
1、针对 自如客 的服务费减免活动;
2、针对 自如客 的 1000 万现金礼包;
3、25 万的 员工 红包活动;
散币活动 2 和 3 是通过微信红包形式进行,想散币就散吧,可微信告诉我们,想散币还得交税(>﹏<)。员工红包来说,25 万要交掉 10 多万税,此时心疼我的钱。好了,下面开始说点正事。
说到红包,我们肯定会想到红包拆分和抢红包两个场景。红包拆分是指将指定金额拆分为指定数目红包的过程,即是用来确定每个红包的金额数;而抢红包就是典型的高并发场景,需要避免红包超发的情况。
拆分方式
1、实时拆分
实时拆分,指的是在抢红包时实时计算每个红包的金额,以实现红包的拆分过程,对系统性能和拆分算法要求较高,例如拆分过程要一直保证后续待拆分红包的金额不能为空,不容易做到拆分红包的金额服从正态分布规律。
2、预先生成
预先生成,指的是在红包开抢之前已经完成了红包的拆分,抢红包时只是依次取出拆分好的红包金额,对拆分算法要求较低,可以拆分出随机性很好的红包金额,通常需要结合队列使用。
拆分算法
我并没有找到业界的通用算法,但红包拆分算法应该是拆分金额要看起来随机,最好能够服从正态分布,可以参考 微信 和 @lcode 提供的红包拆分算法。
微信拆分算法的优点是算法较简单,拆分效率高,同时,由于该算法天然的特性,可以保证后续红包金额一定不为空,特别适合实时拆分场景,但缺点是会导致大额红包较大概率地在拆分的最后出现。 @lcode 拆分算法的优点是拆分金额基本符合正态分布,适合随机性要求较高的拆分场景。
我们这次的业务对红包金额的随机性要求不高,但是对系统可靠性要求较高,所以我们选用了预算生成方式,使用 二倍均值法 的红包拆分算法,作为我们的红包拆分方案。
采用预算生成方式,我们预先生成红包并放入 Redis 的 List 中,当抢红包时只是 Pop List 即可,具体实现将在 抢红包 部分介绍。
拆分算法可以描述为:假设剩余拆分金额为 M,剩余待拆分红包个数为 N,红包最小金额为 1 元,红包最小单位为元,那么定义当前红包的金额为:
$$m = rand(1, floor(M/N*2))$$
其中,floor 表示向下取整,rand(min, max) 表示从 [min, max] 区间随机一个值。$M/N \ast 2$ 表示剩余待拆分金额平均金额的 2 倍,因为 N >= 2,所以 $M/N \ast 2 <= M$,表示一定能保证后续红包能拆分到金额。
代码实现为:
for ($i = 0; $i < $N - 1; $i++) {
$max = (int)floor($M / ($N - $i)) * 2;
$m[$i] = $max ? mt_rand(1, $max) : 0;
$M -= $m[$i];
}
$m[] = $M;
值得一提的是,我们为了保证红包金额差异尽量小,先将总金额平均拆分成 N+1 份,将第 N+1 份红包按照上述的红包拆分算法拆分成 N 份,这 N 份红包加上之前的平均金额才作为最终的红包金额。
限流
1、前端限流
前端限制用户在 n 秒之内只能提交一次请求,虽然这种方式只能挡住小白,不过这是 99% 的用户哟,所以也必须得做。
2、后端限流
常用的后端限流方法有 漏桶算法 和 令牌桶算法。漏桶算法 主要目的是控制请求数据注入的速率,如果此时漏桶溢出,后续的请求数据会被丢弃。而 令牌桶算法 是以一个恒定的速度往桶里放入令牌,而如果请求数据需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌时,这些请求才被丢弃,令牌桶算法的一个好处是可以方便地改变应用接受请求的速率。
防超发
1、库存加锁
可以通过加锁的方式解决资源抢占问题,但是加锁会增加系统开销,大流量下更容易拖垮系统,不过可以尝试一下基于版本号的乐观锁。
2、通过高速队列串行化请求
之所会出现超发问题,是因为并发时会出现多个进程同时获取同一资源的现象,如果使用高速队列将并行请求串行化,那么问题就不存在了。高速队列可以使用 Redis 缓存服务器来实现,当然光使用队列还不够,必要保证整个流程调用链要短、要快,否则队列会积压严重,甚至会拖垮整个服务。
在限流方面,由于我们预估的请求量还在系统承受范围,所以没有考虑引入后端限流方案。我们的抢红包系统流程图如下:
我们将抢红包拆分为 红包占有(流程①,同步) 和 红包发放 (流程②,异步)这两个过程,首先采用高速队列串行化请求,红包发放逻辑由一组 Worker 异步去完成。高速队列只是完成红包占有的过程,实现库存的控制,Worker 则处理耗时较长的红包发放过程。
当然,在实际应用中,红包占用过程还需要加上一些前置规则校验,比如用户是否已经领取过,领取次数是否已经达到上限等?红包占有流程图如下:
其中,red::list
为 List 结构,存放预先生成的红包金额(流程①中的红包队列);red::task
也为 List 结构,红包异步发放队列(流程②中的任务队列);red::draw
为 Hash 结构,存放红包领取记录,field
为用户的 openid,value
为序列化的红包信息;red::draw_count:u:openid
为 k-v 结构,用户领取红包计数器。
下面,我将以以下 3 个问题为中心,来说说我们设计出的抢红包系统。
1、怎么保证不超发
我们需要关注的是红包占有过程,从红包占有流程图可看出,这个过程是很多 Key 操作的组合,那怎么保证原子性?可以使用 Redis 事务,但我们选用了 Lua 方案,一方面是因为首先要保证性能,而 Lua 脚本嵌入 Redis 执行不存在性能瓶颈,另一方面 Lua 脚本执行时本身就是原子性的,满足需求。
红包占有的 Lua 脚本实现如下:
-- 领取人的openid为xxxxxxxxxxx
local openid = 'xxxxxxxxxxx'
local isDraw = redis.call('HEXISTS', 'red::draw', openid)
-- 已经领取
if isDraw ~= 0 then
return true
end
-- 领取太多次了
local times = redis.call('INCR', 'red::draw_count:u:'..openid)
if times and tonumber(times) > 9 then
return 0
end
local number = redis.call('RPOP', 'red::list')
-- 没有红包
if not number then
return {}
end
-- 领取人昵称为Fhb,头像为https://xxxxxxx
local red = {money=number,name='Fhb',pic='https://xxxxxxx'}
-- 领取记录
redis.call('HSET', 'red::draw', openid, cjson.encode(red))
-- 处理队列
red['openid'] = openid
redis.call('RPUSH', 'red::task', cjson.encode(red))
return true
需要注意 Lua 脚本执行过程并不是事务的,脚本中的操作命令在执行时是有先后顺序的,当某个操作执行失败时不会回滚已经执行成功的操作,它的原子性是通过单线程模型实现。
2、怎么提高系统响应速度
如红包占有流程图所示,当用户发起抢红包请求时,若有红包则直接完成红包占有操作,同步告知用户是否抢到红包,这个过程要求快速响应。
但由于微信红包支付属于第三方调用,若抢到红包后同步调用红包支付,系统调用链又长又慢,所以红包占有和红包发放异步拆分是必然。拆分后,红包占有只需操作 Redis,响应性能已不是问题。
3、怎么提高系统处理能力
从上述分析可知,目前系统的压力都会集中在红包发放这个环节,因为用户抢到红包时,我们只是同步告知用户已抢到红包,然后异步去发放红包,因此用户并不会立即收到红包(受红包发放 Worker 处理能力和微信服务压力制约)。若红包发放的 Worker 处理能力较弱,那么红包发放的延迟就会很高,体验较差。
如抢红包流程图中所示,我们采用一组 Worker 去消费任务队列,并调用红包支付 API,以及数据持久化操作(后续对账)。尽管红包发放调用链又长又慢,但是注意到这些 Worker 是 无状态 的,所以可以通过增加 Worker 数量,以横向扩展提高系统的处理能力。
4、怎么保证数据一致性
其实,红包发放延时我们可以做到用户无感知,但是若红包发放(流程②)失败了,已经告知用户抢到红包,但是却木有发,估计他杀人的心都有了。根据 CAP 原理,我们无法同时满足数据一致性、数据可用性、分区耐受性,通常只需做到数据最终一致性。
为了达到数据最终一致性,我们就引入了重试机制,生成一个全局唯一的外部订单号,当某单红包发放失败,就会放回任务队列,使得有机会进行发放重试,当然这一切都需要 API 做幂等处理。
这里必须将 Worker 可靠性单独说,因为它实在太重要了。Worker 的实现如下:
$maxTask = 1000;
$sleepTime = 1000;
while (true) {
while ($red = RedLogic::getTask()) {
RedLogic::doTask($red);
//处理多少个任务主动退出
$maxTask--;
if ($maxTask < 0) {
return EXIT_CODE_NORMAL;
}
}
//等待任务
usleep($sleepTime);
}
这里使用 LPOP 命令获取任务,所以使用了 while 结构,并且无任务时需要等待,可以用阻塞命令 BLPOP 来改进。
由于 Worker 需要常驻内存运行,难免会出现异常退出的情况(也有主动退出), 所以需要保持 Worker 一直处于运行状态。我们使用进程管理工具 Supervisor 来监控 Worker 的运行状态,同时管理 Worker 的数量,当任务队列出现堆积时,增加 Worker 数量即可。Supervisor 的监控后台如下:
公司员工都用唯一一个系统号 emp_code(自增字段)标识,登录成功后返回 emp_code,系统后续所有交互流程都基于 emp_code,分享出去的红包也会携带 emp_code,为了保护员工敏感信息和防止恶意碰撞攻击,我们不能直接将 emp_code 暴露给前端,需要借助一个 token(无规律)的中间者来完成交互。
1、储存映射关系,时时查询
预先生成一个随机串 token,然后跟 emp_code 绑定,每次请求都根据 token 时时查询 emp_code。优点是可以定期更新,相对安全,缺点是性能不高。
2、建立映射关系函数,实时计算
建立一个映射关系函数,如 hash 散列或者加密解密算法,能够根据 emp_code 生成一个无规律的字符串 token,并且要能够根据 token 反映射出 emp_code。优点是需要存储介质存储关系,性能较高,缺点是很难做到定期失效并更新。
由于我们的红包活动只进行几天,所以我们选用了方案 2。对 emp_code 做了 hashids 散列算法,暴露的只是一串无规律的散列字符串。
hashids 是一个开源且轻量的唯一 id 生成器,支持 Java、PHP、C/C++、Python 等主流语言,PHP 想使用 hashids,只需composer require hashids/hashids
命令安装即可。
然后,如下方式使用:
use Hashids\Hashids;
$hashids = new Hashids('salt', 6, 'abcdefghijk1234567890');
$hashids->encode(11002); //994k2kk
$hashids->decode('994k2kk'); //[11002]
需要说明的是,其中salt
是非常重要的散列加密盐串,6
表示散列值最小长度,abcde...7890
为散列字典,太长影响效率,太短不安全。由于默认的散列字典比较长,decode 效率并不高,所以这里移除了大写字母部分。
语音点赞就是用户以语音的形式助力好友,核心技术其实是语音识别,而我们一般都会使用第三方语音识别服务。
1、客户端调用第三方服务识别
客户端直接调用第三方语音识别服务,如微信提供了 JS-SDK 的语音识别 API ,返回识别的语音文本的信息,并且已经经过语义化。优点是识别较快,且不许关注语音存储问题,缺点是不安全,识别结果提交到服务端之前可能被恶意篡改。
2、服务端调用第三方服务识别
先将录制的语音上传至存储平台,然后服务端调用第三方语音识别服务,第三方语音识别服务去获取语音信息并识别,返回识别的语音文本的信息。优点是识别结果较安全,缺点是系统交互较多,识别效率不高。
我们业务场景的特殊性,存在用户可助力次数的限制,所以无需担心恶意刷赞的情况,因此可以选用方案 1,语音识别的交互流程如下:
此时,整个语音识别流程如下:
当然中国文字博大精深,语音识别的文本在匹配时,需要考虑容错处理,可以将文本转化为拼音,然后匹配拼音,或者设置一个匹配百分比,达到匹配值则认为语音口令正确。
需要注意的是,微信只提供 3 天的语音存储服务,若语音播放周期较长,则要考虑实现语音的存储。
我们使用了线上公账号进行红包发放测试,为了让线上公众号能够授权到测试环境,在线上的微信授权回调地址新增一个参数,将带有to=feature
参数的请求引流到测试环境,其他线上流量还是保持不变,匹配规则如下:
# Nginx不支持if嵌套,所以就这样变通实现
set $auth_redirect "";
if ($args ~* "r=auth/redirect") {
set $auth_redirect "prod";
}
if ($args ~* "to=feature") {
set $auth_redirect "feature";
}
if ($auth_redirect ~ "feature") {
rewrite ^(.*)$ http://wx.t.ziroom.com/index.php last;
}
if ($auth_redirect ~ "prod") {
rewrite ^(.*)$ http://wx.ziroom.com/index.php last;
}
由于本次活动力度较大,预估流量会比以往增加不少(不能再出现机房带宽打满的情况了,不然 >﹏<),静态页面占流量的很大一部分,所以静态页面在发布时都会放置一份在 CDN 上,这样回源的流量就很小了。
尽管做了很多准备,还是无法确保万无一失,我们在每个关键节点都增加了开关,一点出现异常,通过配置中心可以人工介入做降级处理。
查看原文赞 13 收藏 12 评论 2
推荐关注