uvicorn 如何调节线程池大小

gunicornthreads 启动参数用来调节线程池的大小

但是 uvicorn 貌似没有类似的参数?

实测 uvicorn 的线程池的大小为 40

本来以为 limit_concurrency 参数来限制线程池的大小

实际测试也确实如此,但是发生了一个重大问题!

不加 limit_concurrency 的 pool 的 max 是 40,当一下子涌进来 400000 个请求,多出来的 399960 个请求会排排队,等待处理。

但是使用了 limit_concurrency 之后,多出来的 399960 个请求直接 503 了!

虽然 fastapi + uvicorn 是为了异步而生的,但是我只想同步用而已!
阅读 6.8k
1 个回答

我知道了,这个原理是这样的

参考: fastapi 的 middleware 怎么改成同步模式?

fastapi 会把没有加 async 的视图函数放到线程池中去跑

而不是 「uvicorn 会把没有加 async 的视图函数放到线程池中去跑」

所以如果你需要调整 「调节线程池大小」,配置 uvicorn 是没用的


fastapi 的 fastapi/routing.py 文件,有一个 run_endpoint_function 函数

async def run_endpoint_function(
    *, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
) -> Any:
    # Only called by get_request_handler. Has been split into its own function to
    # facilitate profiling endpoints, since inner functions are harder to profile.
    assert dependant.call is not None, "dependant.call must be a function"

    if is_coroutine:
        return await dependant.call(**values)
    else:
        return await run_in_threadpool(dependant.call, **values)

但是视图函数没有加 async,那么 is_coroutine 就是 False,所以会执行 run_in_threadpool,我们的视图函数就被放到线程池里面去跑了


那么回答一开始的问题,这个时候我们需要改一下问题的标题了:「fastapi 如何调节线程池大小」

那么如何调整的?run_in_threadpool 不是 fastapi 实现的,而是 starlette 实现的

fastapi 只是引用了 starlette 而已:from starlette.concurrency import run_in_threadpool

那么 starlette 是怎么实现的?

我也不知道,我问了一下 ChatGPT,下面是我和 chatGPT 的 QA 日志:

以下的内容,仅供参考:

Q:讲解一下 starlette 的 run_in_threadpool

A:在 Starlette 中,run_in_threadpool 是一个函数,用于在线程池中运行阻塞的同步函数,以避免阻塞异步事件循环。该函数可以将同步函数转换为异步函数,并在后台的线程池中执行,以确保不会阻塞主事件循环。

run_in_threadpool 函数位于 starlette.background 模块中。以下是该函数的签名:

def run_in_threadpool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Awaitable[Any]:
    ...

它接受一个同步函数 func 和其参数,然后返回一个可被 await 的对象,表示异步操作的结果。在后台,run_in_threadpool 将同步函数放入线程池中执行,并返回一个 Future 对象,以便在主事件循环中等待其完成。

以下是一个示例,演示如何使用 run_in_threadpool 来运行阻塞的同步函数:

from starlette.background import run_in_threadpool

def blocking_sync_function(arg1: int, arg2: str) -> str:
    # 阻塞的同步函数
    ...

async def async_function():
    result = await run_in_threadpool(blocking_sync_function, 123, "hello")
    # 使用结果进行异步操作
    ...

在上述示例中,我们定义了一个名为 blocking_sync_function 的阻塞的同步函数。在 async_function 中,我们使用 run_in_threadpool 来调用 blocking_sync_function,并传递参数 123"hello"run_in_threadpoolblocking_sync_function 放入线程池中执行,并返回一个可被 await 的对象。

通过使用 run_in_threadpool,我们可以在异步函数中调用阻塞的同步函数,而不会阻塞主事件循环。这对于需要处理阻塞操作的场景非常有用,如调用同步的第三方库、执行计算密集型的任务等。请注意,由于线程池中的执行是同步的,所以在执行阻塞操作时,主事件循环仍然可以处理其他异步任务。

Q:如何设置 run_in_threadpool 对应的线程池的大小

A:在 Starlette 的 run_in_threadpool 中,您可以通过配置 asyncio 的线程池来设置线程池的大小。run_in_threadpool 实际上是使用了 asyncio.get_event_loop().run_in_executor() 来在线程池中执行函数。

您可以使用 asyncio 模块中的 get_event_loop 方法来获取当前的事件循环,并调用其 set_default_executor 方法来设置线程池的大小。以下是一个示例:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from starlette.background import run_in_threadpool

def my_blocking_function(arg):
    # 阻塞的同步函数
    ...

# 设置线程池的大小
executor = ThreadPoolExecutor(max_workers=10)
asyncio.get_event_loop().set_default_executor(executor)

# 在异步函数中使用 run_in_threadpool
async def async_function():
    result = await run_in_threadpool(my_blocking_function, arg)
    ...

在上述示例中,首先使用 ThreadPoolExecutor 创建了一个线程池,通过设置 max_workers 参数来指定线程池的大小为 10。然后,使用 asyncio.get_event_loop().set_default_executor() 方法将线程池设置为当前事件循环的默认执行器。

接下来,在异步函数 async_function 中,您可以使用 run_in_threadpool 来执行阻塞的同步函数 my_blocking_function,并在其中传递参数。run_in_threadpool 将使用您设置的线程池来执行函数,并返回一个可被 await 的对象。

请注意,设置线程池的大小是为了控制并发执行的线程数。根据您的应用程序需求,您可以根据实际情况来设置线程池的大小,以达到最佳性能。

Q:fastapi 中,如何设置 run_in_threadpool 对应的线程池的大小

A:在 FastAPI 中使用 run_in_threadpool 函数时,默认情况下,它使用的是 Python 的全局线程池。要设置线程池的大小,您可以使用 concurrent.futures 模块的 ThreadPoolExecutor 类来创建一个自定义的线程池,并将其传递给 run_in_threadpool

以下是一个示例,演示如何在 FastAPI 中设置 run_in_threadpool 对应的线程池的大小:

from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI
from starlette.background import run_in_threadpool

app = FastAPI()

# 创建自定义线程池
executor = ThreadPoolExecutor(max_workers=10)

@app.get("/")
async def async_function():
    result = await run_in_threadpool(my_blocking_function, arg)
    # 处理结果
    ...

def my_blocking_function(arg):
    # 阻塞的同步函数
    ...

# 设置 app 的线程池
app.state.executor = executor

在上述示例中,我们首先创建了一个自定义的线程池 executor,通过设置 max_workers 参数来指定线程池的大小为 10。

然后,在异步路由处理函数 async_function 中,我们使用 run_in_threadpool 来执行阻塞的同步函数 my_blocking_function,并在其中传递参数。run_in_threadpool 将使用自定义的线程池来执行函数。

最后,我们将自定义的线程池设置为 FastAPI 应用程序的状态 app.state.executor,以便在整个应用程序中共享该线程池。

请注意,使用自定义线程池可以让您控制并发执行的线程数,以满足应用程序的性能需求。根据实际情况和系统资源,您可以根据需要设置线程池的大小。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题