c++ libevent 多进程socket惊群怎么处理?

当前的做法是:在主进程中创建并监听socket,同时在共享内存中初始化一个pthread_mutex_t,然后fork出多个子进程;子进程先获取该pthread_mutex_t,再accept连接,最后释放锁。但有时同一个连接请求会有不止一个子进程获得锁。

部分代码如下:

    // Process-wide event base pointer; NULL until one is created.
    struct event_base * base = NULL;
    pthread_mutex_t *g_lock=NULL; // cross-process lock (placed in shared memory by init_lock)
    int listen_fd = -1;// listening socket shared by all processes
    // Event that local_worker registers on listen_fd.
    struct event ev_accept;
    ......
    
    /*
     * Entry point: sets up the shared lock and the listening socket, forks up to
     * FORK_NUM children, then runs local_worker() in each child and manager()
     * in the parent.
     */
    int main(int argc,char** argv){
        if(!init_lock()){
            err(1,"init lock failed");
            return -1;
        }
        if(!init_listener()){
            err(1,"init listener failed");
            return -1;
        }

        pid_t child = 1;
        int slot = 0;
        /* Stop forking as soon as we are a child or fork() failed. */
        while(slot < FORK_NUM){
            child = fork();
            if(-1 == child || 0 == child){
                break;
            }
            ++slot;
        }

        switch(child){
        case -1:
            err(1,"this children process: %d fail to fork",slot);
            exit(1);
        case 0:
            /* child process */
            local_worker(slot);
            exit(0);
        default:
            /* parent process */
            manager();
            exit(0);
        }
        return 0;
    }

    /*
     * Create the cross-process mutex in anonymous shared memory and publish it
     * through g_lock.
     *
     * The mapping is validated BEFORE it is handed to pthread_mutex_init(); the
     * previous code initialised the mutex first, so a failed mmap() meant
     * writing through MAP_FAILED.
     *
     * Returns true on success. On any failure returns false, releases the
     * mapping and leaves g_lock untouched.
     */
    bool init_lock(){
        void *mem = mmap(NULL,sizeof(pthread_mutex_t),PROT_READ|PROT_WRITE,MAP_SHARED|MAP_ANON,-1,0);
        if(MAP_FAILED == mem){
            return false;
        }

        pthread_mutexattr_t attr;
        if(0 != pthread_mutexattr_init(&attr)){
            munmap(mem,sizeof(pthread_mutex_t));
            return false;
        }
        /* PTHREAD_PROCESS_SHARED is required for a mutex used across fork()ed processes. */
        const bool ok = 0 == pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED)
                     && 0 == pthread_mutex_init((pthread_mutex_t*)mem,&attr);
        pthread_mutexattr_destroy(&attr);
        if(!ok){
            munmap(mem,sizeof(pthread_mutex_t));
            return false;
        }

        g_lock = (pthread_mutex_t*)mem;
        return true;
    }
    /*
     * Non-blocking attempt to take the shared mutex g_lock.
     * Returns true only when pthread_mutex_trylock() reports success (0).
     */
    bool try_lock(){
        const int rc = pthread_mutex_trylock(g_lock);
        return rc == 0;
    }
    /* Release the shared mutex g_lock; the result of the unlock call is discarded. */
    void unlock(){
        (void)pthread_mutex_unlock(g_lock);
    }

    /*
     * Worker-process body (excerpt; elided parts are marked "......").
     * In the visible part it creates an event_base, registers a persistent read
     * event on the shared listen_fd with on_accept as the callback, and runs
     * the event loop.
     * NOTE(review): no return statement is visible although the function is
     * declared to return void* - confirm against the elided code.
     */
    void* local_worker(int worker_id,pid_t pid){
        ......
        /* Local `base` shadows the global of the same name. */
        struct event_base * base = p_work_info->base = event_base_new();

        /* Set up the event that fires when a client connection is pending */
        event_set(&ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL);
        event_base_set(base,&ev_accept);
        event_add(&ev_accept,NULL);
        ......
        /* Runs the event loop on this worker's base. */
        event_base_dispatch(base);
    }

    /*
     * libevent callback for readiness on the listening socket (excerpt; the
     * tail is elided with "......").
     * Takes the shared lock with try_lock(), calls accept() once, releases the
     * lock, logs, and returns early when accept() failed.
     *
     * NOTE(review): the lock only covers the accept() call itself. A second
     * worker woken for the same connection can call try_lock() after the first
     * one has already unlocked; it then succeeds and its accept() finds nothing
     * left to take. Presumably that is the "Resource temporarily unavailable"
     * in the log, not two simultaneous lock holders - confirm.
     */
    void on_accept(int fd, short ev, void *arg){
        /* Another process holds the lock right now: skip this wake-up. */
        if(!try_lock()){
            return;
        }
        int client_fd;
        struct sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);


        client_fd = accept(fd,(struct sockaddr*)&client_addr,&client_len);
        unlock();
        /* warn() appends strerror(errno); after a successful accept() errno is
           not meaningful, hence ": Success" in the log. */
        warn("~~~~~~~~~~~");
        warn("%d accept",getpid());
        if(client_fd < 0){
            warn("accept failed");
            return;
        }
        ......
    }

使用warn打印消息到终端,其中出现:

    ......
    client disconnected     #客户端断开连接时打印消息,之后客户端重新发起连接请求。
    libevent_worker: ~~~~~~~~~~~: Success
    libevent_worker: 12486 accept: Success
    libevent_worker: ~~~~~~~~~~~: Resource temporarily unavailable
    libevent_worker: 12485 accept: Resource temporarily unavailable
    libevent_worker: accept failed: Resource temporarily unavailable
    
    client disconnected
    libevent_worker: ~~~~~~~~~~~: Success
    libevent_worker: 12486 accept: Success
    ......

同一个连接请求有2个子进程获得了锁,第一个accept成功,第二个失败。也就是存在竞争时这个锁没起作用。
如何正确处理惊群呢?

完整代码如下:
我在on_accept回调函数中打印了try_lock()成功和失败的消息,发现一次客户端连接请求会在各个子进程中触发一次或多次on_accept事件。

//libevent server sample on linux

/*socket*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
/**/
#include <iostream>
#include <err.h>    //err()
#include <string.h> //memset()
#include <fcntl.h>  //fcntl()
#include <pthread.h> //
#include <cstdlib>  //calloc()
#include <unistd.h> //fork(),close()
#include <sys/queue.h>  //链表
#include <errno.h>  //errno,EINTR
#include <stdio.h>
#include <vector>
#include <string>
#include <sys/mman.h>//mmap
#include <sys/wait.h>

/*libevent*/
#include <event.h>

/*内存泄露*/
#include <mcheck.h>
/**/
#include "manage_protocol.h"

using namespace std;

//#define CONTROLLEN CMSG_LEN(sizeof(int))

#define SERVER_PORT 55541
#define LISTEN_NUM 32
#define FORK_NUM 4
#define BUF_LEN 1024
#define WRITE_BUF_LEN 1024*4


/*
 * Per-client connection state handed to the worker_read / worker_write
 * callbacks. The default constructor puts every field into its "empty" state.
 */
struct event_accepted{
    int fd;                 // client socket fd
    int worker_id;          // id of the worker that owns this client socket
    struct event* ev_read;  // read event registered for fd
    struct event* ev_write; // write event registered for fd
    char* write_buf;        // outgoing data buffer
    int len;                // length of the data held in write_buf
    int offset;             // offset in write_buf up to which data was already written
    event_accepted()
        : fd(-1),
          worker_id(-1),
          ev_read(NULL),
          ev_write(NULL),
          write_buf(NULL),
          len(0),
          offset(0)
    {}
};
/* The two ends of the socketpair linking the manager with one worker; -1 means "not open". */
struct socket_pair{
    int connecter;  // end on which the manager listens for worker messages
    int accepter;   // end on which the worker listens for manager messages
    socket_pair() : connecter(-1), accepter(-1) {}
};
/*
 * Worker descriptor: the socketpair to the manager, the events registered on
 * it, the owning event_base and an outgoing buffer.
 * The constructor initialises worker_id, write_buf, len and offset only;
 * base and pid are assigned later by the code that uses them.
 */
struct worker_info{
    int worker_id;                           // worker number
    struct socket_pair socket_pairs;         // socketpair used between manager and worker
    struct event socket_pair_read_event;     // read event on the socketpair (manager and worker use different ends)
    struct event socket_pair_write_event;    // write event used to notify the peer (manager and worker use different ends)
    struct event_base *base;                 // event_base this descriptor is served by (differs between manager and worker)
    char* write_buf;                         // outgoing data buffer
    int len;                                 // length of the data held in write_buf
    int offset;                              // offset in write_buf up to which data was already written
    pid_t pid;                               // pid of the child process
    worker_info()
        : worker_id(-1),
          write_buf(NULL),
          len(0),
          offset(0)
    {}
};

//
// One entry per worker; filled by socketpair_init() before fork().
vector<worker_info> v_global_workers;
// Manager's event base; set in manager() via event_init().
struct event_base * base = NULL;
pthread_mutex_t *g_lock=NULL; // cross-process lock
int listen_fd = -1;// listening socket shared by all processes
struct event ev_accept;// accept event on the shared listening socket
// SIGCHLD event registered by manager().
struct event signal_child;

int setnonblock(int fd);
void on_accept(int fd, short ev, void *arg);
void worker_read(int fd, short ev, void *arg);
void worker_write(int fd, short ev, void *arg);
void* local_worker(int worker_id, pid_t pid = -1);
int socketpair_init();    // create the local socketpairs used between manager and workers

void socketpair_worker_read(int fd, short ev, void *arg);// worker receives commands from the manager
void socketpair_worker_write(int fd, short ev, void *arg);// worker sends status etc. to the manager
void socketpair_manager_read(int fd, short ev, void *arg);
void socketpair_manager_write(int fd, short ev, void *arg);
void proc_socketpair_worker_read(int fd, char* data, int len);// handle one frame received by a worker
void proc_socketpair_manager_read(worker_info *worker,int fd, char* data, int len);// handle one frame received by the manager
void destroy();
void manager();// manager event loop (run after all worker-related events have been registered)
bool init_lock();
bool try_lock();
void unlock();
bool init_listener();
void signal_child_cb(int fd,short ev,void *arg);

/*
 * Entry point.
 * Enables malloc tracing, creates the manager<->worker socketpairs, the shared
 * lock and the listening socket, then forks up to FORK_NUM workers. Each
 * worker's pid is recorded in v_global_workers: the parent stores the child's
 * pid, while a child (or a failed fork) stores its own getpid(). Children run
 * local_worker(); the parent runs manager().
 */
int main(int argc,char** argv){
    setenv("MALLOC_TRACE","mtrace.out",1);
    mtrace();

    /* socketpairs must exist before fork() so both sides inherit them */
    if(socketpair_init() <0){
        err(1,"init socketpair_init failed");
    }
    if(!init_lock()){
        err(1,"init lock failed");
        return -1;
    }
    if(!init_listener()){
        err(1,"init listener failed");
        return -1;
    }

    pid_t child = 1;
    int slot = 0;
    while(slot < FORK_NUM){
        child = fork();
        const bool in_parent = !(-1 == child || 0 == child);
        v_global_workers[slot].pid = in_parent ? child : getpid();
        if(!in_parent){
            break;
        }
        ++slot;
    }

    switch(child){
    case -1:
        err(1,"this children process: %d fail to fork",slot);
        exit(1);
    case 0:
        /* child process */
        local_worker(slot);
        exit(0);
    default:
        /* parent process */
        manager();
        exit(0);
    }
    return 0;
}
/*
 * Create the cross-process mutex in anonymous shared memory and publish it
 * through g_lock.
 *
 * The mapping is validated BEFORE it is handed to pthread_mutex_init(); the
 * previous code initialised the mutex first, so a failed mmap() meant writing
 * through MAP_FAILED.
 *
 * Returns true on success. On any failure returns false, releases the mapping
 * and leaves g_lock untouched.
 */
bool init_lock(){
    void *mem = mmap(NULL,sizeof(pthread_mutex_t),PROT_READ|PROT_WRITE,MAP_SHARED|MAP_ANON,-1,0);
    if(MAP_FAILED == mem){
        return false;
    }

    pthread_mutexattr_t attr;
    if(0 != pthread_mutexattr_init(&attr)){
        munmap(mem,sizeof(pthread_mutex_t));
        return false;
    }
    /* PTHREAD_PROCESS_SHARED is required for a mutex used across fork()ed processes. */
    const bool ok = 0 == pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED)
                 && 0 == pthread_mutex_init((pthread_mutex_t*)mem,&attr);
    pthread_mutexattr_destroy(&attr);
    if(!ok){
        munmap(mem,sizeof(pthread_mutex_t));
        return false;
    }

    g_lock = (pthread_mutex_t*)mem;
    return true;
}
/*
 * Non-blocking attempt to take the shared mutex g_lock.
 * Returns true only when pthread_mutex_trylock() reports success (0).
 */
bool try_lock(){
    const int rc = pthread_mutex_trylock(g_lock);
    return rc == 0;
}
/* Release the shared mutex g_lock; the result of the unlock call is discarded. */
void unlock(){
    (void)pthread_mutex_unlock(g_lock);
}
/*
 * Create the TCP listening socket shared by all workers and store it in
 * listen_fd: SO_REUSEADDR, bound to INADDR_ANY:SERVER_PORT, backlog
 * LISTEN_NUM, non-blocking.
 *
 * Every failure is reported through err(1,...), which prints the message with
 * strerror(errno) and terminates the process; the `return false` statements
 * only document the intended contract.
 *
 * Fix: a failing socket() call used to be reported as "listen failed".
 */
bool init_listener(){
    struct sockaddr_in listen_addr;
    int reuseaddr_on =1;

    listen_fd =socket(AF_INET,SOCK_STREAM,0);
    if(listen_fd<0){
        err(1,"socket failed");
        return false;
    }
    if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr_on,sizeof(reuseaddr_on)) <0){
        err(1,"setsockopt failed");
        return false;
    }

    memset(&listen_addr,0,sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port =htons(SERVER_PORT);
    if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr))<0){
        err(1,"bind failed");
        return false;
    }
    if(listen(listen_fd,LISTEN_NUM)<0){
        err(1,"listen failed");
        return false;
    }
    /* Non-blocking so a worker that loses the accept race gets EAGAIN instead of blocking. */
    if(setnonblock(listen_fd)<0){
        err(1,"set server socket to non-blocking failed");
        return false;
    }
    return true;
}
void manager(){
    base = event_init();
    for(int i=0;i<v_global_workers.size();i++){
        /*初始化管理者写事件内存*/
        v_global_workers[i].base = base;
        v_global_workers[i].write_buf = new char[WRITE_BUF_LEN];
        v_global_workers[i].len = 0;
        v_global_workers[i].offset = 0;

        /*初始化管理者读写事件并注册读事件(写事件发送时再注册)*/
        event_set(  &v_global_workers[i].socket_pair_read_event,
                    v_global_workers[i].socket_pairs.connecter,
                    EV_READ|EV_PERSIST,
                    socketpair_manager_read,
                    &v_global_workers[i]);
        event_add(&v_global_workers[i].socket_pair_read_event,NULL);
        event_set(  &v_global_workers[i].socket_pair_write_event,
                    v_global_workers[i].socket_pairs.connecter,
                    EV_WRITE,
                    socketpair_manager_write,
                    &v_global_workers[i]);
    }
    // /*初始化监听客户端连接事件*/
    // event_set(&ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL);
    // event_add(&ev_accept,NULL);

    //注册监听子进程退出信号事件
    event_set(&signal_child,SIGCHLD,EV_SIGNAL|EV_PERSIST,signal_child_cb,&signal_child);
    event_add(&signal_child,NULL);

    event_dispatch();
}

void on_accept(int fd, short ev, void *arg){
    if(!try_lock()){
        //warn("%d:lock failed",getpid());
        return;
    }
    //warn("%d:lock success",getpid());
    int worker_id =0;
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    struct worker_info *p_work_info;
    struct event_accepted *client;
    
    client_fd = accept(fd,(struct sockaddr*)&client_addr,&client_len);
    unlock();
    warn("~~~~~~~~~~~");
    warn("%d accept",getpid());
    if(client_fd < 0){
        warn("accept failed");
        return;
    }
    


    p_work_info = &v_global_workers[0];
    client = (struct event_accepted*)calloc(1,sizeof(event_accepted));
    if(NULL == client){
        err(1,"on_accept malloc event_accepted failed");

    }
    client->fd = client_fd;
    client->ev_write = new event;
    client->ev_read = new event;
    client->write_buf = new (nothrow) char[WRITE_BUF_LEN];
    client->len = 0;
    client->offset = 0;
    if(client->ev_read == NULL || NULL == client->ev_write || NULL == client->write_buf){
        if(!client->ev_read){
            delete client->ev_read;
        }
        if(!client->ev_write){
            delete client->ev_write;
        }
        if(!client->write_buf){
            delete[] client->write_buf;
        }
        free(client);
        err(1,"alloc read event failed");
    }
    if(setnonblock(client_fd)<0){
        warn("failed to set client socket non-blocking");
        close(client_fd);

    }
    //初始化事件
    event_set(client->ev_read,client_fd,EV_READ|EV_PERSIST,worker_read,client);
    event_base_set(p_work_info->base,client->ev_read);

    event_set(client->ev_write,client_fd,EV_WRITE,worker_write,client);
    event_base_set(p_work_info->base,client->ev_write);

    //工作者注册读事件,写事件在需要时再注册
    event_add(client->ev_read,NULL);
    
    return;
}

/*初始化本地socket连接用于通知本地工作者注册事件*/
int socketpair_init(){
    for(int i=0;i<FORK_NUM;i++){
        struct worker_info temp_worker;
        temp_worker.worker_id = i;  //记录本地工作进程编号

        //socket_pairs
        int fd_1[2];
        int status_1 = socketpair(AF_UNIX, SOCK_STREAM, 0, fd_1);
        if (status_1 < 0) {
            printf("Call socketpair error, errno is %d\n", errno);
            goto fail;
        }
        temp_worker.socket_pairs.connecter=fd_1[0];
        temp_worker.socket_pairs.accepter= fd_1[1];
        
        //
        if(setnonblock(temp_worker.socket_pairs.accepter)<0){
            warn("failed to set client socket non-blocking");
        }
        if(setnonblock(temp_worker.socket_pairs.connecter)<0){
            warn("failed to set client socket non-blocking");
        }
        v_global_workers.push_back(temp_worker);
    }
    return 0;

    fail:
    warn("socket pair init failed");
    while(v_global_workers.empty()){
        worker_info temp = v_global_workers.back();
        v_global_workers.pop_back();
        if(temp.socket_pairs.connecter >0){
            close(temp.socket_pairs.connecter);
        }
        if(temp.socket_pairs.accepter >0){
            close(temp.socket_pairs.accepter);
        }
    }
    return -1;
}

/*
 * Add O_NONBLOCK to the file status flags of fd.
 * Returns 0 on success, the negative F_GETFL result if the flags cannot be
 * read, or -1 if they cannot be written back.
 */
int setnonblock(int fd)
{
    const int current = fcntl(fd, F_GETFL);
    if (current < 0) {
        return current;
    }
    return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}

void socketpair_manager_read(int fd, short ev, void *arg){
    int buf_size = 1024;
    char buf[buf_size];
    memset(buf,0,buf_size);
    uint16_t data_len = 0;
    int recv_len = 0;
    
    while(1){
        int size =read(fd,buf+recv_len,buf_size-recv_len);//recv(fd,buf+recv_len,buf_size-recv_len,0);
        if(0 == size){
            //worker与管理者断开连接
            cout<<"worker disconnected"<<endl;
            worker_info *worker = (worker_info*)arg;
            vector<worker_info>::iterator it=v_global_workers.begin();
            for(;it!=v_global_workers.end();it++){
                if(fd == (*it).socket_pairs.connecter){
                    break;
                }
            }
            event_del(&worker->socket_pair_read_event);
            event_del(&worker->socket_pair_write_event);
            delete[] worker->write_buf;
            close(worker->socket_pairs.connecter);
            v_global_workers.erase(it);
            return;
        }
        else if(size < 0){
            //
            cout<<"socekt failure,disconnecting worker:"<<strerror(errno)<<endl;
            worker_info *worker = (worker_info*)arg;
            vector<worker_info>::iterator it=v_global_workers.begin();
            for(;it!=v_global_workers.end();it++){
                if(fd == (*it).socket_pairs.connecter){
                    break;
                }
            }
            event_del(&worker->socket_pair_read_event);
            event_del(&worker->socket_pair_write_event);
            delete[] worker->write_buf;
            close(worker->socket_pairs.connecter);

            v_global_workers.erase(it);
            return;
        }
        else{
            recv_len += size;
        }
        while(recv_len >2){
            if(0x0b != buf[0] || 0x0d != buf[1]){
                recv_len--;
                memmove(buf,buf+1,recv_len);
                continue;
            }
            if(recv_len > 4){
                data_len = *(uint16_t*)(buf+2);

                if(recv_len < data_len +4){
                    //一条信息未读取完全,继续读取
                    break;
                }
                proc_socketpair_manager_read((worker_info*)arg,fd, buf+4, data_len);
                recv_len = recv_len - 4 - data_len;
                memmove(buf,buf+4+data_len,recv_len);
            }
            else{
                break;
            }

        } 
        //退出
        if(0 == recv_len){
            break;
        }
    }
}

/*
 * Dispatch one frame the manager received from a worker.
 * The first payload byte is read as the protocol id; no protocol is handled
 * yet, so every id falls through to the empty default case.
 */
void proc_socketpair_manager_read(worker_info *worker,int fd, char* data, int len){
    string log_str;
    const char protocol = data[0];
    switch(protocol){
        default:
            break;
    }
}

/*
 * libevent callback: the manager end of a worker's socketpair is writable.
 * arg is the worker_info whose write_buf holds the pending bytes.
 *
 * Writes the bytes between offset and len. If the write is interrupted or
 * would block (EINTR / EAGAIN) the write event is re-armed; any other error
 * terminates the process via err(). After a partial write the unsent tail is
 * moved to the front of the buffer and the event is re-armed; after a
 * complete write the buffer is reset.
 */
void socketpair_manager_write(int fd, short ev, void *arg){
    struct worker_info* worker = (worker_info*)arg;
    int written = 0;
    written = write(fd,worker->write_buf+worker->offset,worker->len-worker->offset);

    if(-1 == written){
        if(EINTR != errno && EAGAIN != errno){
            err(1,"write error");
        }
        /* interrupted or not writable right now: try again later */
        event_add(&(worker->socket_pair_write_event),NULL);
        return;
    }

    const int consumed = worker->offset + written;
    if(consumed >= worker->len){
        /* everything was sent */
        worker->offset = worker->len = 0;
        return;
    }

    /* partial write: keep the unsent tail at the start of the buffer */
    memmove(worker->write_buf,worker->write_buf+consumed,worker->len-consumed);
    worker->offset = 0;
    worker->len -= consumed;
    event_add(&(worker->socket_pair_write_event),NULL);
}

void* local_worker(int worker_id,pid_t pid){
    //重启子进程时退出父进程的事件循环
    if(NULL != base){
        event_base_loopexit(base,NULL);
    }
    vector<worker_info>::iterator it=v_global_workers.begin();
    if(pid>0){
        //子进程退出,重新fork
        for(;it!=v_global_workers.end();it++){
            if((*it).pid == pid){
                (*it).pid = getpid();
                break;
            }
        }
    }
    else{
        it = v_global_workers.begin()+worker_id;
    }
    
    //多进程本地工作者删除其他工作者的信息,只保留本工作者的信息
    struct worker_info temp = *it;
    close(temp.socket_pairs.connecter);
    temp.socket_pairs.connecter = -1;
    v_global_workers.erase(it);
    for(int i=0;i<v_global_workers.size();i++){
        close(v_global_workers[i].socket_pairs.connecter);
        close(v_global_workers[i].socket_pairs.accepter);
    }
    v_global_workers.clear();
    v_global_workers.push_back(temp);

    struct worker_info *p_work_info = &v_global_workers[0];
    //为写事件分配缓冲内存
    p_work_info->write_buf = new char[WRITE_BUF_LEN];
    // struct event_config *cfg = event_config_new();
    // event_config_avoid_method(cfg,"epoll");
    // struct event_base * base = event_base_new_with_config(cfg);
    // event_config_free(cfg);

    //局部event_base* base区别与全局base,避免父进程读写事件污染子进程事件
    struct event_base * base = p_work_info->base = event_base_new();

    /*初始化监听客户端连接事件*/
    event_set(&ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL);
    event_base_set(base,&ev_accept);
    event_add(&ev_accept,NULL);

    event_set(&p_work_info->socket_pair_read_event,p_work_info->socket_pairs.accepter,EV_READ|EV_PERSIST,socketpair_worker_read,NULL);
    event_base_set(base,&p_work_info->socket_pair_read_event);
    event_add(&p_work_info->socket_pair_read_event,NULL);

    event_set(&p_work_info->socket_pair_write_event,p_work_info->socket_pairs.accepter,EV_WRITE,socketpair_worker_write,NULL);
    event_base_set(base,&p_work_info->socket_pair_write_event);
    //event_add(&p_work_info->socket_pair_write_event,NULL);
    event_base_dispatch(base);
}

/*
 * libevent callback: the worker end of the manager socketpair is writable.
 * The pending bytes live in v_global_workers[0].write_buf.
 *
 * Writes the bytes between offset and len. If the write is interrupted or
 * would block (EINTR / EAGAIN) the write event is re-armed; any other error
 * terminates the process via err(). After a partial write the unsent tail is
 * moved to the front of the buffer and the event is re-armed; after a
 * complete write the buffer is reset.
 */
void socketpair_worker_write(int fd, short ev, void *arg){
    struct worker_info* self = &v_global_workers[0];
    int written = 0;
    written = write(fd,self->write_buf+self->offset,self->len-self->offset);

    if(-1 == written){
        if(EINTR != errno && EAGAIN != errno){
            err(1,"write error");
        }
        /* interrupted or not writable right now: try again later */
        event_add(&(self->socket_pair_write_event),NULL);
        return;
    }

    const int consumed = self->offset + written;
    if(consumed >= self->len){
        /* everything was sent */
        self->offset = self->len = 0;
        return;
    }

    /* partial write: keep the unsent tail at the start of the buffer */
    memmove(self->write_buf,self->write_buf+consumed,self->len-consumed);
    self->offset = 0;
    self->len -= consumed;
    event_add(&(self->socket_pair_write_event),NULL);
}

/*
 * The link to the manager is gone: unregister the socketpair events, release
 * the write buffer, close the worker end and terminate this worker process
 * with exit status 1.
 */
static void worker_exit_on_manager_loss(){
    worker_info *worker = &v_global_workers[0];

    event_del(&worker->socket_pair_read_event);
    event_del(&worker->socket_pair_write_event);
    delete[] worker->write_buf;
    close(worker->socket_pairs.accepter);

    exit(1);
}

/*
 * libevent callback: the worker end of the manager socketpair is readable.
 *
 * Frame format as parsed here: bytes 0x0b 0x0d, a 16-bit payload length in
 * host byte order, then the payload. Bytes that do not start with the magic
 * are skipped one at a time. Each complete frame is passed to
 * proc_socketpair_worker_read(). EOF or a real socket error ends the worker
 * process.
 *
 * Fixes compared with the previous version:
 *  - EINTR retries the read and EAGAIN/EWOULDBLOCK just returns; the socket
 *    is non-blocking, so these used to kill the worker whenever a frame
 *    arrived in more than one piece;
 *  - a frame that cannot fit into the buffer is reported explicitly instead
 *    of producing a zero-length read that looked like EOF;
 *  - the length field is read with memcpy instead of an unaligned cast.
 *
 * NOTE(review): the buffer is local, so bytes of an incomplete frame are lost
 * when the callback returns on EAGAIN; a persistent receive buffer would be
 * needed to reassemble frames across callbacks.
 */
void socketpair_worker_read(int fd, short ev, void *arg){
    (void)ev;
    (void)arg;
    enum { READ_BUF_SIZE = 1024, FRAME_HEADER_LEN = 4 };

    char buf[READ_BUF_SIZE];
    memset(buf,0,sizeof(buf));
    int recv_len = 0;

    while(1){
        if(recv_len >= READ_BUF_SIZE){
            /* a frame larger than the buffer can never complete */
            cout<<"manager frame exceeds read buffer,disconnecting manager"<<endl;
            worker_exit_on_manager_loss();
            return;
        }

        const ssize_t size = read(fd,buf+recv_len,READ_BUF_SIZE-recv_len);
        if(0 == size){
            /* the manager closed its end */
            cout<<"manager disconnection"<<endl;
            worker_exit_on_manager_loss();
            return;
        }
        if(size < 0){
            if(EINTR == errno){
                continue;
            }
            if(EAGAIN == errno || EWOULDBLOCK == errno){
                /* nothing more to read right now */
                return;
            }
            cout<<"socekt failure,disconnecting manager:"<<strerror(errno)<<endl;
            worker_exit_on_manager_loss();
            return;
        }
        recv_len += (int)size;

        while(recv_len >2){
            if(0x0b != buf[0] || 0x0d != buf[1]){
                /* resynchronise: drop one byte and look for the magic again */
                recv_len--;
                memmove(buf,buf+1,recv_len);
                continue;
            }
            if(recv_len <= FRAME_HEADER_LEN){
                break;
            }
            uint16_t data_len = 0;
            memcpy(&data_len,buf+2,sizeof(data_len));
            if(recv_len < data_len + FRAME_HEADER_LEN){
                /* frame not complete yet, keep reading */
                break;
            }
            proc_socketpair_worker_read(fd, buf+FRAME_HEADER_LEN, data_len);
            recv_len = recv_len - FRAME_HEADER_LEN - data_len;
            memmove(buf,buf+FRAME_HEADER_LEN+data_len,recv_len);
        }
        /* everything consumed */
        if(0 == recv_len){
            break;
        }
    }
}

/*
 * Dispatch one frame a worker received from the manager.
 * The first payload byte is read as the protocol id; no protocol is handled
 * yet, so every id falls through to the empty default case.
 */
void proc_socketpair_worker_read(int fd, char* data, int len){
    const char protocol = data[0];
    switch(protocol){
        default:
            break;
    }
}

void worker_read(int fd, short ev, void *arg){
    struct event_accepted *client = (event_accepted*)arg;
    char* buf = new char[BUF_LEN];
    if(buf == NULL){
        err(1,"malloc failed");
    }
    int recv_len = read(fd,buf,BUF_LEN);

    if(recv_len == 0){
        //客户端断开连接
        cout<<endl;
        cout<<"client disconnected"<<endl;
        
        event_del(client->ev_read);
        event_del(client->ev_write);
        delete client->ev_write;
        delete client->ev_read;
        delete client->write_buf;
        close(fd);
        free(client);
        delete[] buf;
        return;
    }
    else if(recv_len < 0){
        cout<<endl;
        cout<<"socekt failure,disconnecting client:"<<strerror(errno)<<endl;
        
        event_del(client->ev_read);
        event_del(client->ev_write);
        delete client->ev_write;
        delete client->ev_read;
        delete client->write_buf;
        close(fd);
        free(client);
        delete[] buf;
        return;
    }
    //TODO 1.需要处理数组越界问题;2.这里只是一问一答的情况,如果需要主动推送消息需要增加设计;
    //TODO 1.在同一个文件描述符上如果多次注册同一事件会发生什么?这里改用链表记录需要发送的数据可能会好一点,
    memcpy(client->write_buf+client->len,buf,recv_len);
    client->len += recv_len;
    event_add(client->ev_write,NULL);
    delete[] buf;
}

void worker_write(int fd, short ev, void *arg){
    struct event_accepted* client = (event_accepted*)arg;
    int len =0;
    len = write(fd,client->write_buf+client->offset,client->len-client->offset);
    if(len == -1){
        //写操作被信号打断或不能写入,重新注册写事件
        if(errno == EINTR || errno == EAGAIN){
            event_add(client->ev_write,NULL);
            return;
        }
        else{
            err(1,"write error");
            
            event_del(client->ev_read);
            event_del(client->ev_write);
            delete client->ev_write;
            delete client->ev_read;
            delete client->write_buf;
            close(fd);
            free(client);
        }
    }
    else if((client->offset + len) <client->len){
        //数据没有完全写入
        int offset = client->offset +len;
        memmove(client->write_buf,client->write_buf+offset,client->len-offset);
        client->offset = 0;
        client->len -= offset;
        event_add(client->ev_write,NULL);
        return;
    }
    else{
        //写入完成
        client->offset = client->len = 0;
    }
}

/*
 * libevent SIGCHLD callback registered by manager(): reap exited workers and
 * fork a replacement for each of them.
 *
 * Fixes compared with the previous version:
 *  - waitpid() is called in a loop, because one SIGCHLD delivery can stand
 *    for several exited children;
 *  - a failed fork() is reported instead of being ignored.
 *
 * The replacement child runs local_worker(-1, pid) with the pid of the worker
 * it replaces and never returns into this loop.
 */
void signal_child_cb(int fd,short ev,void *arg){
    (void)fd;
    (void)ev;
    (void)arg;

    int stat;
    pid_t pid;
    while((pid = waitpid(-1,&stat,WNOHANG)) > 0){
        pid_t pid_1 = fork();
        if(0 == pid_1){
            local_worker(-1,pid);
            exit(0);
        }
        if(pid_1 < 0){
            warn("failed to fork a replacement for worker %d",(int)pid);
        }
    }
}
阅读 1k
评论
    1 个回答
    • 4.8k

    看起来没有问题,你能贴上完整的代码吗?

    这是个完整的示例,监听地址为 "0.0.0.0:8080"

    // -lpthread -levent
    #include <errno.h>
    #include <event.h>
    #include <fcntl.h>
    #include <netinet/in.h>
    #include <netinet/ip.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/mman.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <unistd.h>
    
    /* printf-style log line prefixed with the calling process id */
    void info(const char* fmt, ...);
    /* fatal check: prints msg with errno and exits when cond is false */
    void assert(int cond, const char* msg);
    /* creates the TCP listening socket on port 8080 */
    int get_listener();
    /* creates a process-shared mutex in shared memory */
    pthread_mutex_t* get_mutex();
    /* child body: runs an event loop watching fd */
    int run_child_process(int fd, pthread_mutex_t* mutex);
    /* libevent callback for the listening socket */
    void on_event(int fd, short events, void* udata);
    
    /*
     * Creates the listening socket and the shared mutex, forks two children
     * that each run run_child_process(), then keeps the parent alive for an
     * hour with sleep().
     */
    int main()
    {
        const int listener = get_listener();
        pthread_mutex_t* const accept_mutex = get_mutex();

        int spawned = 0;
        while (spawned < 2) {
            const pid_t child = fork();
            assert(child != -1, "fork()");
            if (child == 0) {
                /* child: never returns into the fork loop */
                exit(run_child_process(listener, accept_mutex));
            }
            ++spawned;
        }

        sleep(3600);
    }
    
    /*
     * printf-style logger: writes "Info: pid=<pid>, ", the formatted message
     * and a newline to stdout.
     */
    void info(const char* fmt, ...)
    {
        va_list args;

        fprintf(stdout, "Info: pid=%d, ", getpid());
        va_start(args, fmt);
        vfprintf(stdout, fmt, args);
        va_end(args);
        fputc('\n', stdout);
    }
    
    /*
     * Fatal check: when cond is zero, prints msg together with errno and its
     * text to stderr and exits with status 1; otherwise does nothing.
     */
    void assert(int cond, const char* msg)
    {
        if (cond)
            return;

        const int code = errno;
        fprintf(stderr, "Fatal: %s, errno=%d, %s\n", msg, code, strerror(code));
        exit(1);
    }
    
    /*
     * Create a TCP socket bound to 0.0.0.0:8080, start listening (backlog 8)
     * and switch it to non-blocking mode. Any failure is fatal.
     *
     * Fix: the listener used to stay blocking. With several processes woken
     * for the same connection, a process that wins pthread_mutex_trylock()
     * after the connection was already taken would then block inside accept()
     * while holding the shared mutex. With O_NONBLOCK that accept() fails with
     * EAGAIN instead.
     *
     * Returns the listening socket descriptor.
     */
    int get_listener()
    {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        assert(-1 != fd, "socket()");

        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port = htons(8080);
        assert(-1 != bind(fd, (struct sockaddr*)&addr, sizeof(addr)), "bind()");

        assert(-1 != listen(fd, 8), "listen()");

        int flags = fcntl(fd, F_GETFL);
        assert(-1 != flags, "fcntl(F_GETFL)");
        assert(-1 != fcntl(fd, F_SETFL, flags | O_NONBLOCK), "fcntl(F_SETFL)");
        return fd;
    }
    
    /*
     * Allocate a pthread mutex in anonymous shared memory and initialise it
     * as PTHREAD_PROCESS_SHARED so that processes forked afterwards can
     * synchronise on it. Any failure is fatal.
     *
     * Fix: the pthread_* return codes are now checked and the attribute
     * object is destroyed after use. pthread functions return the error
     * number instead of setting errno, so it is copied into errno for the
     * fatal message.
     *
     * Returns the address of the initialised mutex.
     */
    pthread_mutex_t* get_mutex()
    {
        void* ptr = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
            MAP_SHARED | MAP_ANON, -1, 0);
        assert(MAP_FAILED != ptr, "mmap()");

        pthread_mutexattr_t attr;
        int rc = pthread_mutexattr_init(&attr);
        errno = rc;
        assert(0 == rc, "pthread_mutexattr_init()");

        rc = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        errno = rc;
        assert(0 == rc, "pthread_mutexattr_setpshared()");

        rc = pthread_mutex_init((pthread_mutex_t*)ptr, &attr);
        errno = rc;
        assert(0 == rc, "pthread_mutex_init()");

        pthread_mutexattr_destroy(&attr);
        return (pthread_mutex_t*)ptr;
    }
    
    /*
     * Child-process body: creates an event base, registers a persistent read
     * event on fd with on_event as callback (the mutex is passed as the
     * callback argument) and runs the event loop. Every failing step is
     * fatal. Returns 0 once the loop has ended.
     */
    int run_child_process(int fd, pthread_mutex_t* mutex)
    {
        struct event_base* loop = event_base_new();
        assert(loop != NULL, "event_base_new()");

        struct event* accept_ev
            = event_new(loop, fd, EV_READ | EV_PERSIST, on_event, mutex);
        assert(accept_ev != NULL, "event_new()");

        const int added = event_add(accept_ev, NULL);
        assert(added != -1, "event_add()");

        const int rc = event_base_dispatch(loop);
        assert(rc != -1, "event_base_dispatch()");
        return 0;
    }
    
    /*
     * libevent callback for the listening socket. udata is the shared mutex.
     *
     * Logs the event mask, ignores anything that is not EV_READ and returns
     * immediately when the mutex cannot be taken without blocking. Otherwise
     * it accepts one connection, logs the outcome (an accepted socket is
     * closed right away) and releases the mutex.
     */
    void on_event(int fd, short events, void* udata)
    {
        info("events=%d", events);
        if ((events & EV_READ) == 0)
            return;

        pthread_mutex_t* mutex = (pthread_mutex_t*)udata;
        if (pthread_mutex_trylock(mutex) != 0)
            return;

        struct sockaddr_in peer;
        socklen_t peer_len = sizeof(peer);
        const int conn = accept(fd, (struct sockaddr*)&peer, &peer_len);
        if (conn != -1) {
            close(conn);
            info("accept succ: fd2=%d", conn);
        } else {
            info("accept failed: %s", strerror(errno));
        }

        assert(0 == pthread_mutex_unlock(mutex), "unlock");
    }

    输出如

    Info: pid=2, events=2
    Info: pid=3, events=2
    Info: pid=3, events=2
    Info: pid=3, events=2
    Info: pid=3, events=2
    Info: pid=2, accept succ: fd2=31
      撰写回答

      登录后参与交流、获取后续更新提醒

      相似问题
      推荐文章