In our previous article, we covered the basics of FastAPI. Today, we’ll dive into one of its “killer” features: Asynchronous Endpoints.
If you’re new to programming, hearing the term “asynchronous” might be daunting. Don’t worry! This article will explain it in the simplest way possible, show you with practical code how to use it in FastAPI, and most importantly – when you should use it.
I. Synchronous vs Asynchronous: A Simple Analogy
Before diving into technical details, let’s understand the difference between synchronous and asynchronous using a real-life example.
Scenario: You’re having dinner at a restaurant.
1. Synchronous Mode
You order a steak. The waiter takes your order to the kitchen, then stands at the kitchen door and waits until the steak is cooked before bringing it to you. During this time, they cannot serve any other customers.
This is how synchronous programming works: once a task starts, the program must wait for it to finish before it can start the next task. If the task is time-consuming (like waiting for a steak to cook), the entire program gets “stuck” there.
2. Asynchronous Mode
You order a steak. The waiter takes your order to the kitchen, but does not wait. Instead, they go serve other customers: taking orders, refilling water. When the kitchen shouts “Steak’s ready!”, the waiter comes back to bring your steak to you.
This is how asynchronous programming works: after starting a task, the program doesn’t wait for it to finish. It can immediately start handling other tasks. When the previous task completes, the program receives a “notification” and then comes back to process its result.
The Core Difference: Asynchronous programming allows the program, while waiting for some time-consuming operation (usually I/O operations, like network requests, database queries, file reading/writing), not to sit idle, but to do other work. This increases overall throughput and responsiveness.
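Here is a minimal, self-contained sketch of that difference in plain Python (the `fake_io` helper is a made-up stand-in for any slow I/O call): three simulated 1-second waits take about 3 seconds when awaited one by one, but about 1 second when run concurrently.

```python
import asyncio
import time

async def fake_io():
    # Stand-in for a slow I/O operation (network request, DB query, ...)
    await asyncio.sleep(1)

async def main():
    start = time.perf_counter()
    for _ in range(3):
        await fake_io()  # one at a time: roughly 3 seconds total
    print(f"sequential: {time.perf_counter() - start:.1f}s")

    start = time.perf_counter()
    await asyncio.gather(fake_io(), fake_io(), fake_io())  # overlapped: roughly 1 second
    print(f"concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(main())
```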
II. Asynchronous Endpoints in FastAPI: How to Implement?
Writing asynchronous endpoints in FastAPI is incredibly simple: just add the keyword `async` before `def` when defining your route handler function.
Example 1: The Simplest Async Endpoint
```python
# main.py
import time
import asyncio

from fastapi import FastAPI

app = FastAPI()

# Synchronous endpoint
@app.get("/sync/hello")
def sync_hello():
    # Simulate a time-consuming operation, e.g., waiting for 1 second.
    # A blocking sleep is acceptable here because this is a sync route
    # (but never use it inside an async function!)
    time.sleep(1)
    return {"message": "Hello from synchronous endpoint"}

# Asynchronous endpoint
@app.get("/async/hello")
async def async_hello():
    # Simulate a time-consuming operation; MUST use the async sleep
    await asyncio.sleep(1)
    return {"message": "Hello from asynchronous endpoint"}
```
Code Breakdown:
- `async def`: This is the syntax for defining an asynchronous function.
- `await`: This keyword can only be used inside an `async def` function. It tells the program: “I’m now going to wait for this time-consuming operation to finish. You can go handle other tasks and call me back when it’s ready.”
- `asyncio.sleep(1)`: This is an asynchronous sleep function. It does not block the entire program; instead, it releases the event loop, allowing other tasks to run. You cannot use `time.sleep()` inside an async function because it’s synchronous and would block the entire event loop.
How to Run and Test?
- Install dependencies: as before, you need `fastapi` and `uvicorn`: `pip install fastapi "uvicorn[standard]"`
- Start the server: `uvicorn main:app --reload`
- Test: open your browser or use `curl` to visit `http://127.0.0.1:8000/sync/hello` and `http://127.0.0.1:8000/async/hello`. You’ll notice both endpoints return a result after about 1 second. You might ask: “So what’s the difference?” The difference lies in concurrent handling capability. Let’s use a tool to simulate multiple users accessing the endpoints simultaneously.
Concurrency Test: Witnessing the Power of Async
We’ll use locust for a simple load test.
- Install locust: `pip install locust`
- Create a test file `locustfile.py`:

```python
from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
    wait_time = between(1, 3)  # Users wait 1-3 seconds between requests

    # Uncomment to test the sync endpoint
    # @task(1)  # Task weight; both tasks have the same weight here
    # def test_sync(self):
    #     self.client.get("/sync/hello")

    @task(1)
    def test_async(self):
        self.client.get("/async/hello")
```

- Run locust: `locust`
- Start testing:
  - Open your browser to `http://localhost:8089`.
  - Set the number of simulated users (e.g., 100) and the spawn rate (users started per second, e.g., 10).
  - Click “Start swarming”.
You’ll observe:
- When testing `/sync/hello`, the requests per second (RPS) will be noticeably lower: each request blocks a worker thread for a full second, and FastAPI runs sync routes in a thread pool with a limited number of workers, so throughput tops out once all workers are busy.
- When testing `/async/hello`, the RPS will be significantly higher, because while one request is suspended at `await asyncio.sleep(1)`, the server can immediately go handle other requests instead of idling.
This simple example vividly demonstrates the tremendous advantage of asynchronous endpoints in handling high-concurrency, I/O-bound tasks.
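If you’d rather not install locust, a rough version of the same experiment can be done in plain Python. The sketch below (assuming the server from Example 1 is running locally on port 8000) fires 20 concurrent requests at the async endpoint with `httpx` and times them:

```python
# bench.py -- quick-and-dirty concurrency check; assumes the server
# from Example 1 is running at http://127.0.0.1:8000
import asyncio
import time

import httpx

async def hit(client: httpx.AsyncClient, path: str):
    response = await client.get(path)
    response.raise_for_status()

async def main():
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000") as client:
        start = time.perf_counter()
        # Fire 20 requests at the async endpoint concurrently
        await asyncio.gather(*(hit(client, "/async/hello") for _ in range(20)))
        print(f"20 concurrent requests: {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```

Against `/async/hello`, the whole batch should finish in roughly 1 second, because every request spends its second suspended at `await asyncio.sleep(1)` rather than occupying the server.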
III. Core Use Cases for Async Endpoints
Merely knowing how to write async endpoints isn’t enough. More importantly, you need to understand when to use them to maximize effectiveness.
Core Principle: Asynchronous programming is best suited for I/O-bound tasks.
Here are the typical use cases for FastAPI async endpoints:
Use Case 1: Database Operations
This is the most common scenario. Database queries (especially to networked databases) often have latencies ranging from tens to hundreds of milliseconds.
Bad Practice (Do NOT use synchronous DB drivers in async functions):
```python
# Bad example: using synchronous libraries (requests, sync SQLAlchemy, etc.) inside async def
import requests

@app.get("/async/bad-example")
async def bad_async_example():
    # requests.get is synchronous and will block the event loop!
    response = requests.get("https://api.github.com")
    return response.json()
```
The Right Way: Use asynchronous database drivers.
For example, using SQLAlchemy in asynchronous mode (the `async_sessionmaker` API shown below requires SQLAlchemy 2.0+) with the `asyncpg` driver for PostgreSQL.
```python
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

app = FastAPI()

# 1. Define the database model
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)

# 2. Create the async engine and session factory (SQLAlchemy 2.0+)
DATABASE_URL = "postgresql+asyncpg://user:password@host/dbname"
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = async_sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)

# 3. Async dependency: get a DB session
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# 4. Async endpoint: query the database
@app.get("/async/users/{user_id}")
async def get_user_async(user_id: int, db: AsyncSession = Depends(get_db)):
    # Use await to wait for the database query result
    result = await db.execute(select(User).filter(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
```
Key Point: await db.execute(...) releases the event loop, allowing the server to handle other requests until the database returns the result.
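One detail the snippet glosses over: the `users` table must exist before the endpoint can query it. Here is a minimal sketch of creating the tables at startup with the async engine, using `conn.run_sync` to bridge the synchronous `Base.metadata.create_all` call (in a real project you’d more likely manage schema with migrations such as Alembic):

```python
# Create tables at startup; assumes the `engine` and `Base` defined above
@app.on_event("startup")
async def create_tables():
    async with engine.begin() as conn:
        # run_sync executes the synchronous create_all through the async connection
        await conn.run_sync(Base.metadata.create_all)
```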
Use Case 2: Calling External APIs
When your API needs to act as a client to call another HTTP API (e.g., a weather service, payment gateway), this is also a classic I/O operation.
You should use an asynchronous HTTP client like httpx.
```python
# main.py
from fastapi import FastAPI
import httpx

app = FastAPI()

# Recommended: create a reusable async client
async_client = httpx.AsyncClient()

@app.get("/async/weather/{city}")
async def get_weather_async(city: str):
    # Use await to wait for the HTTP request to complete
    response = await async_client.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"q": city, "appid": "YOUR_API_KEY"},
    )
    response.raise_for_status()  # Raises an exception if the request failed
    return response.json()

# Close the client connection pool when the app shuts down
@app.on_event("shutdown")
async def shutdown_event():
    await async_client.aclose()
```
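A side note: `@app.on_event` still works but is deprecated in recent FastAPI releases in favor of the lifespan pattern. A minimal sketch of the same create/close logic written that way:

```python
# Equivalent setup/teardown using FastAPI's lifespan parameter
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create the shared client on startup...
    app.state.async_client = httpx.AsyncClient()
    yield
    # ...and close its connection pool on shutdown
    await app.state.async_client.aclose()

app = FastAPI(lifespan=lifespan)
```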
Use Case 3: File I/O (Specific Cases)
For local file I/O, Python’s standard open() is synchronous. In most cases, local disk I/O is fast enough that using synchronous endpoints is fine.
However, if you need to process very large files, or perform many time-consuming file operations within a single request, you can offload these operations to a thread pool to avoid blocking the event loop. FastAPI makes this easy.
```python
# main.py
from fastapi import FastAPI
import asyncio

app = FastAPI()

# A time-consuming synchronous file I/O function
def write_large_file_sync(filename: str, data: str):
    # Simulate writing a lot of data
    with open(filename, "w") as f:
        for _ in range(100000):
            f.write(data)
    return f"File {filename} written successfully."

# Async endpoint, using a thread pool to execute the sync function
@app.post("/async/write-file/{filename}")
async def write_file_async(filename: str, data: str):
    # loop.run_in_executor runs the synchronous function in a thread pool,
    # so it won't block the main event loop
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, write_large_file_sync, filename, data)
    return {"message": result}
```
Note: For regular small file I/O, just use a synchronous def route. No need to overcomplicate things.
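On Python 3.9+, `asyncio.to_thread` expresses the same idea more concisely (it wraps `run_in_executor` with the default thread pool). A sketch of the endpoint above rewritten with it; the `/async/write-file-v2` path is just an illustrative name:

```python
# Same thread-pool offloading via asyncio.to_thread (Python 3.9+)
@app.post("/async/write-file-v2/{filename}")
async def write_file_async_v2(filename: str, data: str):
    result = await asyncio.to_thread(write_large_file_sync, filename, data)
    return {"message": result}
```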
IV. Async in Practice: Running Multiple Tasks in Parallel
A powerful async endpoint often needs to handle multiple I/O tasks in parallel. For example, an endpoint might need to fetch data from a database, Redis, and an external API simultaneously, then aggregate the results.
```python
# main.py
from fastapi import FastAPI
import asyncio

app = FastAPI()

async def fetch_from_db(item_id: int):
    # Simulate an async database query
    await asyncio.sleep(0.5)
    return {"id": item_id, "name": f"Item {item_id} from DB", "price": 99.99}

async def fetch_from_redis(item_id: int):
    # Simulate an async Redis query
    await asyncio.sleep(0.3)
    return {"stock": 100}

async def fetch_from_api(item_id: int):
    # Simulate an async HTTP request (a real version would await
    # an httpx.AsyncClient call here)
    await asyncio.sleep(0.7)  # Simulate network latency
    return {"discount": 0.1}  # Assume the API returns discount info

@app.get("/async/item/{item_id}")
async def get_item_async(item_id: int):
    # Use asyncio.gather to run multiple async tasks concurrently.
    # Total time is roughly the SLOWEST task (0.7s), not the sum (0.5+0.3+0.7=1.5s)
    db_data, redis_data, api_data = await asyncio.gather(
        fetch_from_db(item_id),
        fetch_from_redis(item_id),
        fetch_from_api(item_id),
    )
    # Aggregate the results
    return {
        "item": db_data,
        "stock": redis_data["stock"],
        "final_price": db_data["price"] * (1 - api_data["discount"]),
    }
```
This example is crucial! asyncio.gather is key to achieving concurrency. It launches all the passed async tasks simultaneously and waits for all of them to finish. This is much faster than sequentially await-ing each task.
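One caveat: by default, if any task passed to `asyncio.gather` raises, that exception propagates and the other results are lost to the caller. Passing `return_exceptions=True` makes `gather` return exceptions as ordinary values you can inspect. A minimal sketch reusing the helpers above (the `/async/item-safe` path is an illustrative name):

```python
# Variant of the endpoint above that tolerates individual sub-task failures
@app.get("/async/item-safe/{item_id}")
async def get_item_async_safe(item_id: int):
    results = await asyncio.gather(
        fetch_from_db(item_id),
        fetch_from_redis(item_id),
        fetch_from_api(item_id),
        return_exceptions=True,
    )
    # With return_exceptions=True, a failed task shows up as an exception
    # object in the results instead of aborting the whole batch
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        return {"error": f"{len(failures)} of 3 sources failed"}
    db_data, redis_data, api_data = results
    return {
        "item": db_data,
        "stock": redis_data["stock"],
        "final_price": db_data["price"] * (1 - api_data["discount"]),
    }
```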
V. Summary & Pitfall Avoidance Guide
When to use async def?
- When your route handler contains `await` calls: for example, calling asynchronous database drivers, async HTTP clients (like `httpx.AsyncClient`), or other async functions.
- When you need to run multiple I/O tasks concurrently within one endpoint: using `asyncio.gather` can dramatically improve performance.
- When your application is I/O-bound and needs to handle high concurrency: async endpoints let you handle more concurrent requests with fewer server resources.
When to use def (synchronous)?
- When your route handler contains only CPU-bound operations: for example, complex math calculations or heavy data processing. For CPU-bound tasks, using multiprocessing (`ProcessPoolExecutor`, see the sketch after this list) or plain synchronous endpoints is more appropriate.
- When the library you’re using doesn’t have an async version: if you must use a synchronous database driver or third-party library, and the operation is time-consuming, the best practice is to wrap its execution in `loop.run_in_executor` or simply use a synchronous `def` route to avoid blocking the event loop.
- For simple CRUD operations with extremely low I/O latency: for very simple, fast operations, the performance difference between sync and async is negligible, and sync endpoints yield simpler code.
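To make the CPU-bound case concrete, here’s a minimal sketch of pushing a heavy computation onto a `ProcessPoolExecutor` from an async endpoint (the `crunch_numbers` function and `/compute` path are made up for illustration):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

from fastapi import FastAPI

app = FastAPI()
# One shared pool of worker processes for CPU-bound work
process_pool = ProcessPoolExecutor()

def crunch_numbers(n: int) -> int:
    # A stand-in for a genuinely CPU-bound computation
    return sum(i * i for i in range(n))

@app.get("/compute/{n}")
async def compute(n: int):
    loop = asyncio.get_running_loop()
    # run_in_executor with a process pool sidesteps the GIL,
    # so the event loop (and other CPU cores) stay free
    result = await loop.run_in_executor(process_pool, crunch_numbers, n)
    return {"result": result}
```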
Common Pitfalls for Beginners:
- Calling synchronous blocking functions inside `async def`: this is the most common mistake. Examples: using `requests.get()` or `time.sleep()` inside an async route. This blocks the entire event loop, making all async endpoints unresponsive. (See the sketch after this list for a safe workaround.)
- Overusing async unnecessarily: don’t use async just for the sake of it. If an endpoint’s logic is very simple with no operations that need `await`, using `def` is more efficient.
- Database connections: ensure you’re using the correct asynchronous database driver and asynchronous ORM mode for your async application.
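And to close the loop on pitfall 1: if you’re forced to make a blocking call inside an `async def` route, FastAPI re-exports Starlette’s `run_in_threadpool` helper as an escape hatch. A minimal sketch using the synchronous `requests` library as the offender (the `/async/safe-example` path is illustrative):

```python
# Safely calling a blocking library from an async route by pushing
# the blocking work onto a thread pool instead of the event loop
import requests
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

@app.get("/async/safe-example")
async def safe_async_example():
    # The blocking requests.get runs in a worker thread,
    # so the event loop keeps serving other requests
    response = await run_in_threadpool(requests.get, "https://api.github.com")
    return response.json()
```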