我建立了个websocket服务,用swoole的process作为消息发布者。当触发onMessage的事件时,建立redis连接作为消息订阅者。当process发送了一条某个redis连接需要取消订阅的消息,该如何取消对应的redis连接呢?
源代码如下:
<?php
$server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
$server->on('open', function (Swoole\WebSocket\Server $server, $request)
{
echo "server: handshake success with fd{$request->fd}\n";
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame)
{
$redis = new co\redis();
$redis->connect('127.0.0.1', 6379);
if ($redis->subscribe(['channel'])) {
while ($msg = $redis->recv()) {
// msg是一个数组, 包含以下信息
// $type # 返回值的类型:显示订阅成功
// $name # 订阅的频道名字 或 来源频道名字
// $info # 目前已订阅的频道数量 或 信息内容
list($type, $name, $info) = $msg;
if ($type == 'subscribe') // 或psubscribe
{
// 频道订阅成功消息,订阅几个频道就有几条
} else {
if ($type == 'unsubscribe' && $info == 0) // 或punsubscribe
{
break; // 收到取消订阅消息,并且剩余订阅的频道数为0,不再接收,结束循环
} else {
if ($type == 'message') // 若为psubscribe,此处为pmessage
{
// 打印来源频道名字
//var_dump($name);
// 打印消息
var_dump($info);
// 处理消息
// balabalaba....
if($info=='cancel'){
$redis->unsubscribe([$name]);
}
}
}
}
}
}
});
$server->on('close', function ($ser, $fd)
{
echo "client {$fd} closed\n";
});
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response)
{
});
$process = new \Swoole\Process(function () use ($server)
{
$redis = new \Swoole\Coroutine\Redis();
$redis->connect('127.0.0.1', 6379);
while (true) {
foreach ($server->connections as $connection) {
$client = $server->getClientInfo($connection);
if (isset($client['websocket_status']) && $client['websocket_status'] == 3) {
$redis->publish('channel', 'cancel11111');
}
}
}
}, false, 1, true);
$server->addProcess($process);
$server->start();
我的理解是订阅 一个管道后,在管道放置json数据,里面多个字段比如 type消息类型 fd message 3个字段,
type =1 向单个fd推送消息
type=2 向所有fd推送,
而取消订阅 ,订阅 客户端的维护应该在db(mysql)层做,给websocket 只告诉 我要往谁推消息