问题:多线程libevent,网络延迟高(200ms),每秒数据传输量小(8MB/s)。
libevent,多线程,TCP。
环境:windows,局域网,两台电脑网线直连。
服务端机器:win11,12核心20G内存,千兆网卡,千兆网线。
客户端机器:win10,8核心16G内存,千兆网卡,千兆网线。
服务端架构:
总共8个线程。
第1个线程,1个事件循环,只用于监听连接,监听到连接将fd通过管道均衡转发给其他线程。
第2-4线程,3个线程,3个事件循环,每个线程各1各事件循环event_base,这3个专门用于做数据接收。从管道中接收连接,将连接注册到本线程的数据接收事件。接收到数据以后通过无锁队列转发给工作线程。
第5-8个线程,无事件循环,通过无锁队列接收到数据以后,对数据简单进行处理(分包),转发回客户端。
客户端架构:
自己编写,使用go语言。
程序开始设置建立连接的数量,每个连接发送多少包。
假设建立15000个连接,每个连接发送800个包,每个包128字节。
创建5w个协程,拥有断线重连机制。
向服务端发送128字节的数据,再立马接收到以后,将收发时间差存入日志文件。
通过对服务端使用LukeStackWalker进行性能分析,发送NtWaitForSingle这个底层函数的cpu耗时最高,推测是libevent内部的信号量。我自己服务端代码未使用过任何信号量和阻塞式操作。
CPU占用率很高,但是运行效率低下,很奇怪。
目前已知的问题,存日志这个操作对延迟影响比较大,但是也只能节省100ms.
希望能够将本地测试的网络延迟降到50ms以内,每秒稳定传输50MB以上。
主要特别是网络延迟这个问题,希望能够得到解答。
服务端代码:
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/thread.h>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <cstring>
#include <Windows.h>
#include "concurrentqueue.h"
#include <event2/event_compat.h>
//#include <arpa/inet.h> //linux socket
#pragma comment(lib, "wsock32.lib")
const int NUM_DATA_PROCESSORS = 3; // 事件循环2-4
const int NUM_WORKERS = 4; // 工作线程数量
const int PORT = 8000;
// 任务类型
struct Task {
evutil_socket_t fd;
std::string data;
Task()= default;
Task(evutil_socket_t _fd,std::string _data):fd(_fd),data(_data){ }
};
struct eBase_data {
std::thread thread;
event_base *base{};
event *notify_event{};
evutil_socket_t notify_receive_fd;
evutil_socket_t notify_send_fd;
};
// 全局任务队列和同步机制
moodycamel::ConcurrentQueue<Task> task_queue;
std::atomic<size_t> aliveSocket(0);
std::atomic<size_t> recvSize(0);
std::atomic<size_t> sendSize(0);
// 工作线程函数
[[noreturn]] void worker_thread() {
while(true) {
Task task;
bool isGet=task_queue.try_dequeue(task);
if(!isGet)continue;
// 处理任务
// 处理和发送数据的逻辑
// bufferevent_write(bev, msg, strlen(msg));
for(int i=0;i<task.data.size()/128;i++)
{
int res=send(task.fd,task.data.c_str()+i*128,128,0);
if(res<0){
//发送失败
}else sendSize+=res;
}
}
}
// 数据接收回调函数
void read_cb(bufferevent *bev, void *ctx) {
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
size_t size=len/128 *128;
std::string data;
data.resize(size);
len=bufferevent_read(bev,(char*)data.c_str(),size);
if(len>0){
recvSize+=len;
{
Task tk(bufferevent_getfd(bev),data);
task_queue.enqueue(tk);
}
}
}
void event_cb(bufferevent *bev, short event, void *pVoid){
if (event & BEV_EVENT_EOF | BEV_EVENT_ERROR) {
aliveSocket--;
bufferevent_free(bev);
}
// printf("Got an error on the connection: %s\n",strerror(errno));
}
//sock传输事件
void worker_notify_cb(evutil_socket_t fd, short what, void *arg) {
auto ebDt = static_cast<eBase_data *>(arg);
evutil_socket_t client_fd;
if (recv(fd, reinterpret_cast<char *>(&client_fd), sizeof(client_fd), 0) <= 0) {
perror("recv");
return;
}
bufferevent *bev = bufferevent_socket_new(ebDt->base, client_fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
std::cerr << "Error constructing bufferevent!" << std::endl;
evutil_closesocket(client_fd);
return;
}
// event_set(ebDt->base,fd,EV_READ | EV_PERSIST, read_cb,NULL);
evbuffer_expand(bufferevent_get_input(bev), 32 * 1024); // 扩展缓冲区到 20K
bufferevent_setcb(bev, read_cb, nullptr, event_cb, nullptr);
bufferevent_enable(bev, EV_READ | EV_PERSIST);//EV_WRITE);
struct timeval tv = {1,0};
bufferevent_set_timeouts(bev,&tv, nullptr);
}
//数据接收事件循环线程
void data_thread_func(void* arg){
auto eb=(eBase_data*)arg;
eb->base = event_base_new();
if (!eb->base) {
std::cerr << "Could not initialize worker thread base!" << std::endl;
return;
}
eb->notify_event = event_new(eb->base, eb->notify_receive_fd, EV_READ | EV_PERSIST, worker_notify_cb, eb);
event_add(eb->notify_event, nullptr);
event_base_dispatch(eb->base);
event_free(eb->notify_event);
event_base_free(eb->base);
}
// 连接回调函数
void accept_conn_cb(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) {
static int next_processor = 0;
// struct event_base *base = evconnlistener_get_base(listener);
auto dBases= (eBase_data*)ctx;
// 选择下一个数据处理事件循环
eBase_data& eb = (dBases)[next_processor];
if (send(eb.notify_send_fd, reinterpret_cast<char *>(&fd), sizeof(fd), 0) != sizeof(fd)) {
perror("send");
evutil_closesocket(fd);
}
next_processor=(next_processor + 1) % NUM_DATA_PROCESSORS;
aliveSocket++;
}
// 连接错误回调函数
void accept_error_cb(evconnlistener *listener, void *ctx) {
event_base *base = static_cast<event_base *>(ctx);
int err = EVUTIL_SOCKET_ERROR();
std::cerr << "Got an error on the listener: " << evutil_socket_error_to_string(err) << std::endl;
event_base_loopexit(base, NULL);
}
// 事件循环线程函数
void accept_thread_func(eBase_data *ctx) {
event_base *accept_base = event_base_new();
if (!accept_base) {
std::cerr << "Could not initialize libevent!" << std::endl;
return;
}
// 监听端口8000
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(PORT);
evconnlistener *listener = evconnlistener_new_bind(accept_base, accept_conn_cb,ctx,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
(sockaddr *)&sin, sizeof(sin));
if (!listener) {
std::cerr << "Could not create a listener on port "<<PORT<< std::endl;
return;
}
evconnlistener_set_error_cb(listener, accept_error_cb);
event_base_dispatch(accept_base);
evconnlistener_free(listener);
event_base_free(accept_base);
}
int main(int argc, char **argv) {
#ifdef WIN32
WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (result != 0) {
std::cout << "WSAStartup failed: " << result << std::endl;
exit(1);
}
evthread_use_windows_threads();
#else
evthread_use_pthreads();
#endif
// 创建线程池-工作线程
std::vector<std::thread> worker_threads;
for (int i = 0; i < NUM_WORKERS; ++i) {
worker_threads.emplace_back(worker_thread);
}
// 创建线程池-数据接收
eBase_data data_ebs[NUM_DATA_PROCESSORS];
for (int i = 0; i < NUM_DATA_PROCESSORS; ++i) {
evutil_socket_t fds[2];
if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) {
perror("socketpair");
return 1;
}
data_ebs[i].notify_receive_fd = fds[0];
data_ebs[i].notify_send_fd = fds[1];
data_ebs[i].thread = std::thread(data_thread_func,&data_ebs[i]);
}
// 创建主事件循环
std::thread accept_thread(accept_thread_func,data_ebs);
//数据监测
std::thread([&](){
for(int i=0;i<1200;i++){
std::this_thread::sleep_for(std::chrono::seconds(1));
size_t as = aliveSocket.load(std::memory_order_acq_rel);
std::cout<<"sock:"<<as<<" send:"<<sendSize<<" recv:"<<recvSize<<std::endl;
}
}).detach();
// 等待所有线程结束
accept_thread.join();
for (auto &t : data_ebs) {
t.thread.join();
}
for (auto &t : worker_threads) {
t.join();
}
// 释放资源
for (auto &base : data_ebs) {
event_base_free(base.base);
}
#ifdef WIN32
WSACleanup();
#endif
return 0;
}
客户端代码
package main
import (
"crypto/md5"
"fmt"
"io"
//"google.golang.org/protobuf/proto"
"net"
"strconv"
"strings"
"sync"
"time"
)
var numAlrdSend AtomicInt
var byteAlrd AtomicInt
var threadAlrd AtomicInt
var numSocket AtomicInt
func dialServer(serverAddr string) (net.Conn, error) {
for {
conn, err := net.Dial("tcp", serverAddr)
if err == nil {
return conn,nil
} else {
//fmt.Printf("Failed to connect to the server: %v.\n", err)
//time.Sleep(1 * time.Second) // 等待1秒后重试
}
}
}
//time.Sleep(1 * time.Second)
////recv
//buffer := make([]byte, 100)
//_, err = conn.Read(buffer)
//if err != nil {
// fmt.Printf("Recv Failed: %v\n", err)
// return
//}
var logger *Logger
func communicateWithServer(conn net.Conn,num int,mSend *int)bool{
data := make([]byte, 128) // 初始数据量为1KB
//fullPack(num,data)
//fullPack(num,*mSend,data)
fullpacksmall(data)
//data[0]=0x12
for *mSend>0 {
//time.Sleep(time.Millisecond*300)
a:=time.Now().UnixMilli()
n, err := conn.Write(data)
if err != nil {
//fmt.Printf("Send Failed : %v\n", err)
return false
}
byteAlrd.IncNum(n)
//fmt.Println("send yes.")
n, err = conn.Read(data)
if err != nil {
return false
}
numAlrdSend.Inc()
b:=time.Now().UnixMilli()
logger.log(int(b - a))
//logMsg(strconv.Itoa(int(b - a)))
*mSend--
}
return true
}
func fullpacksmall(data []byte) {
// 定义填充的字符集
str := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
// 填充剩余的data空间
size:=0
count:=(128)/len(str) //计算完整填充次数
for i:=0;i<count;i++ {
copy(data[size:], str)
size+=len(str)
}
copy(data[size:], "FEFF")
}
func fullPackOld(num int, data []byte) {
str := strconv.Itoa(num) // 将数字转换为字符串
// 确保字符串长度为5,不足部分用空格填充
if len(str) < 5 {
str = strings.Repeat("0", 5-len(str)) + str
}
str+=":"
copy(data, str)
size:=len(str)
// 定义填充的字符集
str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
// 填充剩余的data空间
count:=(1024-size)/len(str) //计算完整填充次数
for i:=0;i<count;i++ {
copy(data[size:], str)
size+=len(str)
}
copy(data[size:], str[:1024-size])
}
//"{clnId}:{packId}:{timeStamp}:{mdFive}:{data}\r\n\r\n\r\n"
//00001:0028:1720081312639:ABCDEFGH....\r\n\r\n\r\n
// 生成数据填充字符串的函数
func fullPack(clnId int, packId int, data []byte) {
// 获取当前时间戳
timeStamp := time.Now().UnixMilli()
// 生成基础字符串
baseStr := fmt.Sprintf("%05d:%04d:%d", clnId, packId, timeStamp)
// 计算需要填充的字符数
abc := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
fillLength := 1024 - len(baseStr) -34
// 用 data 循环填充不足的部分
fillStr := ""
abcLen := len(abc)
for i := 0; i < fillLength/abcLen; i++ {fillStr+=abc}
fillStr+=abc[0:fillLength%abcLen]
// 计算 data 的 md5 校验值
md5Hash := md5.Sum([]byte(fillStr))
final:=fmt.Sprintf("%s:%x:%s",baseStr,md5Hash,fillStr)
copy(data, final)
}
func isConnAlive(conn net.Conn) bool {
one := []byte{}
conn.SetReadDeadline(time.Now().Add(1 * time.Second))
var err error
if _, err = conn.Read(one); err == nil || err == io.EOF {
return false
}
if netErr, ok:= err.(net.Error); ok && netErr.Timeout() {
return true
}
return false
}
func main(){
//logger=NewLogger("info_1.log")
//serverAddr := "192.168.1.231:8000"
serverAddr := "192.168.1.1:8000"
//serverAddr := "111.230.37.164:8000"
numClients := 30000
numSend := 800
numLog=numClients*numSend
//var conns sync.Map
logger=NewLogger("info_1.log")
var wgMain sync.WaitGroup
// 初始化日志
//捕获全局错误
defer func() {
if r := recover(); r != nil {
// 处理错误,例如记录日志
fmt.Println("Recovered in panic:", r)
}
}()
//开始通信
for i := 1; i <=numClients; i++ {
go func(clientID int) {
var conn net.Conn
mSend:=numSend
wgMain.Add(1)
for{
conn,_ = dialServer(serverAddr)
//conns.Store(clientID,conn)
////模拟与服务器通信
//time.Sleep(time.Second*3)
if communicateWithServer(conn,clientID,&mSend){
threadAlrd.Inc()
break
}
numSocket.Inc()
//threadAlrd.Inc()
//for ;isConnAlive(conn);{}
//threadAlrd.Dec()
//conn.Close()
}
//time.Sleep(time.Second*10)
_ = conn.Close()
wgMain.Done()
}(i)
}
//每3秒输出一次统计数据
go func() {
wgMain.Add(1)
tal:=numSend*numClients
lastByte:=0
useTime:=0
a:=time.Now().UnixMilli()
for {
time.Sleep(3 * time.Second)
fmt.Println("===============================================")
totalSend := numAlrdSend.Value()
persent:=float64(totalSend)/float64(tal)
fmt.Printf("sendCount: %d / %d rate:%.2f %%\n",totalSend,tal,persent*100)
balrd:=byteAlrd.Value()
bper:=float64(balrd)/float64(tal*128)
speed:=float64(balrd-lastByte)/3/1024/1024
fmt.Printf("sendByte: %d / %d speed:%.2fMB/s rate:%.2f %%\n",balrd,tal*128,speed,bper*100)
lastByte=balrd
talrd:=threadAlrd.Value()
tper:=float64(talrd)/float64(numClients)
fmt.Printf("thread:%d / %d rate:%.2f %%\n",talrd,numClients,tper*100)
nalrd:=numSocket.Value()
fmt.Printf("dis sock: %d \n",nalrd)
numSocket.Reset()
useTime++
if totalSend >= numSend*numClients {
useTime=useTime*3
fmt.Printf("传输结束,实际发送次数:%d 实际发送Byte:%d 完成线程:%d 耗时:%dmin %ds\n",totalSend,balrd,talrd,useTime/60,useTime%60)
break
}
}
b:=time.Now().UnixMilli()
fmt.Println("总耗时:",b-a)
wgMain.Done()
}()
time.Sleep(time.Second*30)
//断开连接数,总连接数,工作完成连接数
//优化:成功发送的字节量记录 理论发送字节,实际发送字节
wgMain.Wait()
fmt.Println("连接数:",numClients," 发送数:",numSend," 传输工作都已完成.")
//释放
//logger.log(0)
// 检查日志容器是否已经达到10000个元素
logger.writeToFile()
fmt.Println("日志已全部保存完毕.")
//select {}//无限循环
}