实现一个多进程任务队列
采用Swoole扩展中的Process,出现:Fatal error: Uncaught RedisException: read error on connection
后来网上查了之后添加:ini_set('default_socket_timeout', -1);防止超时。
可是在执行的时候报错:
Fatal error: Uncaught RedisException: Operation now in progress
是redis连接的问题吗?
代码如下:
<?php
declare(ticks = 1);
class swooleProcess{
private $maxProcesses = 200;
private $child;
private $masterRedis;
private $redis_task_wing = 'task:wing'; //待处理队列
public function __construct(){
//install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));
set_time_limit(0);
//队列处理不超时,解决redis报错:read error on connection
ini_set('default_socket_timeout', -1);
}
private function redis_client(){
$rds = new Redis();
$rds->connect('127.0.0.1',6379);
return $rds;
}
public function process(swoole_process $worker){
$GLOBALS['worker'] = $worker;
swoole_event_add($worker->pipe, function($pipe) {
$worker = $GLOBALS['worker'];
//send data to master
$recv = $worker->read();
sleep(rand(1, 3));
echo "From Master: $recv\n";
$worker->exit(0);
});
exit;
}
public function testAction(){
for ($i = 0; $i < 10000; $i++){
$data = [
'abc' => $i,
'timestamp' => time().rand(100,999)
];
$this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
}
exit;
}
public function runAction(){
while (1){
echo "\t now we de have $this->child child processes\n";
if ($this->child < $this->maxProcesses){
$rds = $this->redis_client();
//无任务时,阻塞等待
$data_pop = $rds->brpop($this->redis_task_wing, 3);
if (!$data_pop){
continue;
}
echo "\t Starting new child | now we de have $this->child child processes\n";
$this->child++;
$process = new swoole_process([$this, 'process']);
$process->write(json_encode($data_pop));
$pid = $process->start();
}
}
}
private function sig_handler($signo) {
echo "Recive: $signo \r\n";
switch ($signo) {
case SIGCHLD:
while($ret = swoole_process::wait(false)) {
echo "PID={$ret['pid']}\n";
$this->child--;
}
}
}
}
$swooleProcess = new swooleProcess;
$swooleProcess->runAction();