The Ultimate Flask MySQL Tutorial for Beginners: Code You Can Copy & Run Instantly!

paz 06/12/2025

Stuck on database operations while trying to build a web project with Flask as a Python beginner? Don’t worry! Today, we’ll start from scratch and implement CRUD (Create, Read, Update, Delete) operations with Flask + MySQL. This is a complete, step-by-step guide with code you can copy and run right away!

Flask itself doesn’t include database functionality, but paired with Flask-SQLAlchemy (an ORM tool) and pymysql (a MySQL driver), it lets you work with your database in plain Python, without memorizing complex SQL statements. Maximum efficiency unlocked!

I. Environment Setup: 3 Steps to Install Dependencies

1. Install Required Libraries

Open your terminal/command prompt and enter the following command to install all dependencies at once:

bash

pip install flask flask-sqlalchemy pymysql
  • flask: The core web framework.
  • flask-sqlalchemy: ORM tool for interacting with the database using Python classes (no SQL required).
  • pymysql: Driver for connecting to MySQL databases.

2. Prepare the MySQL Database

  • First, install MySQL (XAMPP or WAMP gives beginners a one-click MySQL environment).
  • Manually create a database (e.g., flask_db). You can use Navicat, phpMyAdmin, or the MySQL command line:

sql

CREATE DATABASE flask_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

(utf8mb4 supports Chinese and special characters such as emoji, which avoids garbled text.)

3. Verify the Environment

After installation, start a Python interpreter (run python in your terminal) and enter the following code. If no errors appear, your environment is ready.

python

import flask
import flask_sqlalchemy
import pymysql
print("Environment configured successfully!")
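If you also want to see which versions got installed (handy when a snippet behaves differently across releases), a minimal sketch using only the standard library might look like this. The helper name installed_versions is made up for illustration.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package name: version string, or None if it is not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    wanted = ["flask", "flask-sqlalchemy", "pymysql"]
    for name, ver in installed_versions(wanted).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```

Any package reported as NOT INSTALLED means the pip command above needs to be run again in the same Python environment.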

II. Core Configuration: Connecting Flask to MySQL

First, create a Flask project. The directory structure is super simple:

text

flask_mysql_demo/
└── app.py  # Main application file

In app.py, write the following configuration code to establish the connection between Flask and MySQL:

python

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

# 1. Initialize the Flask application
app = Flask(__name__)

# 2. Configure MySQL connection information
# Format: mysql+pymysql://username:password@host:port/database_name
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@localhost:3306/flask_db'
# (Must modify) root: MySQL username (default is usually root)
# (Must modify) 123456: MySQL password (the password you set during installation)
# (Optional) localhost:3306: MySQL host and port (default is 3306)
# (Must modify) flask_db: The database name you just created

# 3. Disable SQLAlchemy's tracking modification warning (optional, removes unnecessary warnings)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# 4. Initialize the SQLAlchemy object (links the Flask app with the database)
db = SQLAlchemy(app)

# Test if the connection is successful
with app.app_context():  # Flask-SQLAlchemy 3.0+ needs an active app context to reach db.engine
    try:
        # Open a connection; the inner "with" closes it again when the block ends
        with db.engine.connect():
            print("Connected to MySQL successfully! 🎉")
    except Exception as e:
        print(f"Connection failed! Error: {e}")

# Keep this block at the very bottom of app.py; code from later sections goes above it
if __name__ == '__main__':
    app.run(debug=True)

Key Notes:

  • You must modify the username, password, and database_name in SQLALCHEMY_DATABASE_URI, otherwise the connection will fail.
  • After running the code, if the terminal prints Connected to MySQL successfully!, the configuration is correct.
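One thing that often trips people up: if your password contains characters such as @, : or /, they have to be percent-encoded before they go into the URI. Here is a small sketch of one way to do that with the standard library. The helper name build_mysql_uri and the sample password are invented for illustration, and the ?charset=utf8mb4 suffix is an optional extra, not something the configuration above requires.

```python
from urllib.parse import quote_plus


def build_mysql_uri(user, password, host, port, database):
    """Assemble a mysql+pymysql URI, percent-encoding the credentials."""
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?charset=utf8mb4"
    )


# Hypothetical credentials, only to show the encoding
uri = build_mysql_uri("root", "p@ss:word/1", "localhost", 3306, "flask_db")
print(uri)
# mysql+pymysql://root:p%40ss%3Aword%2F1@localhost:3306/flask_db?charset=utf8mb4
```

You could then assign the result to app.config['SQLALCHEMY_DATABASE_URI'] instead of typing the string by hand.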

III. Defining Data Models: Mapping Database Tables with Python Classes

The core idea of ORM (Object Relational Mapping) is: Python Class → Database Table, Class Attributes → Table Columns.

Let’s use a “Users table” as an example. Add the model definition to app.py:

python

# Continuing from the code above, add this to app.py
class User(db.Model):
    # Define the table name (the actual name in the database)
    __tablename__ = 'users'  # Table names are usually plural

    # Define the fields (field name → database column name)
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)  # Primary key ID, auto-incrementing
    username = db.Column(db.String(50), unique=True, nullable=False)  # Username: unique, not null
    email = db.Column(db.String(100), unique=True, nullable=False)   # Email: unique, not null
    age = db.Column(db.Integer, default=0)                          # Age: default value 0
    create_time = db.Column(db.DateTime, default=db.func.current_timestamp()) # Creation time: defaults to now

    # Optional: Define the format when printing the object (helpful for debugging)
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

# Create the database tables (executed on first run)
with app.app_context():
    db.create_all()  # Automatically creates the 'users' table in MySQL if it doesn't exist

Field Type Guide (Must-know for beginners):

Field Type   | Purpose                  | Example
db.Integer   | Integer (ID, age)        | age = db.Column(db.Integer)
db.String(n) | String (name, email)     | username = db.Column(db.String(50))
db.DateTime  | Date & time (created_at) | create_time = db.Column(db.DateTime)
db.Boolean   | Boolean (is_active)      | is_active = db.Column(db.Boolean, default=True)

Field Parameter Guide:

  • primary_key=True: Sets as primary key (unique identifier for a record).
  • autoincrement=True: Auto-increment (ID doesn’t need manual assignment).
  • unique=True: Field value must be unique (avoids duplicate usernames/emails).
  • nullable=False: Field cannot be null (required).
  • default=xxx: Value used when none is supplied on insert (makes the field optional).
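To see how these parameters translate into SQL, you can ask SQLAlchemy to compile the CREATE TABLE statement for the MySQL dialect without connecting to anything. The sketch below is an illustration under one assumption: it describes the same columns with a plain SQLAlchemy Table (named users here) instead of db.Model, so it runs without a Flask app. Exact whitespace in the output may differ between versions.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table, func
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Same columns as the User model, written as a plain SQLAlchemy Table
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("username", String(50), unique=True, nullable=False),
    Column("email", String(100), unique=True, nullable=False),
    Column("age", Integer, default=0),
    Column("create_time", DateTime, default=func.current_timestamp()),
)

# Compile the DDL for MySQL; no database connection is opened
ddl = str(CreateTable(users).compile(dialect=mysql.dialect()))
print(ddl)
```

In the printed statement, primary_key and autoincrement show up as AUTO_INCREMENT, nullable=False as NOT NULL, and unique=True as UNIQUE constraints. The default= values do not appear, because SQLAlchemy fills them in when it builds the INSERT rather than declaring a column DEFAULT in the table.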

Run the code:
Execute app.py. It will automatically create the users table in the flask_db database. You can check it with your MySQL tool!

IV. Core Operations: Database CRUD

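Flask-SQLAlchemy 3.0+ requires an active application context for every database operation. Standalone scripts and helper functions get one from a with app.app_context(): block, while route handlers already run inside one. The CRUD functions in this section can be copied directly into app.py.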
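To get a feel for what "inside a context" means, here is a tiny sketch that uses only Flask (no database). It checks has_app_context() before, inside, and after a with app.app_context(): block, and shows that current_app raises RuntimeError when no context is active.

```python
from flask import Flask, current_app, has_app_context

app = Flask(__name__)

print(has_app_context())  # False: nothing has pushed a context yet

try:
    current_app.name  # touching current_app without a context fails
except RuntimeError as exc:
    print(f"Outside a context: {type(exc).__name__}")

with app.app_context():
    print(has_app_context())             # True inside the block
    print(current_app.name == app.name)  # True: current_app points at our app

print(has_app_context())  # False again once the block ends
```

This is the same mechanism the database session relies on, which is why the standalone functions in this section open a context before touching db.session.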

1. Create Data

python

# Add a user (add this function to app.py)
def add_user(username, email, age=0):
    with app.app_context():
        # 1. Create a user object (represents one record)
        new_user = User(
            username=username,
            email=email,
            age=age
        )
        # 2. Add to the database session
        db.session.add(new_user)
        # 3. Commit the session (actually writes to the database)
        db.session.commit()
        print(f"User added successfully: {new_user}")
        return new_user

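# Test the add function (executed when app.py runs)
if __name__ == '__main__':
    # Add 2 test users
    add_user("张三", "zhangsan@163.com", 25)
    add_user("李四", "lisi@qq.com", 30)
    # The debug reloader re-runs this file in a child process, which would insert
    # the same users a second time and fail on the unique columns, so turn it off here.
    app.run(debug=True, use_reloader=False)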

2. Read Data

Supports single record query, multiple record query, and conditional queries for most scenarios.

python

# Query data (add this function to app.py)
def query_users():
    with app.app_context():
        # ① Query ALL users (returns a list)
        all_users = User.query.all()
        print("All users:")
        for user in all_users:
            print(f"ID: {user.id}, Username: {user.username}, Email: {user.email}, Age: {user.age}")

        # ② Query a single user by ID (very common)
        user1 = db.session.get(User, 1)  # takes the model and the primary key (Query.get() is legacy in SQLAlchemy 2.0)
        if user1:
            print(f"\nUser with ID=1: {user1}")

        # ③ Conditional Query (filter users)
        # Find users older than 28
        old_users = User.query.filter(User.age > 28).all()
        print(f"\nUsers older than 28: {old_users}")

        # Find users whose username contains "张"
        zhang_users = User.query.filter(User.username.like("%张%")).all()
        print(f"Users whose username contains '张': {zhang_users}")

        # ④ Sorted Query (by creation time, descending)
        sorted_users = User.query.order_by(User.create_time.desc()).all()
        print(f"\nSorted by creation time (descending): {sorted_users}")

        # ⑤ Paginated Query (2 per page, get page 1)
        paginated_users = User.query.paginate(page=1, per_page=2)
        print("\nPaginated query, page 1:")
        for user in paginated_users.items:
            print(user)

# Test query (add to `if __name__ == '__main__'`)
if __name__ == '__main__':
    # ... previous add code ...
    query_users()  # Call the query function
    app.run(debug=True)

3. Update Data

Update user information based on ID (most common scenario).

python

# Update data (add this function to app.py)
def update_user(user_id, new_username=None, new_email=None, new_age=None):
    with app.app_context():
        # 1. First, find the user to update
        user = db.session.get(User, user_id)  # Query.get() is legacy in SQLAlchemy 2.0
        if not user:
            print(f"User with ID={user_id} not found")
            return None
        # 2. Update fields (only update non-None parameters passed in)
        if new_username:
            user.username = new_username
        if new_email:
            user.email = new_email
        if new_age is not None:
            user.age = new_age
        # 3. Commit the session (save changes)
        db.session.commit()
        print(f"User updated successfully: {user}")
        return user

# Test update (add to `if __name__ == '__main__'`)
if __name__ == '__main__':
    # ... previous code ...
    update_user(1, new_username="张三三", new_age=26)  # Update user with ID=1
    app.run(debug=True)

4. Delete Data

python

# Delete data (add this function to app.py)
def delete_user(user_id):
    with app.app_context():
        # 1. First, find the user to delete
        user = db.session.get(User, user_id)  # Query.get() is legacy in SQLAlchemy 2.0
        if not user:
            print(f"User with ID={user_id} not found")
            return False
        # 2. Delete from the session
        db.session.delete(user)
        # 3. Commit the session (confirm deletion)
        db.session.commit()
        print(f"User deleted successfully: {user}")
        return True

# Test delete (add to `if __name__ == '__main__'`)
if __name__ == '__main__':
    # ... previous code ...
    delete_user(2)  # Delete user with ID=2
    app.run(debug=True)

Complete Testing Workflow:

  1. Run app.py first to add 2 users.
  2. Comment out the add code, run the query function to see all users.
  3. Run the update function to modify the user with ID=1.
  4. Run the delete function to delete the user with ID=2.
  5. Query again to verify the results.

V. Practical Example: Flask API Endpoints for Database Operations

The functions above are foundational. In a real web project, you typically operate on the database through API endpoints. Let’s add a few endpoints that you can test with Postman or a browser.

python

# Continuing in app.py, add API routes
from flask import request, jsonify

# Endpoint 1: Add a user (POST request)
@app.route('/api/users', methods=['POST'])
def api_add_user():
    # Get request parameters (JSON format)
    data = request.get_json(silent=True) or {}  # fall back to {} when the body is missing or not valid JSON
    username = data.get('username')
    email = data.get('email')
    age = data.get('age', 0)

    # Validate required parameters
    if not username or not email:
        return jsonify({"code": 400, "msg": "Username and email are required"}), 400

    # Call the add function
    user = add_user(username, email, age)
    return jsonify({
        "code": 201,
        "msg": "User added successfully",
        "data": {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "age": user.age
        }
    }), 201

# Endpoint 2: Get all users (GET request)
@app.route('/api/users', methods=['GET'])
def api_get_all_users():
    with app.app_context():
        users = User.query.all()
        # Convert to a JSON-serializable list
        user_list = []
        for user in users:
            user_list.append({
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "age": user.age,
                "create_time": user.create_time.strftime("%Y-%m-%d %H:%M:%S") if user.create_time else None
            })
        return jsonify({
            "code": 200,
            "msg": "Query successful",
            "data": user_list
        })

# Endpoint 3: Get a user by ID (GET request)
@app.route('/api/users/<int:user_id>', methods=['GET'])
def api_get_user(user_id):
    with app.app_context():
        user = db.session.get(User, user_id)  # Query.get() is legacy in SQLAlchemy 2.0
        if not user:
            return jsonify({"code": 404, "msg": "User not found"}), 404
        return jsonify({
            "code": 200,
            "msg": "Query successful",
            "data": {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "age": user.age
            }
        })

# Endpoint 4: Update a user (PUT request)
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def api_update_user(user_id):
    data = request.get_json(silent=True) or {}  # fall back to {} when the body is missing or not valid JSON
    new_username = data.get('username')
    new_email = data.get('email')
    new_age = data.get('age')

    user = update_user(user_id, new_username, new_email, new_age)
    if not user:
        return jsonify({"code": 404, "msg": "User not found"}), 404
    return jsonify({
        "code": 200,
        "msg": "Update successful",
        "data": {"id": user.id, "username": user.username}
    })

# Endpoint 5: Delete a user (DELETE request)
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def api_delete_user(user_id):
    success = delete_user(user_id)
    if not success:
        return jsonify({"code": 404, "msg": "User not found"}), 404
    return jsonify({"code": 200, "msg": "Delete successful"}), 200

if __name__ == '__main__':
    app.run(debug=True)

Test the Endpoints:

  1. Run app.py. The server starts at http://127.0.0.1:5000.
  2. Test with Postman (examples):
    • Add a user: POST http://127.0.0.1:5000/api/users. JSON body: {"username":"王五","email":"wangwu@163.com","age":28}
    • Get all users: GET http://127.0.0.1:5000/api/users
    • Update a user: PUT http://127.0.0.1:5000/api/users/3. JSON body: {"age":29}
    • Delete a user: DELETE http://127.0.0.1:5000/api/users/3

VI. Beginner’s Guide: Common Problem Troubleshooting

  1. Can’t connect to the database?
    • Check if the username, password, and database_name in SQLALCHEMY_DATABASE_URI are correct.
    • Confirm the MySQL service is running (start MySQL in XAMPP/WAMP).
    • Is the port number correct? (Default is 3306. Adjust if you changed it.)
  2. “Duplicate entry” error when adding data?
    • The field is set with unique=True (e.g., username/email), but you’re inserting a duplicate value. Use a unique value.
  3. Chinese characters appear garbled?
    • Did you create the database with utf8mb4 encoding? (Included in the earlier SQL).
    • Ensure SQLALCHEMY_DATABASE_URI doesn’t specify a different encoding. You can append ?charset=utf8mb4 to the URI to make it explicit.
  4. “Working outside of application context” error?
    • Flask-SQLAlchemy 3.0+ needs an active application context for database operations. In scripts, wrap them in a with app.app_context(): block (handled in the code above). Route handlers already have one.

VII. Next Steps for Beginners

  1. Database Migrations: Use Flask-Migrate to manage database schema changes (no need to drop tables when adding/modifying fields): pip install flask-migrate.
  2. Relationships: Learn about linking multiple tables (e.g., Users and Orders).
  3. Transaction Handling: Use transactions for complex operations (db.session.rollback() to rollback on failure).
  4. Data Validation: Use pydantic to validate API parameters (prevents invalid data from entering the database).

Summary

The core of using Flask + Flask-SQLAlchemy with MySQL is: Configure Connection → Define Models → Perform CRUD. Everyday CRUD needs no handwritten SQL, which makes it easy for beginners to get started quickly.