swoole_process+redis 实现任务派送报错

实现一个多进程任务队列
采用Swoole扩展中的Process,出现:Fatal error: Uncaught RedisException: read error on connection

后来网上查了之后添加:ini_set('default_socket_timeout', -1);防止超时。

可是在执行的时候报错:
Fatal error: Uncaught RedisException: Operation now in progress
是redis连接的问题吗?

代码如下:

<?php
declare(ticks = 1);
class swooleProcess{

private $maxProcesses = 200;
private $child;
private $masterRedis;
private $redis_task_wing = 'task:wing'; //待处理队列

public function __construct(){
    //install signal handler for dead kids
    pcntl_signal(SIGCHLD, array($this, "sig_handler"));
    set_time_limit(0);
    //队列处理不超时,解决redis报错:read error on connection
    ini_set('default_socket_timeout', -1);
}

private function redis_client(){
    $rds = new Redis();
    $rds->connect('127.0.0.1',6379);
    return $rds;
}

public function process(swoole_process $worker){
    $GLOBALS['worker'] = $worker;
    swoole_event_add($worker->pipe, function($pipe) {
        $worker = $GLOBALS['worker'];
        //send data to master
        $recv = $worker->read();

        sleep(rand(1, 3));
        echo "From Master: $recv\n";
        $worker->exit(0);
    });
    exit;
}

public function testAction(){
    for ($i = 0; $i < 10000; $i++){
        $data = [
            'abc' => $i,
            'timestamp' => time().rand(100,999)
        ];
        $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
    }
    exit;
}

public function runAction(){
    while (1){
        echo "\t now we de have $this->child child processes\n";
        if ($this->child < $this->maxProcesses){
            $rds        = $this->redis_client();
            //无任务时,阻塞等待
            $data_pop   = $rds->brpop($this->redis_task_wing, 3);
            if (!$data_pop){
                continue;
            }
            echo "\t Starting new child | now we de have $this->child child processes\n";
            $this->child++;
            $process = new swoole_process([$this, 'process']);
            $process->write(json_encode($data_pop));
            $pid = $process->start();
        }
    }
}

private function sig_handler($signo) {
    echo "Recive: $signo \r\n";
    switch ($signo) {
        case SIGCHLD:
            while($ret = swoole_process::wait(false)) {
                echo "PID={$ret['pid']}\n";
                $this->child--;
            }
    }
}

}
$swooleProcess = new swooleProcess;

$swooleProcess->runAction();

阅读 2.7k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题