0

laravel+swoole+amqp实现异步的方式推送消息,之前都是自己手写redis队列,这次想用下amqp。
创建websocket_server,接收到消息放到队列。然后另外一个websocket_server取队列消息进行推送。问题就出在第二个websocket_server,启动时报错如下:
clipboard.png

下面贴上代码,望大牛指正,谢谢!
websocket_server : app/Lib/websocket.php

<?php

namespace App\Lib;

class Websocket
{
    //
    private static $instance = null;
    public static $websocket_server;
    private function __construct()
    {
    }

    private function __clone()
    {
    }

    public static function get_instance(){
        if(self::$instance == null){
            self::$instance = new \swoole_websocket_server('0.0.0.0',9502);
        }
        return self::$instance;
    }

}

接收消息,放入队列:
app/Console/Commands/MessagePublish.php

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use App\Lib\Websocket;
class MessagePublish extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'message:publish';
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $ws = Websocket::get_instance();

        $ws->on('open',function ($ws,$request){
            DB::table('clients')->insert(['fd'=>$request->fd,'ip'=>$request->server['remote_addr']]);
            //$ws->push($request->fd,"hello,welcome!\n");
        });

        $ws->on('message',function($ws,$frame){
            echo "Message:{$frame->data}\n";

            //把消息放入队列
            $host = config('queue.connections.amqp.host');
            $port = config('queue.connections.amqp.port');
            $user = config('queue.connections.amqp.user');
            $password = config('queue.connections.amqp.password');
            $queue = config('queue.connections.amqp.queue');
            $exchange = config('queue.connections.amqp.exchange');
            $key = config('queue.connections.amqp.key');

            try {
                $connection = new AMQPStreamConnection($host,$port,$user,$password);
                $channel = $connection->channel();
                $channel->exchange_declare($exchange,'direct',true,true,false);
                $channel->queue_declare($queue,true,true);
                $channel->queue_bind($queue,$exchange,$key);

                $data = new AMQPMessage($frame->data);

                $channel->basic_publish($data,$exchange,$queue);
            }catch (\Exception $e){
                echo $e->getMessage() . "\n";
            }

        });

        $ws->on('close',function ($ws,$fd){
            DB::table('clients')->where('fd',$fd)->delete();
            echo "Client:{$fd} is closed\n";
        });

        $ws->start();
    }
}

处理队列消息,执行推送:
app/Console/Commands/MessageConsume.php

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Lib\Websocket;
class MessageConsume extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'message:consume';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     * @throws \ErrorException
     */
    public function handle()
    {
        $host = config('queue.connections.amqp.host');
        $port = config('queue.connections.amqp.port');
        $user = config('queue.connections.amqp.user');
        $password = config('queue.connections.amqp.password');
        $queue = config('queue.connections.amqp.queue');

        $ws = Websocket::get_instance();

        try {
            $connection = new AMQPStreamConnection($host,$port,$user,$password);
            $channel = $connection->channel();
            $channel->queue_declare($queue,true,true);

            echo "waiting for message\n";

            $callback = function ($msg) use ($ws) {
                echo "Received ", $msg->body,"\n";

                //执行消息发送
                $clients = DB::table('clients')->get();

                foreach($clients as $client){
                    $ws->push($client->fd,$msg->body);
                }
            };

            $channel->basic_consume($queue,'',false,true,false,false,$callback);

            while (count($channel->callbacks)){
                $channel->wait();
            }
        }catch (Exception $e){
            echo $e->getMessage() . "\n";
        }
    }
}

前台就是常规的js代码发送websocket消息。服务端可以启动message:publish,amqp队列中也有数据。就是启动message:consume报错。看报错信息,就是先启动的脚本已经监听了9502,不能再在9502上创建一个新的websocket_server,我想到的就是用单例模式来加载websocket_server,但是依然报错。不知道是不是我的单例写的不对还是单例的方式根本解决不了这个问题。
我能想到的其他方式:
1、js端ajax请求到php,php把数据放入队列,这样就可以启动一个websocket_server来处理队列了。我没有尝试,估计是没问题的。
2、使用swoole的异步,swoole官方文档如下,看了下,我不知道怎么改造到我的这个例子来。
clipboard.png

我想实现的就是消息放入队列,异步的方式来实现推送。

2 个回答
0

已采纳

自己回答。
还是用swoole的任务投递来实现,贴上代码

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
class MessagePublish extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'message:publish';
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $ws = new \swoole_websocket_server('0.0.0.0',9502);

        //设置运行时参数
        $ws->set([
            'task_worker_num'   =>  4
        ]);

        $ws->on('open',function ($ws,$request){
            DB::table('clients')->insert(['fd'=>$request->fd,'ip'=>$request->server['remote_addr']]);
            //$ws->push($request->fd,"hello,welcome!\n");
        });

        $ws->on('message',function($ws,$frame){
            echo "Message:{$frame->data}\n";

            try {
                //投递异步任务
                $task_id = $ws->task($frame->data);
                echo "投递异步任务 id:{$task_id}\n";
            }catch (\Exception $e){
                echo $e->getMessage() . "\n";
            }
        });

        //处理异步任务
        $ws->on('task',function($ws,$task_id,$from_id,$data){
            echo "处理异步任务 id:{$task_id}\n";
            echo "处理异步任务 from_id:{$from_id}\n";
            echo "处理异步任务 data:{$data}\n";
            //消息推送
            $clients = DB::table('clients')->get();
            foreach($clients as $client){
                if($ws->isEstablished($client->fd)){
                    $ws->push($client->fd,$data);
                }
            }
            $ws->finish("OK");
        });

        //处理异步任务结果
        $ws->on('finish',function ($ws,$task_id,$data){
            echo "异步任务 {$task_id} 执行结果:{$data}\n";
        });

        $ws->on('close',function ($ws,$fd){
            DB::table('clients')->where('fd',$fd)->delete();
            echo "Client:{$fd} is closed\n";
        });

        $ws->start();
    }
}
0

端口只能一个程序监听的,为什么要启动多个服务监听同一个端口?

撰写答案

推广链接