多线程libevent在Windows环境下高延迟(200ms)和低传输量(8MB/s)的解决策略?

新手上路,请多包涵

问题:多线程libevent,网络延迟高(200ms),每秒数据传输量小(8MB/s)。

libevent,多线程,TCP。
环境:windows,局域网,两台电脑网线直连。
服务端机器:win11,12核心20G内存,千兆网卡,千兆网线。
客户端机器:win10,8核心16G内存,千兆网卡,千兆网线。

服务端架构:
总共8个线程。
第1个线程,1个事件循环,只用于监听连接,监听到连接将fd通过管道均衡转发给其他线程。
第2-4线程,3个线程,3个事件循环,每个线程各1各事件循环event_base,这3个专门用于做数据接收。从管道中接收连接,将连接注册到本线程的数据接收事件。接收到数据以后通过无锁队列转发给工作线程。
第5-8个线程,无事件循环,通过无锁队列接收到数据以后,对数据简单进行处理(分包),转发回客户端。

客户端架构:
自己编写,使用go语言。
程序开始设置建立连接的数量,每个连接发送多少包。
假设建立15000个连接,每个连接发送800个包,每个包128字节。
创建5w个协程,拥有断线重连机制。
向服务端发送128字节的数据,再立马接收到以后,将收发时间差存入日志文件。

通过对服务端使用LukeStackWalker进行性能分析,发送NtWaitForSingle这个底层函数的cpu耗时最高,推测是libevent内部的信号量。我自己服务端代码未使用过任何信号量和阻塞式操作。

CPU占用率很高,但是运行效率低下,很奇怪。

目前已知的问题,存日志这个操作对延迟影响比较大,但是也只能节省100ms.
希望能够将本地测试的网络延迟降到50ms以内,每秒稳定传输50MB以上。
主要特别是网络延迟这个问题,希望能够得到解答。

服务端代码:

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/thread.h>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <cstring>
#include <Windows.h>
#include "concurrentqueue.h"
#include <event2/event_compat.h>
//#include <arpa/inet.h>  //linux socket
#pragma comment(lib, "wsock32.lib")


const int NUM_DATA_PROCESSORS = 3; // 事件循环2-4
const int NUM_WORKERS = 4;         // 工作线程数量
const int PORT = 8000;

// 任务类型
struct Task {
    evutil_socket_t fd;
    std::string data;
    Task()= default;
    Task(evutil_socket_t _fd,std::string _data):fd(_fd),data(_data){ }
};


struct eBase_data {
    std::thread thread;
    event_base *base{};
    event *notify_event{};
    evutil_socket_t notify_receive_fd;
    evutil_socket_t notify_send_fd;
};

// 全局任务队列和同步机制
moodycamel::ConcurrentQueue<Task>  task_queue;

std::atomic<size_t> aliveSocket(0);
std::atomic<size_t> recvSize(0);
std::atomic<size_t> sendSize(0);

// 工作线程函数
[[noreturn]] void worker_thread() {
    while(true) {
        Task task;
        bool isGet=task_queue.try_dequeue(task);
        if(!isGet)continue;
        // 处理任务
        // 处理和发送数据的逻辑
//        bufferevent_write(bev, msg, strlen(msg));
        for(int i=0;i<task.data.size()/128;i++)
        {
            int res=send(task.fd,task.data.c_str()+i*128,128,0);
            if(res<0){
                //发送失败
            }else sendSize+=res;
        }
    }
}

// 数据接收回调函数
void read_cb(bufferevent *bev, void *ctx) {
    struct evbuffer *input = bufferevent_get_input(bev);
    size_t len = evbuffer_get_length(input);
    size_t size=len/128 *128;
    std::string data;
    data.resize(size);
    len=bufferevent_read(bev,(char*)data.c_str(),size);
    if(len>0){
        recvSize+=len;
        {
            Task tk(bufferevent_getfd(bev),data);
            task_queue.enqueue(tk);
        }
    }
}

void event_cb(bufferevent *bev, short event, void *pVoid){
    if (event & BEV_EVENT_EOF | BEV_EVENT_ERROR) {
        aliveSocket--;
        bufferevent_free(bev);
    }
//    printf("Got an error on the connection: %s\n",strerror(errno));
}

//sock传输事件
void worker_notify_cb(evutil_socket_t fd, short what, void *arg) {
    auto ebDt = static_cast<eBase_data *>(arg);
     evutil_socket_t client_fd;

    if (recv(fd, reinterpret_cast<char *>(&client_fd), sizeof(client_fd), 0) <= 0) {
        perror("recv");
        return;
    }

    bufferevent *bev = bufferevent_socket_new(ebDt->base, client_fd, BEV_OPT_CLOSE_ON_FREE);
    if (!bev) {
        std::cerr << "Error constructing bufferevent!" << std::endl;
        evutil_closesocket(client_fd);
        return;
    }
//    event_set(ebDt->base,fd,EV_READ | EV_PERSIST, read_cb,NULL);
    evbuffer_expand(bufferevent_get_input(bev), 32 * 1024);  // 扩展缓冲区到 20K
    bufferevent_setcb(bev, read_cb, nullptr, event_cb, nullptr);
    bufferevent_enable(bev, EV_READ | EV_PERSIST);//EV_WRITE);
    struct timeval tv = {1,0};
    bufferevent_set_timeouts(bev,&tv, nullptr);
}

//数据接收事件循环线程
void data_thread_func(void* arg){

    auto eb=(eBase_data*)arg;
    eb->base = event_base_new();
    if (!eb->base) {
        std::cerr << "Could not initialize worker thread base!" << std::endl;
        return;
    }
    eb->notify_event = event_new(eb->base, eb->notify_receive_fd, EV_READ | EV_PERSIST, worker_notify_cb, eb);
    event_add(eb->notify_event, nullptr);

    event_base_dispatch(eb->base);

    event_free(eb->notify_event);
    event_base_free(eb->base);
}

// 连接回调函数
void accept_conn_cb(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) {
    static int next_processor = 0;
//    struct event_base *base = evconnlistener_get_base(listener);
    auto dBases= (eBase_data*)ctx;

    // 选择下一个数据处理事件循环
    eBase_data& eb = (dBases)[next_processor];

    if (send(eb.notify_send_fd, reinterpret_cast<char *>(&fd), sizeof(fd), 0) != sizeof(fd)) {
        perror("send");
        evutil_closesocket(fd);
    }

    next_processor=(next_processor + 1) % NUM_DATA_PROCESSORS;

    aliveSocket++;
}

// 连接错误回调函数
void accept_error_cb(evconnlistener *listener, void *ctx) {
    event_base *base = static_cast<event_base *>(ctx);
    int err = EVUTIL_SOCKET_ERROR();
    std::cerr << "Got an error on the listener: " << evutil_socket_error_to_string(err) << std::endl;
    event_base_loopexit(base, NULL);
}

// 事件循环线程函数
void accept_thread_func(eBase_data *ctx) {
    event_base *accept_base = event_base_new();
    if (!accept_base) {
        std::cerr << "Could not initialize libevent!" << std::endl;
        return;
    }
    // 监听端口8000
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    sin.sin_port = htons(PORT);

    evconnlistener *listener = evconnlistener_new_bind(accept_base, accept_conn_cb,ctx,
                                                       LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
                                                       (sockaddr *)&sin, sizeof(sin));
    if (!listener) {
        std::cerr << "Could not create a listener on port "<<PORT<< std::endl;
        return;
    }
    evconnlistener_set_error_cb(listener, accept_error_cb);

    event_base_dispatch(accept_base);

    evconnlistener_free(listener);
    event_base_free(accept_base);
}

int main(int argc, char **argv) {
#ifdef WIN32
    WSADATA wsaData;
    int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (result != 0) {
        std::cout << "WSAStartup failed: " << result << std::endl;
        exit(1);
    }
    evthread_use_windows_threads();
#else
    evthread_use_pthreads();
#endif
    // 创建线程池-工作线程
    std::vector<std::thread> worker_threads;
    for (int i = 0; i < NUM_WORKERS; ++i) {
        worker_threads.emplace_back(worker_thread);
    }
    // 创建线程池-数据接收
    eBase_data data_ebs[NUM_DATA_PROCESSORS];
    for (int i = 0; i < NUM_DATA_PROCESSORS; ++i) {
        evutil_socket_t fds[2];
        if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) {
            perror("socketpair");
            return 1;
        }
        data_ebs[i].notify_receive_fd = fds[0];
        data_ebs[i].notify_send_fd = fds[1];
        data_ebs[i].thread = std::thread(data_thread_func,&data_ebs[i]);
    }

    // 创建主事件循环
    std::thread accept_thread(accept_thread_func,data_ebs);

    //数据监测
    std::thread([&](){
        for(int i=0;i<1200;i++){
            std::this_thread::sleep_for(std::chrono::seconds(1));
            size_t as = aliveSocket.load(std::memory_order_acq_rel);
            std::cout<<"sock:"<<as<<" send:"<<sendSize<<" recv:"<<recvSize<<std::endl;
        }
    }).detach();

    // 等待所有线程结束
    accept_thread.join();
    for (auto &t : data_ebs) {
        t.thread.join();
    }
    for (auto &t : worker_threads) {
        t.join();
    }

    // 释放资源
    for (auto &base : data_ebs) {
        event_base_free(base.base);
    }
#ifdef WIN32
    WSACleanup();
#endif
    return 0;
}

客户端代码

package main

import (
    "crypto/md5"
    "fmt"
    "io"

    //"google.golang.org/protobuf/proto"
    "net"
    "strconv"
    "strings"
    "sync"
    "time"
)
var numAlrdSend AtomicInt
var byteAlrd AtomicInt
var threadAlrd AtomicInt
var numSocket AtomicInt

func dialServer(serverAddr string) (net.Conn, error) {

    for {
        conn, err := net.Dial("tcp", serverAddr)
        if err == nil {
            return conn,nil
        } else {
            //fmt.Printf("Failed to connect to the server: %v.\n", err)
            //time.Sleep(1 * time.Second) // 等待1秒后重试
        }
    }
}

//time.Sleep(1 * time.Second)
////recv
//buffer := make([]byte, 100)
//_, err = conn.Read(buffer)
//if err != nil {
//    fmt.Printf("Recv Failed: %v\n", err)
//    return
//}

var logger *Logger


func communicateWithServer(conn net.Conn,num int,mSend *int)bool{
    data := make([]byte, 128) // 初始数据量为1KB
    //fullPack(num,data)
    //fullPack(num,*mSend,data)
    fullpacksmall(data)
    //data[0]=0x12
    for *mSend>0 {
        //time.Sleep(time.Millisecond*300)
        a:=time.Now().UnixMilli()
        n, err := conn.Write(data)
        if err != nil {
            //fmt.Printf("Send Failed : %v\n", err)
            return false
        }
        byteAlrd.IncNum(n)
        //fmt.Println("send yes.")
        n, err = conn.Read(data)
        if err != nil {
            return false
        }
        numAlrdSend.Inc()
        b:=time.Now().UnixMilli()
        logger.log(int(b - a))
        //logMsg(strconv.Itoa(int(b - a)))
        *mSend--
    }
    return true
}

func fullpacksmall(data []byte)  {
    // 定义填充的字符集
    str := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    // 填充剩余的data空间
    size:=0
    count:=(128)/len(str) //计算完整填充次数
    for i:=0;i<count;i++ {
        copy(data[size:], str)
        size+=len(str)
    }
    copy(data[size:], "FEFF")
}

func fullPackOld(num int, data []byte) {
    str := strconv.Itoa(num) // 将数字转换为字符串
    // 确保字符串长度为5,不足部分用空格填充
    if len(str) < 5 {
        str = strings.Repeat("0", 5-len(str)) + str
    }
    str+=":"
    copy(data, str)
    size:=len(str)
    // 定义填充的字符集
    str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    // 填充剩余的data空间
    count:=(1024-size)/len(str) //计算完整填充次数
    for i:=0;i<count;i++ {
        copy(data[size:], str)
        size+=len(str)
    }
    copy(data[size:], str[:1024-size])
}

//"{clnId}:{packId}:{timeStamp}:{mdFive}:{data}\r\n\r\n\r\n"
//00001:0028:1720081312639:ABCDEFGH....\r\n\r\n\r\n
// 生成数据填充字符串的函数
func fullPack(clnId int, packId int, data []byte) {
    // 获取当前时间戳
    timeStamp := time.Now().UnixMilli()

    // 生成基础字符串
    baseStr := fmt.Sprintf("%05d:%04d:%d", clnId, packId, timeStamp)

    // 计算需要填充的字符数
    abc := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    fillLength := 1024 - len(baseStr) -34

    // 用 data 循环填充不足的部分
    fillStr := ""
    abcLen := len(abc)
    for i := 0; i < fillLength/abcLen; i++ {fillStr+=abc}
    fillStr+=abc[0:fillLength%abcLen]
    // 计算 data 的 md5 校验值
    md5Hash := md5.Sum([]byte(fillStr))

    final:=fmt.Sprintf("%s:%x:%s",baseStr,md5Hash,fillStr)

    copy(data, final)
}

func isConnAlive(conn net.Conn) bool {
    one := []byte{}
    conn.SetReadDeadline(time.Now().Add(1 * time.Second))
    var err error
    if _, err = conn.Read(one); err == nil || err == io.EOF {
        return false
    }
    if netErr, ok:= err.(net.Error); ok && netErr.Timeout() {
        return true
    }
    return false
}

func main(){
    //logger=NewLogger("info_1.log")
    //serverAddr    := "192.168.1.231:8000"
    serverAddr    := "192.168.1.1:8000"
    //serverAddr    := "111.230.37.164:8000"
    numClients    := 30000
    numSend        := 800
    numLog=numClients*numSend
    //var conns sync.Map
    logger=NewLogger("info_1.log")
    var wgMain sync.WaitGroup
    // 初始化日志
    //捕获全局错误
    defer func() {
        if r := recover(); r != nil {
            // 处理错误,例如记录日志
            fmt.Println("Recovered in panic:", r)
        }
    }()
    //开始通信
    for i := 1; i <=numClients; i++ {
        go func(clientID int) {
            var conn net.Conn
            mSend:=numSend
            wgMain.Add(1)
            for{
                conn,_ = dialServer(serverAddr)
                //conns.Store(clientID,conn)
                ////模拟与服务器通信
                //time.Sleep(time.Second*3)
                if communicateWithServer(conn,clientID,&mSend){
                    threadAlrd.Inc()
                    break
                }
                numSocket.Inc()

                //threadAlrd.Inc()
                //for ;isConnAlive(conn);{}
                //threadAlrd.Dec()
                //conn.Close()
            }
            //time.Sleep(time.Second*10)
            _ = conn.Close()
            wgMain.Done()
        }(i)
    }
    //每3秒输出一次统计数据
    go func() {
        wgMain.Add(1)
        tal:=numSend*numClients
        lastByte:=0
        useTime:=0
        a:=time.Now().UnixMilli()
        for {
            time.Sleep(3 * time.Second)
            fmt.Println("===============================================")

            totalSend := numAlrdSend.Value()
            persent:=float64(totalSend)/float64(tal)
            fmt.Printf("sendCount: %d / %d  rate:%.2f %%\n",totalSend,tal,persent*100)

            balrd:=byteAlrd.Value()
            bper:=float64(balrd)/float64(tal*128)
            speed:=float64(balrd-lastByte)/3/1024/1024

            fmt.Printf("sendByte: %d / %d speed:%.2fMB/s rate:%.2f %%\n",balrd,tal*128,speed,bper*100)
            lastByte=balrd

            talrd:=threadAlrd.Value()
            tper:=float64(talrd)/float64(numClients)
            fmt.Printf("thread:%d / %d  rate:%.2f %%\n",talrd,numClients,tper*100)

            nalrd:=numSocket.Value()
            fmt.Printf("dis sock: %d \n",nalrd)
            numSocket.Reset()

            useTime++
            if totalSend >= numSend*numClients {
                useTime=useTime*3
                fmt.Printf("传输结束,实际发送次数:%d 实际发送Byte:%d 完成线程:%d 耗时:%dmin %ds\n",totalSend,balrd,talrd,useTime/60,useTime%60)
                break
            }
        }
        b:=time.Now().UnixMilli()
        fmt.Println("总耗时:",b-a)
        wgMain.Done()
    }()
    time.Sleep(time.Second*30)
    //断开连接数,总连接数,工作完成连接数
    //优化:成功发送的字节量记录  理论发送字节,实际发送字节
    wgMain.Wait()
    fmt.Println("连接数:",numClients," 发送数:",numSend," 传输工作都已完成.")
    //释放
    //logger.log(0)
    // 检查日志容器是否已经达到10000个元素
    logger.writeToFile()
    fmt.Println("日志已全部保存完毕.")
    //select {}//无限循环
}

阅读 729
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏