Code can be found here.
Table of Contents
- What We'll Cover
- The Differences Between FastAPI and Other Frameworks/Microservices
- Making a Dead Simple API
- Extending Our Dead Simple API
- Using WebSockets With FastAPI and RxJS
- How Video Streaming Works and FastAPI Implementation
- Uploading Files and Storing Meta Information in the Database
What We'll Cover
In this post, we'll talk about the following:
- The differences between FastAPI and other Python web frameworks/microservices such as Flask.
- How to make a dead simple API with FastAPI.
- Extending the API with routing and MongoEngine or SQLAlchemy.
- How to harness the power of websockets with FastAPI and RxJS to open two-way communication.
- How video streaming works and how to implement it in FastAPI with Starlette's StreamingResponse.
- How to upload and save files to the file system (SQLAlchemy) or GridFS (MongoEngine) and store meta information in our database.
The Differences Between FastAPI and Other Frameworks/Microservices
FastAPI is a modern web framework designed to build, well, fast APIs. It is written in Python 3.6+. But Django and Flask already exist, so why would you care about learning a different framework? Take a look at the image below.
This image is a speed test of hundreds of frameworks across many languages. You can find the full site here. This list has been filtered down to Python frameworks only. It shows that spots 1-6 are taken by bare servers, mostly ASGI and WSGI servers. They may be fast, but they are difficult and time-consuming to write in. Spots 7 and 8, however, are FastAPI at a respectable 72% of Uvicorn's performance, and if you look down the list, you'll see Django at 21% and Flask at a measly 14%.
So FastAPI, as the name suggests, is fast. Though not nearly as fast as compiled APIs (FastAPI sits at around 8% of the performance of the top performer, which is still pretty fast, especially for a Python program), FastAPI makes up for it by providing many tools out of the box without the need for additional packages, something frameworks like Flask leave to extensions.
How is FastAPI so fast? It natively uses asynchronous functions for its API endpoints instead of synchronous functions. That allows a single worker to interleave thousands of concurrent requests instead of blocking on each one, reducing the need for a load balancer.
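To make that concrete, here's a tiny standalone sketch using plain `asyncio` (no FastAPI required, and the function names are made up for illustration): while one handler awaits I/O, the event loop is free to run the others on the same thread.

```python
import asyncio

# Minimal sketch of why async endpoints scale: while one handler awaits I/O,
# the event loop runs the others on the same thread.
async def fake_endpoint(request_id: int) -> dict:
    await asyncio.sleep(0.01)  # stand-in for an awaited database or network call
    return {'id': request_id}

async def main() -> list:
    # All three "requests" wait concurrently, so total time is ~0.01s, not ~0.03s
    return await asyncio.gather(*(fake_endpoint(i) for i in range(3)))

results = asyncio.run(main())
print(results)
```

A synchronous version of the same handlers would have to finish each request before starting the next.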
Making a Dead Simple API
First, we need to install FastAPI and an ASGI server. I recommend using a tool like venv to create a virtual environment, so you avoid crowding your main Python installation. For venv, you can create one by running `python3 -m venv venv` on Mac and Linux, or `python -m venv venv` on Windows. You will now have a folder called `venv` inside the folder you ran the command in. Now, we'll activate the environment so the terminal knows to use it. On Mac and Linux, we'll run `. venv/bin/activate`, and on Windows, we run `venv\Scripts\activate`. Now that the environment is set up, we'll install FastAPI and Uvicorn (a popular ASGI server). We'll also install wheel to make installing other packages smoother (if you get an error installing wheel, just ignore it and move on). We do that by running the following commands:

```shell
pip install wheel
pip install fastapi python-multipart "uvicorn[standard]"
```
And now that everything's installed, on to making the API. Create a Python file and begin writing.

main.py
```python
# import the FastAPI object
from fastapi import FastAPI

# define the FastAPI app with the debug flag so we get more output in case something goes wrong
app = FastAPI(debug=True)

# define a route at '/' (root)
@app.get('/')
async def hello_world():
    # send back "hello world"
    return 'hello world'

# if this file is run directly
if __name__ == '__main__':
    import uvicorn
    # start up uvicorn with the reload flag so it restarts whenever we change a file (turn off in production)
    uvicorn.run('main:app', reload=True)
```
Running this file with `python main.py` will boot up Uvicorn and FastAPI, which will print the address of the API, `http://127.0.0.1:8000/`. Going to this address will return "hello world", just as we defined it. Congrats! You made an API. It doesn't do much, but it's a start.
Extending Our Dead Simple API
The biggest quality-of-life improvement we can make is to split our application into multiple files. We can do that with FastAPI's APIRouter. By using APIRouter, we can keep similar routes in the same file while keeping different groups of routes apart from each other. Let's start by creating `routes.py` in a `resources` folder.
resources/routes.py
```python
from fastapi import FastAPI

# a list of routers to include in our app
routers = []

# loop through our routers and add them to the app
def initialize_routes(app: FastAPI) -> None:
    for router in routers:
        app.include_router(router)
```
But wait. What's that colon and arrow doing in the `initialize_routes()` definition? One of FastAPI's major advantages over other frameworks is that it utilizes typing (introduced in Python 3.5). By using these type hints, we write cleaner code and give our editor information to work with. You might have noticed when writing the last line that your editor offered an autocomplete for `include_router(router: APIRouter)`.
Now back in `main.py`, we can initialize the routes at startup.
main.py
```diff
  ...
  app = FastAPI(debug=True)

- # define a route at '/' (root)
- @app.get('/')
- async def hello_world():
-     # send back "hello world"
-     return 'hello world'

+ @app.on_event('startup')
+ async def startup():
+     print('-- STARTING UP --')
+     from resources.routes import initialize_routes
+     initialize_routes(app)
+     print('-- STARTED UP --')
  ...
```
When Uvicorn reloads, not much will change, but you will notice in the console that it prints "-- STARTING UP --" and "-- STARTED UP --". This is because we used the `on_event` decorator on our function, which is run before requests can be received. This is very helpful because we can do any necessary setup before we start accepting requests, such as connecting to databases.
Next, we'll move the old default route to a different file. Feel free to skip this step as it will be removed later, but do read it as it will cover APIRouter.
resources/test.py
```python
from fastapi import APIRouter

router = APIRouter(
    prefix='',
    tags=['Test']
)

@router.get('/')
async def hello_world():
    return 'hello world'
```
And we'll add the new router to our list of routers
resources/routes.py
```diff
+ from .test import router as test_router

  routers = [
+     test_router
  ]
  ...
```
And after we save and FastAPI finishes startup, if we go back to `http://127.0.0.1:8000/`, we'll be greeted by our familiar "hello world".
And if we go to `http://127.0.0.1:8000/docs`, we'll be greeted by our very own OpenAPI documentation for our API. We'll see our route under the Test section. We can even try it out directly on this page as well. Very useful for debugging and remembering what our routes do.
Now that routing is all set, we'll install MongoEngine or SQLAlchemy (You can install both, but you'll only use one at a time):
```shell
pip install mongoengine
pip install SQLAlchemy
```
For now, we won't create any collections or tables, but we'll come back to it.
MongoEngine Set Up
You will need to install the MongoDB server, either from here or from your favorite package manager.
database/db.py
```python
from mongoengine import connect, disconnect

def initialize_db() -> None:
    # host='mongodb://localhost/<DATABASE NAME>'
    connect(host='mongodb://localhost/fastapi-test')

def close_db() -> None:
    disconnect()
```
SQLAlchemy Set Up
database/db.py
```python
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from typing import Optional

engine: Optional[Engine] = None

def initialize_db() -> None:
    global engine
    # '<DB TYPE>:///<DATABASE NAME>'
    engine = create_engine('sqlite:///fastapi-test.sqlite3', echo=True, future=True)

def close_db() -> None:
    if engine:
        engine.dispose()
```
We also see the introduction of `Optional`. This tells you that the variable can be either `None` or the type we provided.
Both
main.py
```diff
  @app.on_event('startup')
  async def startup():
      print('-- STARTING UP --')
+     from database.db import initialize_db
+     initialize_db()
      from resources.routes import initialize_routes
      initialize_routes(app)
      print('-- STARTED UP --')

+ @app.on_event('shutdown')
+ async def shutdown():
+     print('-- SHUTTING DOWN --')
+     from database.db import close_db
+     close_db()
+     print('-- SHUT DOWN --')
```
You'll notice that we added a `shutdown` event. It's there to clean up anything we need to. In our case, we use it to close our database connections. We don't technically need to with MongoEngine or SQLAlchemy, but it's good practice to clean up after yourself.
Using WebSockets With FastAPI and RxJS
Now it's time to get into WebSockets. What are WebSockets?
[Diagram of a WebSocket connection. From Wikipedia, licensed under CC BY-SA 4.0.]
WebSockets are like normal HTTP requests, but they are long-lived (only one real request is made) and bi-directional, allowing the server to talk to the client unprompted. This is very important in today's world, where we want to deliver information to the client efficiently without making them refresh the page.
Basic WebSockets
First, let's make a WebSocket handler in our API.
resources/sockets.py
```python
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter(
    tags=['Sockets']
)

@router.websocket('/')
async def root_socket(websocket: WebSocket):
    await websocket.accept()
    print('Client connected')
    try:
        while True:
            print(await websocket.receive_text())
            await websocket.send_json({'pong': True})
    except WebSocketDisconnect:
        print('Client disconnected')
```
And don't forget to add the router to our list of available routes. It is safe to remove the test router and file, as we won't be needing them anymore.
resources/routes.py
```diff
  ...
- from .test import router as test_router
+ from .sockets import router as socket_router

  routers = [
-     test_router
+     socket_router
  ]
  ...
```
We have our socket going, but now we need a client in order to use it. For this post, we'll use Angular, but any framework that uses RxJS will work the same. You can also use vanilla JavaScript, but the code below will need to be adapted to JavaScript's WebSocket API (more info here).
With Node and the Angular CLI installed, we can run `ng new test-client`. We won't use Angular routing, and we'll use basic CSS for this project, as it won't do much. After it's created, we'll open a new terminal inside the newly created project and run `ng serve` to run the project. We can then go to the provided URL and see our basic Angular application.
Now that we have a client going, we'll open `test-client/src/app/app.component.ts`. This will be the only file we modify in this project. Let's connect to our socket with RxJS.
test-client/src/app/app.component.ts
```typescript
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription, retry } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {
  title = 'test-client';
  // If you have an interface for incoming websocket data, you can replace <any> with <INTERFACE>
  private socket: WebSocketSubject<any> | undefined;
  private socketSub: Subscription | undefined;

  ngOnInit(): void {
    this.socket = webSocket('ws://localhost:8000/'); // Open a new websocket connection
    // Subscribe to incoming data. Retry the connection if it closes.
    this.socketSub = this.socket.pipe(retry()).subscribe(payload => {
      console.log(payload); // Print out incoming data
    });
    this.socket.next('ping'); // Send a string
  }

  ngOnDestroy(): void {
    this.socketSub?.unsubscribe();
    this.socket?.unsubscribe(); // Close the connection
  }
}
```
After saving, we can see that FastAPI prints out "ping", and the browser console prints out `{pong: true}`. And with that, we've established a bi-directional connection between our front end and back end.
Connection Manager
Basic websockets are all well and good, but the way they are set up now only allows us to send messages from the `root_socket` function. Let's implement a reusable connection manager class that keeps track of our connected clients and lets us manipulate them from other endpoints and functions.
resources/sockets.py
```diff
  router = APIRouter(
      tags=['Sockets']
  )

+ class ConnectionManager:
+     def __init__(self):
+         self.active_connections: list[WebSocket] = []
+
+     async def connect(self, websocket: WebSocket) -> None:
+         await websocket.accept()
+         self.active_connections.append(websocket)
+
+     def disconnect(self, websocket: WebSocket) -> None:
+         self.active_connections.remove(websocket)
+
+     async def send_message(self, message: dict, websocket: WebSocket) -> None:
+         await websocket.send_json(message)
+
+     async def broadcast(self, message: dict) -> None:
+         for connection in self.active_connections:
+             await connection.send_json(message)
+
+ root_manager = ConnectionManager()

  @router.websocket('/')
  async def root_socket(websocket: WebSocket):
-     await websocket.accept()
+     await root_manager.connect(websocket)
      print('Client connected')
      try:
          while True:
              print(await websocket.receive_text())
-             await websocket.send_json({'pong': True})
+             await root_manager.send_message({'pong': True}, websocket)
      except WebSocketDisconnect:
+         root_manager.disconnect(websocket)
          print('Client disconnected')
```
It seems a little more complicated than before, and if you connect to this endpoint, it seems that nothing really changed. But this is an important step in websocket handling: now we have access to the websocket from other functions, as well as the ability to attach additional information to each socket, such as a session ID (sid), authentication, and metadata.
As a final step, let's assign sids to each websocket for easier handling.
resources/sockets.py
```diff
  ...
+ from typing import Optional
+ import base64
+ import os

+ class WebSocketConnection:
+     def __init__(self, sid: str, socket: WebSocket):
+         self.sid = sid
+         self.socket = socket

  class ConnectionManager:
      def __init__(self):
-         self.active_connections: list[WebSocket] = []
+         self.active_connections: list[WebSocketConnection] = []

-     async def connect(self, websocket: WebSocket) -> None:
+     async def connect(self, websocket: WebSocket) -> str:
          await websocket.accept()
+         # generate a random sid, retrying in the unlikely event of a collision
+         sid = ''
+         do = True
+         while do:
+             do = False
+             sid = base64.b32encode(os.urandom(32)).decode('utf8')
+             for connection in self.active_connections:
+                 if connection.sid == sid:
+                     do = True
-         self.active_connections.append(websocket)
+         self.active_connections.append(WebSocketConnection(sid=sid, socket=websocket))
+         return sid

      def disconnect(self, websocket: WebSocket) -> None:
-         self.active_connections.remove(websocket)
+         for i in range(len(self.active_connections)):
+             if self.active_connections[i].socket == websocket:
+                 self.active_connections.pop(i)
+                 break
  ...
+     def get_socket_from_sid(self, sid: str) -> Optional[WebSocket]:
+         for connection in self.active_connections:
+             if connection.sid == sid:
+                 return connection.socket

  root_manager = ConnectionManager()

  @router.websocket('/')
  async def root_socket(websocket: WebSocket):
-     await root_manager.connect(websocket)
+     sid = await root_manager.connect(websocket)
+     await root_manager.send_message({'sid': sid}, websocket)
      print('Client connected')
      try:
          while True:
              print(await websocket.receive_text())
              await root_manager.send_message({'pong': True}, websocket)
      except WebSocketDisconnect:
          root_manager.disconnect(websocket)
          print('Client disconnected')
```
Nothing has really changed in our `root_socket` function, but now we're sending back the generated sid. This is very important down the line, because other endpoints can access a websocket connection based on its session ID.
How Video Streaming Works and FastAPI Implementation
Video streaming is a major step up from how it used to be done. Before, the entire video had to be downloaded before it was playable, but thanks to modern codecs, only the metadata and the requested parts need to be downloaded to achieve streaming. This is a major improvement, as both network load and loading times are lessened.
But how does our API know what parts of the video to stream? The answer is headers. Headers are the part of a request that carries information such as the kind of device requesting, authorization codes, or what parts of a file we want. We could technically use query parameters to achieve the same result, but using headers has become the standard, as they don't interfere with query parameters.
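To see what such a header actually carries, here's a small sketch of parsing a `Range` value like `bytes=0-` or `bytes=500-999` (a simplified version of the split the streaming endpoint later in this post performs):

```python
# Parse "bytes=<start>-<end>"; either bound may be omitted.
def parse_range(header: str, file_size: int) -> tuple:
    unit, _, spec = header.partition('=')
    if unit != 'bytes':
        raise ValueError('only byte ranges are supported')
    start_s, _, end_s = spec.partition('-')
    start = int(start_s) if start_s else 0
    end = int(end_s) if end_s else file_size - 1  # open-ended range runs to EOF
    return start, end

print(parse_range('bytes=0-', 1000))       # (0, 999)
print(parse_range('bytes=500-999', 1000))  # (500, 999)
```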
Let's create our streaming endpoint. For now, we will be streaming a video file from disk, but later on, we will stream live video. You can download a test video here (I recommend you use the VP9 codec as it is very efficient, but if you have problems, use H.264). Put the video file in a folder in the project called `files`.
resources/file.py
```python
from fastapi import APIRouter, HTTPException
from starlette.responses import StreamingResponse
import os
from io import BufferedReader
import mimetypes

router = APIRouter(
    prefix='/file',
    tags=['File']
)

@router.get('/stream')
def stream(filename: str):
    try:
        def iterfile(path: str):
            with open(path, 'rb') as file:
                yield from file

        path = os.path.join(os.getcwd(), 'files', filename)
        mimetype = mimetypes.guess_type(filename)[0]
        return StreamingResponse(
            content=iterfile(path),
            media_type=mimetype
        )
    except OSError:
        raise HTTPException(status_code=404, detail='file not found')
    except Exception as e:
        raise e
```
And don't forget to add the new router to your list of routers.
resources/routes.py
```diff
  ...
+ from .file import router as file_router

  routers = [
+     file_router,
      socket_router
  ]
  ...
```
And that's it! Kind of. If you go to `http://127.0.0.1:8000/file/stream?filename=<FILE NAME WITH EXTENSION>` in your browser or a video player that supports network streams, such as VLC, you'll be met with a video being streamed through FastAPI. Very cool, but there's a problem: you can't seek. That's because we aren't using headers to determine what the client actually wants to see; we're just sending the contents of the file sequentially. You'll also see the introduction of `HTTPException`, which is how FastAPI returns errors. Let's look at how we can use the `Range` header to allow seeking.
resources/file.py
```diff
- from fastapi import APIRouter, HTTPException
+ from fastapi import APIRouter, Header, HTTPException
  from starlette.responses import StreamingResponse
  import os
  from io import BufferedReader
  import mimetypes

  router = APIRouter(
      prefix='/file',
      tags=['File']
  )

  @router.get('/stream')
- def stream(filename: str):
+ def stream(filename: str, range: str = Header('bytes=0-')):
+     CONTENT_CHUNK_SIZE = 1024 * 14  # feel free to play around with this number to maximize efficiency
      try:
+         def get_stream_and_size(filename: str):
+             path = os.path.join(os.getcwd(), 'files', filename)
+             f = open(path, 'rb')
+             return f, os.path.getsize(path)

-         def iterfile(path: str):
-             with open(path, 'rb') as file:
-                 yield from file
+         def iterfile(file_stream: BufferedReader, chunk_size: int, start: int, size: int):
+             bytes_read = 0
+             file_stream.seek(start)
+             while bytes_read < size:
+                 bytes_to_read = min(chunk_size, size - bytes_read)
+                 yield file_stream.read(bytes_to_read)
+                 bytes_read += bytes_to_read
+             file_stream.close()

-         path = os.path.join(os.getcwd(), 'files', filename)
+         start_byte = int(range.split('=')[-1].split('-')[0])
+         chunk_size = CONTENT_CHUNK_SIZE
+         file_stream, size = get_stream_and_size(filename)
+         if start_byte + chunk_size > size:
+             chunk_size = size - 1 - start_byte
          mimetype = mimetypes.guess_type(filename)[0]
          return StreamingResponse(
-             content=iterfile(path),
+             content=iterfile(
+                 file_stream,
+                 chunk_size,
+                 start_byte,
+                 size
+             ),
+             status_code=206,
+             headers={
+                 'Accept-Ranges': 'bytes',
+                 'Content-Range': f'bytes {start_byte}-{start_byte + chunk_size}/{size}',
+                 'Content-Type': mimetype,
+                 'Content-Disposition': f'inline; filename={filename}'
+             },
              media_type=mimetype
          )
      except OSError:
          raise HTTPException(status_code=404, detail='file not found')
      except Exception as e:
          raise e
```
It's more complex, but that's because we need to process what's being asked for and serve what's requested, while making sure we don't overflow, because that would cause errors. The important part here is that our StreamingResponse sends the `Accept-Ranges` header, which tells clients they can request specific parts of the video. You'll also notice that we are using the 206 status code, which tells the browser that it is receiving partial content, and allows certain browsers or applications to seek. Another small but important part is the value of `CONTENT_CHUNK_SIZE`. This is the maximum size of each payload back to the client in bytes. If you make it too small, the client will need to make a lot of requests. If you make it too large, it will hog bandwidth and be slow to load. You will need to play around with it to find a good sweet spot. A typical value is between 1024 * 4 and 1024 * 15.
Uploading Files
Now that we have file streaming working, let's add the ability to upload files. We will be utilizing either MongoEngine or SQLAlchemy to map file names and, in the case of MongoEngine, to store the files directly in the DB with GridFS.
MongoEngine
Note: Using GridFS over a normal file system supposedly hinders performance a little, but in my experience, this is negligible and is greatly outweighed by the features of GridFS, such as better organization, a single database backup, and an easier saving process.
database/models.py
```python
from mongoengine import Document, StringField, FileField

class Media(Document):
    folder = StringField(required=True)
    filename = StringField(unique_with="folder", required=True)
    file = FileField()
```
In `models.py`, we define a `Media` collection that contains the folder, the filename, and a reference to the file itself. Now we'll define an uploader endpoint and change our `stream` endpoint to use the database instead of the file system.
resources/file.py
```diff
- from fastapi import APIRouter, Header, HTTPException
+ from fastapi import APIRouter, Header, HTTPException, UploadFile, Form, File
  from starlette.responses import StreamingResponse
  from io import BufferedReader
  import mimetypes
+ from mongoengine.errors import DoesNotExist, NotUniqueError
+ from database.models import Media

  router = APIRouter(
      prefix='/file',
      tags=['File']
  )

+ @router.post('/upload')
+ async def upload_file(file: UploadFile = File(...), folder: str = Form('')):
+     try:
+         # Enforce a required filename and that folders can't start with `/`
+         if file.filename == '' or (len(folder) > 0 and folder[0] == '/'):
+             raise HTTPException(status_code=400, detail='Validation Error')
+         mimetype = file.content_type
+         if not mimetype:
+             mimetype = mimetypes.guess_type(file.filename)[0]
+         media = Media(folder=folder, filename=file.filename)
+         media.file.put(file.file, content_type=mimetype)
+         media.save()
+         return str(media.id)
+     except HTTPException:
+         raise
+     except NotUniqueError:
+         raise HTTPException(status_code=400, detail='Not Unique')
+     except Exception as e:
+         raise e

  @router.get('/stream')
- def stream(filename: str, range: str = Header('bytes=0-')):
+ def stream(filename: str, folder: str = '', range: str = Header('bytes=0-')):
      ...
-         def get_stream_and_size(filename: str):
-             path = os.path.join(os.getcwd(), 'files', filename)
-             f = open(path, 'rb')
-             return f, os.path.getsize(path)
+         def get_stream_and_size_and_mimetype(filename: str, folder: str):
+             f = Media.objects.get(filename=filename, folder=folder)
+             return f.file, f.file.length, f.file.content_type

-         file_stream, size = get_stream_and_size(filename)
+         file_stream, size, mimetype = get_stream_and_size_and_mimetype(filename, folder)
      ...
-         mimetype = mimetypes.guess_type(filename)[0]
+         if not mimetype:
+             mimetype = mimetypes.guess_type(filename)[0]
          if start_byte + chunk_size > size:
              chunk_size = size - 1 - start_byte
          return StreamingResponse(
      ...
          )
-     except OSError:
+     except DoesNotExist:
      ...
```
Now, we have the ability to upload and stream files from GridFS.
SQLAlchemy
database/models.py
```python
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy.engine.base import Engine

meta = MetaData()

media = Table(
    'media', meta,
    Column('folder', String, primary_key=True),
    Column('filename', String, primary_key=True),
    Column('mimetype', String),
    Column('length', Integer)
)

def create_tables(engine: Engine):
    meta.create_all(engine)
```
And we'll call `create_tables` in `database/db.py`:

database/db.py
```diff
  def initialize_db() -> None:
      global engine
      # '<DB TYPE>:///<DATABASE NAME>'
      engine = create_engine('sqlite:///fastapi-test.sqlite3', echo=True, future=True)
+     if engine:
+         from .models import create_tables
+         create_tables(engine)
+     else:
+         print('Failed to create tables. Engine not connected')
```
Now that we've defined and created our `media` table, we'll move on to uploading files to the file system and saving the information to the database.
resources/file.py
```diff
- from fastapi import APIRouter, Header, HTTPException
+ from fastapi import APIRouter, Header, HTTPException, UploadFile, Form, File
  from starlette.responses import StreamingResponse
  import os
  from io import BufferedReader
  import mimetypes
+ from database.db import engine
+ from database.models import media
+ from sqlalchemy.orm.exc import NoResultFound

  router = APIRouter(
      prefix='/file',
      tags=['File']
  )

+ def is_safe_path(basedir, path) -> bool:
+     return basedir == os.path.commonpath((basedir, os.path.abspath(path)))

+ @router.post('/upload')
+ async def upload_file(file: UploadFile = File(...), folder: str = Form('')):
+     try:
+         # Enforce a required filename, and that folders can't start with `/` or escape the project
+         if not is_safe_path(os.getcwd(), folder) or file.filename == '' or (len(folder) > 0 and folder[0] == '/'):
+             raise HTTPException(status_code=400, detail='Validation Error')
+         filesPath = os.path.join(os.getcwd(), 'files')
+         outputFolderPath = os.path.join(filesPath, *folder.split('/'))
+         outputPath = os.path.join(outputFolderPath, file.filename)
+         if os.path.exists(outputPath):
+             raise HTTPException(status_code=400, detail='Not Unique')
+         os.makedirs(outputFolderPath, exist_ok=True)
+         with open(outputPath, 'wb+') as f:
+             length = f.write(file.file.read())
+         mimetype = file.content_type
+         if not mimetype:
+             mimetype = mimetypes.guess_type(file.filename)[0]
+         with engine.connect() as conn:
+             ins = media.insert().values(folder=folder, filename=file.filename, mimetype=mimetype, length=length)
+             conn.execute(ins)
+             conn.commit()
+         return 'ok'
+     except HTTPException:
+         raise

  @router.get('/stream')
- def stream(filename: str, range: str = Header('bytes=0-')):
+ def stream(filename: str, folder: str = '', range: str = Header('bytes=0-')):
      ...
-         def get_stream_and_size(filename: str):
-             path = os.path.join(os.getcwd(), 'files', filename)
-             f = open(path, 'rb')
-             return f, os.path.getsize(path)
+         def get_stream_and_size_and_mimetype(filename: str, folder: str):
+             path = os.path.join(os.getcwd(), 'files', *folder.split('/'), filename)
+             f = open(path, 'rb')
+             with engine.connect() as conn:
+                 stmt = media.select().where(media.c.folder == folder, media.c.filename == filename)
+                 mediaObj = conn.execute(stmt).one()
+             return f, mediaObj.length, mediaObj.mimetype

+         if not is_safe_path(os.getcwd(), folder):
+             raise OSError
-         file_stream, size = get_stream_and_size(filename)
+         file_stream, size, mimetype = get_stream_and_size_and_mimetype(filename, folder)
      ...
-         mimetype = mimetypes.guess_type(filename)[0]
+         if not mimetype:
+             mimetype = mimetypes.guess_type(filename)[0]
          return StreamingResponse(
      ...
          )
-     except OSError:
+     except (OSError, NoResultFound):
      ...
```
Now we can upload and stream files to and from the file system.
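One part worth highlighting is `is_safe_path`: without it, a `folder` value like `../../etc` could read or write outside the project directory (path traversal). The check is self-contained and easy to try on its own with absolute paths (the `/srv/app` paths below are made up for the example):

```python
import os

# True only when `path` resolves to somewhere inside `basedir`.
def is_safe_path(basedir: str, path: str) -> bool:
    return basedir == os.path.commonpath((basedir, os.path.abspath(path)))

print(is_safe_path('/srv/app', '/srv/app/files/video.webm'))  # True
print(is_safe_path('/srv/app', '/srv/app/../other'))          # False
```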