0

我建立了个websocket服务,用swoole的process作为消息发布者。当触发onMessage的事件时,建立redis连接作为消息订阅者。当process发送了一条某个redis连接需要取消订阅的消息,该如何取消对应的redis连接呢?

源代码如下:

<?php
$server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
$server->on('open', function (Swoole\WebSocket\Server $server, $request)
{
    echo "server: handshake success with fd{$request->fd}\n";
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame)
{
    $redis = new co\redis();
    $redis->connect('127.0.0.1', 6379);
    if ($redis->subscribe(['channel'])) {
        while ($msg = $redis->recv()) {
            // msg是一个数组, 包含以下信息
            // $type # 返回值的类型:显示订阅成功
            // $name # 订阅的频道名字 或 来源频道名字
            // $info  # 目前已订阅的频道数量 或 信息内容
            list($type, $name, $info) = $msg;
            if ($type == 'subscribe') // 或psubscribe
            {
                // 频道订阅成功消息,订阅几个频道就有几条
            } else {
                if ($type == 'unsubscribe' && $info == 0) // 或punsubscribe
                {
                    break; // 收到取消订阅消息,并且剩余订阅的频道数为0,不再接收,结束循环
                } else {
                    if ($type == 'message') // 若为psubscribe,此处为pmessage
                    {
                        // 打印来源频道名字
                        //var_dump($name);
                        // 打印消息
                        var_dump($info);
                        // 处理消息
                        // balabalaba....
                        if($info=='cancel'){
                            $redis->unsubscribe([$name]);
                        }
                    }
                }
            }
        }
    }
});
$server->on('close', function ($ser, $fd)
{
    echo "client {$fd} closed\n";
});
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response)
{

});
$process = new \Swoole\Process(function () use ($server)
{
    $redis = new \Swoole\Coroutine\Redis();
    $redis->connect('127.0.0.1', 6379);
    while (true) {
        foreach ($server->connections as $connection) {
            $client = $server->getClientInfo($connection);
            if (isset($client['websocket_status']) && $client['websocket_status'] == 3) {
                $redis->publish('channel', 'cancel11111');
            }
        }
    }
}, false, 1, true);
$server->addProcess($process);
$server->start();
_Peter 24
2019-03-14 提问

1 个回答

0

我的理解是订阅 一个管道后,在管道放置json数据,里面多个字段比如 type消息类型 fd message 3个字段,
type =1 向单个fd推送消息
type=2 向所有fd推送,

而取消订阅 ,订阅 客户端的维护应该在db(mysql)层做,给websocket 只告诉 我要往谁推消息

推广链接