python 生态下的 WSGI/ASGI 怎么读取客户端请求的 TLS 指纹?

问了 deepseek、豆包、gemini、chatgpt,回答不正确

我一开始问他们是怎么在 uvicorn+fastapi 生态下实现,但是 request.scope.get('ssl') 为空

import uvicorn
from loggers import logger
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from middleware.loggers import RequestLogMiddleware
from middleware.correlations import CorrelationIdMiddleware
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(BaseHTTPMiddleware, dispatch=RequestLogMiddleware())
app.add_middleware(BaseHTTPMiddleware, dispatch=CorrelationIdMiddleware())


@app.get('/')
@logger.catch
async def root(request: Request):
    ssl_info = request.scope.get('ssl')
    if ssl_info:
        # 尝试获取 TLS 指纹,不同的 Python 版本和环境可能有所不同
        tls_fingerprint = ssl_info.get('peer_cert_fingerprint')
        if tls_fingerprint:
            logger.info(f"Client TLS fingerprint: {tls_fingerprint}")
        else:
            logger.info("Could not get client TLS fingerprint.")
    else:
        logger.info("No SSL information available.")

    response_body = {
        "ip": request.client.host
    }
    logger.debug(response_body)

    return response_body


if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8086,
        workers=1,
        ssl_keyfile="/Users/ponponon/Downloads/xxxx.cn_nginx/xxxx.cn.key",
        ssl_certfile="/Users/ponponon/Downloads/xxxx.cn_nginx/xxxx.cn.pem"
    )
    

后来又让我直接 socket 写 server,但是都读取不到客户端请求中的 TLS 指纹

import ssl
import socket
from loguru import logger
from pyja3 import extract_ja3_from_client_hello
from threading import Thread


CERT_FILE = "/home/pon/code/me/ssl/xxxx.cn_nginx/xxxx.cn.pem"
KEY_FILE = "/home/pon/code/me/ssl/xxxx.cn_nginx/xxxx.cn.key"


def handle_client(client_socket, addr):
    try:
        raw_data = client_socket.recv(4096, socket.MSG_PEEK)
        ja3_str, ja3_hash = extract_ja3_from_client_hello(raw_data)
        logger.info(f"[{addr}] JA3: {ja3_str}, MD5: {ja3_hash}")
    except Exception as e:
        logger.warning(f"[{addr}] Failed to get JA3: {e}")
    finally:
        client_socket.close()


def main():
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)

    bindsocket = socket.socket()
    bindsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    bindsocket.bind(("0.0.0.0", 8086))
    bindsocket.listen(5)

    logger.info("Server listening on 0.0.0.0:8086")

    while True:
        client_socket, fromaddr = bindsocket.accept()
        Thread(target=handle_client, args=(client_socket, fromaddr)).start()


if __name__ == "__main__":
    main()
阅读 583
1 个回答
import socket
import ssl
import threading
from loguru import logger

CERT_FILE = "/path/to/cert.pem"
KEY_FILE = "/path/to/key.pem"

def get_client_hello(sock):
    class TLSLogger:
        def __init__(self):
            self.client_hello = None
            
        def callback(self, conn, direction, version, content_type, msg_type, data):
            # ClientHello是握手类型1
            if direction == 0 and content_type == 22 and msg_type == 1:
                self.client_hello = data
                logger.info(f"捕获到ClientHello: {len(data)}字节")
                
    logger_obj = TLSLogger()
    
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
    
    # 设置消息回调
    context.set_msg_callback(logger_obj.callback)
    
    ssl_sock = context.wrap_socket(sock, server_side=True, do_handshake_on_connect=False)
    try:
        ssl_sock.do_handshake()
    except:
        pass
    
    return logger_obj.client_hello

def handle_client(client_sock, addr):
    try:
        client_hello = get_client_hello(client_sock)
        if client_hello:
            logger.info(f"客户端 {addr} 的TLS握手数据: {client_hello[:20]}...")
    except Exception as e:
        logger.error(f"处理客户端 {addr} 出错: {e}")
    finally:
        client_sock.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8443))
    server.listen(5)
    logger.info("服务器启动在 0.0.0.0:8443")
    
    try:
        while True:
            client, addr = server.accept()
            logger.info(f"接受连接: {addr}")
            client_thread = threading.Thread(
                target=handle_client, args=(client, addr))
            client_thread.start()
    except KeyboardInterrupt:
        logger.info("服务器关闭")
    finally:
        server.close()

if __name__ == "__main__":
    main()

最好是用Nginx和Lua脚本

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题