十一放假,在家学习
FastAPI是一个基于seattle的异步web框架, 据说其速度和go语言相当。
uvicorn/Gunicorn是一个基于asyncio的异步服务器,用于启动fastapi,其中Gunicorn多进程启动时有负载均衡的功能。
本次源码阅读主要是为了解决我内心的一些疑惑:
- uvicorn启动之后,如何将端口请求传递给fastapi?
- fastapi拿到请求后如何调用的endpoint函数?
- endpoint处理完成后如何返回请求?
- fastapi的endpoint如果做到同时适配同步函数和异步函数的?
源码版本
fastapi == 0.103.1
uvicorn == 0.23.2
1. 从uvicorn到fastapi
1.1 启动uvicorn.run("server:app")
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| def run(...): config = Config(...) server = Server(config=config) ... if config.should_reload: sock = config.bind_socket() ChangeReload(config, target=server.run, sockets=[sock]).run() elif config.workers > 1: sock = config.bind_socket() Multiprocess(config, target=server.run, sockets=[sock]).run() else: server.run() ...
|
uvicorn启动后将run的参数全部传给了Config,实例化之后又传递给了server对象。之后主要运行的就是server.run()
函数。
可以看到在启用reload
或works
之后会创建一个目标端口的套接字,这个主要是为了在多个进程间共享同一个socket对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def run(self, sockets: Optional[List[socket.socket]] = None) -> None: ... return asyncio.run(self.serve(sockets=sockets))
async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: ... await self.startup(sockets=sockets) if self.should_exit: return await self.main_loop() await self.shutdown(sockets=sockets) ...
async def main_loop(self) -> None: counter = 0 should_exit = await self.on_tick(counter) while not should_exit: counter += 1 counter = counter % 864000 await asyncio.sleep(0.1) should_exit = await self.on_tick(counter)
|
代码中可以看到server.run()
调用了server.serve()
函数,而server.serve()
中主要做的就是startup()
,main_loop()
以及当self.show_exit=True
之后运行的shutdown()
函数。 main_loop
主要的作用就是保持代码的运行。
所以关键代码应该在startup()
中
1.2 启动端口监听loop.create_server()
和ASGI协议
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: ... def create_protocol( _loop: Optional[asyncio.AbstractEventLoop] = None, ) -> asyncio.Protocol: return config.http_protocol_class(...) ... if sockets is not None: ... elif ... else: try: server = await loop.create_server( create_protocol, host=config.host, port=config.port, ... ) except OSError as exc: ...
def load(self): ... if isinstance(self.http, str): http_protocol_class = import_from_string(HTTP_PROTOCOLS[self.http]) self.http_protocol_class: Type[asyncio.Protocol] = http_protocol_class else: self.http_protocol_class = self.http
|
uvicorn通过asyncio的底层接loop.create_server()
启动了一个监听指定端口的server,fastapi的应用程序就运行在这个server中。
config.http_protocol_class()
是一个实现了python异步基础协议的类,这里默认是uvicron.protocols.http.h11_impl.H11Protocol
。
协议规定了需要实现data_data_received(data)
用于处理接收到的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| def data_received(self, data: bytes) -> None: ... self.handle_events()
def handle_events(self) -> None: while True: try: event = self.conn.next_event() except h11.RemoteProtocolError: ...
if event is h11.NEED_DATA: ... elif ... elif isinstance(event, h11.Request): ... self.headers = [(key.lower(), value) for key, value in event.headers] raw_path, _, query_string = event.target.partition(b"?") self.scope = { "type": "http", "asgi": { "version": self.config.asgi_version, "spec_version": "2.3", }, "http_version": event.http_version.decode("ascii"), "server": self.server, "client": self.client, "scheme": self.scheme, "method": event.method.decode("ascii"), "root_path": self.root_path, "path": unquote(raw_path.decode("ascii")), "raw_path": raw_path, "query_string": query_string, "headers": self.headers, "state": self.app_state.copy(), } upgrade = self._get_upgrade() if upgrade == b"websocket" and self._should_upgrade_to_ws(): self.handle_websocket_upgrade(event) return if ... else: app = self.app
self.cycle = RequestResponseCycle( scope=self.scope, ... ) task = self.loop.create_task(self.cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) ...
|
这里可以看到uvicorn中解析了请求的路径、参数、请求头等并保存到了scope中,随后实例化了RequestResponseCycle
并异步调用了run_asgi()
方法
async def run_asgi(self, app: "ASGI3Application") -> None: try: result = await app(self.scope, self.receive, self.send) except ...
|
这一行 result = await app(self.scope, self.receive, self.send)
就是ASGI协议的主要内容。
ASGI规定了app需要实现app(data, receive, send)
,从而使服务端能够与app进行通讯。
至此,我们就理解了uvicorn是如何将收到的请求传递给FastAPI的了:
- uvicorn通过asyncio底层接口启动一个监听端口的server
- server收到请求后解析请求头、请求路径等信息后保存在scope中
- 通过ASGI协议调用FastAPI app,完成信息传递
2. FastAPI调用endpoint
app通过app(data, receive, send)
方法来处理uvicorn传递的数据,由于fastapi的app是一个对象而非函数,所以我们从app的__call__()
方法作为入口开始分析。
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: ... await super().__call__(scope, receive, send)
|
FastAPI直接继承了starlette.Starlette()
,所以我们继续去看starlette
的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: scope["app"] = self if self.middleware_stack is None: self.middleware_stack = self.build_middleware_stack() await self.middleware_stack(scope, receive, send)
def build_middleware_stack(self) -> ASGIApp: ... middleware = [Middleware(...), ...]
app = self.router for cls, options in reversed(middleware): app = cls(app=app, **options) return app
|
starlette
创建了多个中间件,并一层层额的调用,最内层为self.router
。所以app(scope, receive, send)
最终的调用是self.router(scope, receive, send)
。FastAPI中router是自己定义的APIRouter
,我们这里需要回到fastapi看它是如何实现这部分的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class APIRouter(routing.Router):
class Router: async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: assert scope["type"] in ("http", "websocket", "lifespan") ... partial = None for route in self.routes: match, child_scope = route.matches(scope) if match == Match.FULL: scope.update(child_scope) await route.handle(scope, receive, send) return elif match == Match.PARTIAL and partial is None: partial = route partial_scope = child_scope
if partial is not None: ...
|
当调用router(scope, receive, send)
时,他会遍历所有注册的路由去寻找匹配的route
来对传入的数据进行处理。
我们继续看FastAPI默认使用的APIRoute.handle(scope, receive, send)
是如何工作的。
class APIRoute(routing.Route):
class Route: async def handle(self, scope: Scope, receive: Receive, send: Send) -> None: if self.methods and scope["method"] not in self.methods: ... else: await self.app(scope, receive, send)
|
FastAPI中的APIRoute
继承自starlette.routing.Route
。其中handle()
方法的主要内容就是调用了自己的app(scope, receive, send)
方法,而FastAPI中对route的app属性做了修改,所以我们继续看FastAPI的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class APIRoute(routing.Route): def __init__(self, path, endpoint, ...) -> None: ... self.endpoint = endpoint ... self.dependant = get_dependant(path=self.path_format, call=self.endpoint) ... self.app = request_response(self.get_route_handler())
def get_route_handler(self): return get_request_handler(dependant=self.dependant, ...)
def get_request_handler(dependant): is_coroutine = asyncio.iscoroutinefunction(dependant.call) async def app(request: Request) -> Response: if ... else: raw_response = await run_endpoint_function( dependant=dependant, values=values, is_coroutine=is_coroutine ) return response return app
def request_response(func: typing.Callable) -> ASGIApp: is_coroutine = is_async_callable(func) async def app(scope: Scope, receive: Receive, send: Send) -> None: request = Request(scope, receive=receive, send=send) if is_coroutine: response = await func(request) else: response = await run_in_threadpool(func, request) await response(scope, receive, send)
return app
|
从上边的代码中可以看到,FastAPI中的Route.app
实际上上是一个闭包函数。在get_request_handler()
中定义的一个异步闭包函数,并将该函数传递给request_response()
。request_response()
函数调用闭包函数app
拿到其返回值Response
。并调用Response()(scope, receive, send)
实现返回请求。
下面是Response.__call__()
的代码,可以看到其中异步调用的send函数,用于返回这次请求的响应。
1 2 3 4 5 6 7 8 9 10 11 12 13
| class Response: async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: await send( { "type": "http.response.start", "status": self.status_code, "headers": self.raw_headers, } ) await send({"type": "http.response.body", "body": self.body})
if self.background is not None: await self.background()
|
综上,FastAPI从接到请求到返回响应的完整逻辑如下:
- 根据请求的路径寻找匹配的路由
- 调用路由的endpoint(即我们自己定义的函数),拿到函数返回值
raw_content
- 根据
raw_content
生成Response
对象 - 调用
response(scope, receive, send)
返回响应
3. FastAPI如何处理同步、异步请求
在浏览上边的代码时这个问题其实已经解决了,关健就在run_endpoint_function
函数中。
async def run_endpoint_function(dependant): ... if is_coroutine: return await dependant.call(**values) else: return await run_in_threadpool(dependant.call, **values)
|
所以结论就是如果endpoint是一个异步函数,FastAPI会直接调用,而如果是同步函数,会将函数放入线程池中异步调用。
总结
从开发者定义一个endpoint到uvicorn服务启动,再到接受请求,返回响应。中间的流程如下。
- uvicorn启动一个异步服务器,监听指定端口
- 接收到请求后,uvicorn解析请求头、路径等参数,保存到scope中
- 通过ASGI协议,调用app,将scope数据传递给app
- app遍历所有
self.routes
,并找到match的路由 Route
对象调用开发者挂载的endpoint函数,得到返回值Route
构造Response
对象,将endpoint返回值发送回客户端