服务端:
import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
# Codec used to decode request bytes and to encode response text.
default_encoding: str = 'utf-8'

# Thread pool with 20 workers whose thread names start with
# 'simple-work-thread-pool'.
# NOTE(review): the __main__ block rebinds `pool` to a multiprocessing.Pool,
# so this executor appears to be unused when the file runs as a script.
pool = ThreadPoolExecutor(thread_name_prefix='simple-work-thread-pool', max_workers=20)
def init_serversocket() -> socket.socket:
    """Create the listening TCP socket for the server.

    The socket is bound to ``0.0.0.0:6001`` and put into listening mode with
    a backlog of 2048.

    Returns:
        The bound, listening socket. The caller is responsible for closing it.

    Raises:
        OSError: If the socket cannot be configured, bound, or put into
            listening mode. The socket is closed before the error propagates.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    try:
        # Allow re-binding the port while connections from a previous run are
        # still in TIME_WAIT. Without this option, restarting the server
        # shortly after it served clients fails with
        # "OSError: [Errno 98] Address already in use".
        # It must be set before bind().
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Local host name; only logged for diagnostics.
        host = socket.gethostname()
        logger.debug(f'host {host}')

        port = 6001
        # Bind the port on all IPv4 interfaces.
        serversocket.bind(('0.0.0.0', port))
        # Backlog: number of not-yet-accepted connections the system may queue.
        serversocket.listen(2048)
    except Exception:
        # Do not leak the descriptor when setup fails part-way.
        serversocket.close()
        raise
    return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send the whole response to the client, then close the connection.

    Args:
        clientsocket: Connected client socket. It is always closed before
            this function returns or raises.
        addr: Peer address of the client. Not used by this function; kept
            for interface compatibility.
        response_body: Raw bytes to send.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: If sending fails.
    """
    try:
        # sendall() keeps writing until every byte has been handed to the
        # kernel; a bare send() may transmit only part of the payload.
        clientsocket.sendall(response_body)
    finally:
        clientsocket.close()
    return len(response_body)
def start_request(clientsocket: socket.socket, addr: tuple) -> 'int | None':
    """Handle one client connection: read a request and echo it back.

    Reads up to 2048 bytes with a single ``recv`` call, decodes them with
    ``default_encoding``, prefixes the text with ``'server get message: '``
    and sends the result back through ``send_response``.

    Args:
        clientsocket: Connected client socket. It is closed before this
            function returns, on both the success and the failure path.
        addr: Peer address of the client, forwarded to ``send_response``.

    Returns:
        The value returned by ``send_response`` on success, or ``None`` when
        an exception was raised (the exception is logged, not propagated).
    """
    try:
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        return send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
    except Exception as error:
        logger.exception(error)
        return None
    finally:
        # recv()/decode() can fail before send_response() gets a chance to
        # close the socket. Closing here covers that path; a second close()
        # on an already-closed socket is a no-op.
        clientsocket.close()
def start_request_callback(future: Future) -> None:
    """Log the result of a completed future.

    The name suggests it is meant as a done-callback for futures that run
    ``start_request``.

    Args:
        future: Finished future. ``future.result()`` re-raises here if the
            task itself raised.
    """
    payload_len: int = future.result()
    worker_name = threading.current_thread().name
    logger.debug(f'{worker_name}, send payload len is {payload_len}')
if __name__ == "__main__":
    # Server entry point: accept connections forever and hand each accepted
    # (socket, address) pair to a worker process.
    serversocket = init_serversocket()
    try:
        # This rebinds the module-level name `pool` to a process pool.
        pool = multiprocessing.Pool(processes=16)
        try:
            while True:
                # accept() returns a connected socket.socket and the peer
                # address tuple.
                clientsocket, addr = serversocket.accept()
                pool.apply_async(start_request, (clientsocket, addr))
        finally:
            # Stop taking new tasks, then wait for the workers to exit.
            pool.close()
            pool.join()
    finally:
        # Runs even if creating the pool fails or join() is interrupted, so
        # the listening socket is never leaked.
        serversocket.close()
客户端:
from base64 import encode
import socket # 客户端 发送一个数据,再接收一个数据
import json
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
import time
import multiprocessing
# Indices of requests that raised an exception.
# NOTE(review): send_request is dispatched through multiprocessing.Pool in the
# __main__ block, so these appends presumably land in each worker's own copy
# of the list rather than in the parent's. Confirm before relying on it.
failture_requests = []


def send_request(index: int):
    """Send one request to the local server and read its reply.

    Connects to ``127.0.0.1:6001``, sends the fixed payload ``b'ponponon'``,
    reads up to 1024 bytes of response and checks that it decodes as text.
    Errors are logged and recorded in ``failture_requests`` instead of being
    raised.

    Args:
        index: Sequence number of this request. It is logged on success and
            appended to ``failture_requests`` on failure.
    """
    try:
        # Create a TCP socket. The with-block closes it even when connect,
        # send or recv raises, so no descriptor leaks on the failure path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as clientsocket:
            # Connect to the server on local port 6001.
            clientsocket.connect(('127.0.0.1', 6001))
            # sendall() keeps writing until the whole payload has been sent.
            clientsocket.sendall(b'ponponon')
            data = clientsocket.recv(1024)
            # The decoded text is not used; decoding only validates the reply
            # and raises on undecodable bytes, which counts as a failure.
            data.decode()
            logger.debug(index)
        logger.debug('请求完成')
    except Exception as error:
        failture_requests.append(index)
        logger.exception(error)
if __name__ == "__main__":
    # Client entry point: fire 1,000,000 requests through 32 worker processes.
    # The context manager terminates the workers if queuing or joining is
    # interrupted by an exception. On the normal path the pool has already
    # been closed and joined by then.
    with multiprocessing.Pool(processes=32) as pool:
        for index in range(1000000):
            pool.apply_async(send_request, (index, ))
        # No more tasks will be submitted; wait for all queued ones to finish.
        pool.close()
        pool.join()
问题症状:
- 启动服务端后立刻关闭,再启动服务端也一切正常
- 启动服务端后,也启动客户端请求服务端,然后立刻关闭服务端。这个时候就无法再次启动服务端,服务端会报错:
OSError: [Errno 98] Address already in use
- 重点是,使用
sudo lsof -i :6001
查看,输出空空如也,没有任何进程在占用这个 6001 端口
- 这个
OSError: [Errno 98] Address already in use
会在等待几分钟甚至几十分钟后自己消失,这个时候就能重新启动服务端了
为什么?
服务端的报错
╰─➤ python 002_socket_tcp_server_process.py 1 ↵
2023-05-04 23:00:42.862 | DEBUG | __main__:init_serversocket:28 - host amd5700g
Traceback (most recent call last):
File "/home/pon/Desktop/code/me/http_wsgi_asgi_tutorial/002_socket_tcp_server_process.py", line 73, in <module>
serversocket = init_serversocket()
^^^^^^^^^^^^^^^^^^^
File "/home/pon/Desktop/code/me/http_wsgi_asgi_tutorial/002_socket_tcp_server_process.py", line 33, in init_serversocket
serversocket.bind(('0.0.0.0', port))
OSError: [Errno 98] Address already in use
sudo lsof -i :6001
的输出, 空空如也
(http_wsgi_asgi_tutorial) ╭─pon@amd5700g ~/Desktop/code/me/http_wsgi_asgi_tutorial ‹master*›
╰─➤ sudo lsof -i :6001 1 ↵
(http_wsgi_asgi_tutorial) ╭─pon@amd5700g ~/Desktop/code/me/http_wsgi_asgi_tutorial ‹master*›
为什么?为什么?是 lsof 命令不准确吗?
换成 sudo netstat -anp | grep 6001
命令试了试,有输出
╰─➤ sudo netstat -anp | grep 6001 130 ↵
tcp 0 0 127.0.0.1:6001 127.0.0.1:56470 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:38966 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:50820 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:60648 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:50472 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:50860 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:50040 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:44920 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:38078 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:54364 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:35426 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:54430 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:56028 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:58210 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:38254 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:45726 TIME_WAIT -
tcp 0 0 127.0.0.1:6001 127.0.0.1:38046 TIME_WAIT -
.... 后面的省略,实在是太多了
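这就是典型的 TIME_WAIT 端口占用问题:程序意外退出,已连接的 Socket 没有被完整释放,只能等操作系统最后回收。(补充:TIME_WAIT 出现在主动关闭连接的一方。这里服务端发送完响应后先调用了 close(),所以这些连接都停留在服务端 6001 端口一侧的 TIME_WAIT 状态,要等超时之后才会被内核清理。)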
解法就是在 bind() 之前给侦听 Socket 设置
SO_REUSEADDR
选项。
P.S.1. Linux 3.9 之后的内核又新增了一个
SO_REUSEPORT
选项,相比 SO_REUSEADDR 来说更精细化了。不过一般来说都是同时设置两者的。Windows 上出于安全考虑一般还需要额外设置 SO_EXCLUSIVEADDRUSE。
P.S.2. lsof 扫的是进程,它实质看的是“某个文件被某个进程占用的情况”(Linux 下万物皆文件嘛,Socket 也是个文件)。你这进程都没了,处于 TIME_WAIT 的连接已经不属于任何进程,连 pid 都没有,当然就看不到了。