一、核心概念解析

1.1 基础定义:什么是SQLite?

SQLite不是一个独立的服务器进程,而是一个嵌入式数据库。它的整个数据库就是一个普通的文件,你可以像处理其他文件一样复制、移动、备份它。

SQLite的特点

  1. 零配置:不用安装,不用配置

  2. 无服务器:没有单独的数据库服务

  3. 单文件:整个数据库就是一个文件

  4. 跨平台:文件在不同系统间通用

  5. 功能完整:支持SQL标准的大多数功能

1.2 基本语法:快速上手

sqlite3只需4步:连接、创建游标、执行SQL、关闭连接。

# sqlite3_basic.py
import sqlite3

# 1. 连接到数据库(不存在则创建)
conn = sqlite3.connect('example.db')

# 2. 创建游标对象
cursor = conn.cursor()

# 3. 执行SQL语句
# 创建表
cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        age INTEGER
    )
''')

# 插入数据
cursor.execute("INSERT INTO users (name, age) VALUES ('张三', 25)")
cursor.execute("INSERT INTO users (name, age) VALUES ('李四', 30)")

# 提交更改
conn.commit()

# 4. 查询数据
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()

print("用户列表:")
for row in rows:
    print(f"  ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}")

# 5. 关闭连接
conn.close()

运行这个脚本,你会看到两个用户信息,当前目录会生成example.db文件。

1.3 核心特点:为什么选择SQLite?

  1. 零依赖:Python自带,不用安装任何东西

  2. 简单易用:像操作文件一样简单

  3. 快速开发:原型开发的完美选择

  4. 学习友好:学习SQL的最佳起点

  5. 适用场景广:适合大多数小型应用

二、应用场景详解

2.1 用户数据存储

场景:存储用户信息,支持增删改查

# user_management.py
import sqlite3

def setup_database():
    """初始化数据库"""
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            email TEXT UNIQUE NOT NULL,
            age INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    conn.commit()
    conn.close()
    print("数据库初始化完成")

def add_user(username, email, age):
    """添加用户"""
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    
    try:
        cursor.execute(
            "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
            (username, email, age)
        )
        conn.commit()
        print(f"用户 {username} 添加成功")
    except sqlite3.IntegrityError:
        print(f"用户 {username} 或邮箱 {email} 已存在")
    finally:
        conn.close()

def get_users():
    """获取所有用户"""
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    
    cursor.execute("SELECT * FROM users ORDER BY id")
    users = cursor.fetchall()
    
    conn.close()
    return users

def update_user_age(username, new_age):
    """更新用户年龄"""
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    
    cursor.execute(
        "UPDATE users SET age = ? WHERE username = ?",
        (new_age, username)
    )
    
    if cursor.rowcount > 0:
        print(f"用户 {username} 年龄更新为 {new_age}")
    else:
        print(f"用户 {username} 不存在")
    
    conn.commit()
    conn.close()

def delete_user(username):
    """删除用户"""
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    
    cursor.execute("DELETE FROM users WHERE username = ?", (username,))
    
    if cursor.rowcount > 0:
        print(f"用户 {username} 已删除")
    else:
        print(f"用户 {username} 不存在")
    
    conn.commit()
    conn.close()

# 演示
if __name__ == "__main__":
    print("=== 用户管理系统演示 ===\n")
    
    # 初始化数据库
    setup_database()
    
    # 添加用户
    print("1. 添加用户:")
    add_user("张三", "zhangsan@example.com", 25)
    add_user("李四", "lisi@example.com", 30)
    add_user("王五", "wangwu@example.com", 28)
    
    print("\n2. 显示所有用户:")
    users = get_users()
    for user in users:
        print(f"  {user[1]} ({user[2]}), 年龄: {user[3]}")
    
    print("\n3. 更新用户年龄:")
    update_user_age("张三", 26)
    
    print("\n4. 删除用户:")
    delete_user("王五")
    
    print("\n5. 最终用户列表:")
    users = get_users()
    for user in users:
        print(f"  {user[1]} ({user[2]}), 年龄: {user[3]}")

2.2 缓存API数据

场景:缓存网络请求结果,提高响应速度

# api_cache.py
import sqlite3
import time
import json
from datetime import datetime, timedelta

class APICache:
    """API缓存管理器"""
    
    def __init__(self, db_file='api_cache.db'):
        self.db_file = db_file
        self.setup_database()
    
    def setup_database(self):
        """设置数据库"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS cache (
                key TEXT PRIMARY KEY,
                data TEXT NOT NULL,
                expires_at TIMESTAMP NOT NULL
            )
        ''')
        
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_expires ON cache (expires_at)
        ''')
        
        conn.commit()
        conn.close()
    
    def get(self, key):
        """获取缓存数据"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute(
            "SELECT data FROM cache WHERE key = ? AND expires_at > ?",
            (key, datetime.now())
        )
        
        result = cursor.fetchone()
        conn.close()
        
        if result:
            return json.loads(result[0])
        return None
    
    def set(self, key, data, ttl_seconds=300):
        """设置缓存数据"""
        expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
        
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute(
            '''INSERT OR REPLACE INTO cache (key, data, expires_at)
               VALUES (?, ?, ?)''',
            (key, json.dumps(data), expires_at)
        )
        
        conn.commit()
        conn.close()
    
    def cleanup_expired(self):
        """清理过期缓存"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute("DELETE FROM cache WHERE expires_at <= ?", (datetime.now(),))
        deleted = cursor.rowcount
        
        conn.commit()
        conn.close()
        
        if deleted > 0:
            print(f"清理了 {deleted} 个过期缓存")
        return deleted

def simulate_api_request(endpoint, params):
    """模拟API请求(实际中可能是requests.get)"""
    print(f"请求API: {endpoint} {params}")
    time.sleep(1)  # 模拟网络延迟
    
    # 模拟返回数据
    return {
        'endpoint': endpoint,
        'params': params,
        'data': f"{endpoint} 的返回数据",
        'timestamp': datetime.now().isoformat()
    }

def demo_api_cache():
    """演示API缓存"""
    print("=== API缓存演示 ===\n")
    
    cache = APICache()
    
    def get_with_cache(endpoint, params, cache_ttl=10):
        """带缓存的API请求"""
        cache_key = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
        
        # 尝试从缓存获取
        cached_data = cache.get(cache_key)
        if cached_data:
            print(f"  [缓存命中] {endpoint}")
            return cached_data
        
        # 缓存未命中,请求API
        print(f"  [请求API] {endpoint}")
        data = simulate_api_request(endpoint, params)
        
        # 存入缓存
        cache.set(cache_key, data, cache_ttl)
        return data
    
    print("1. 第一次请求(会调用API):")
    data1 = get_with_cache('/api/users', {'page': 1})
    print(f"   返回: {data1['data']}")
    
    print("\n2. 立即再次请求(会从缓存读取):")
    data2 = get_with_cache('/api/users', {'page': 1})
    print(f"   返回: {data2['data']}")
    
    print("\n3. 清理过期缓存:")
    cache.cleanup_expired()
    
    print("\n实际应用:")
    print("  1. 缓存API响应,减少网络请求")
    print("  2. 缓存计算结果,提高性能")
    print("  3. 设置合适的TTL,平衡实时性和性能")

if __name__ == "__main__":
    demo_api_cache()

2.3 应用配置存储

场景:用数据库存储应用配置,替代配置文件

# app_config_db.py
import sqlite3
import json

class ConfigDB:
    """配置数据库"""
    
    def __init__(self, db_file='config.db'):
        self.db_file = db_file
        self.init_database()
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS config (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                description TEXT
            )
        ''')
        
        # 插入默认配置
        default_configs = [
            ('app.name', '我的应用', '应用名称'),
            ('app.debug', 'false', '调试模式'),
            ('server.port', '8080', '服务器端口'),
            ('database.host', 'localhost', '数据库主机'),
        ]
        
        cursor.executemany(
            "INSERT OR IGNORE INTO config VALUES (?, ?, ?)",
            default_configs
        )
        
        conn.commit()
        conn.close()
    
    def get(self, key, default=None):
        """获取配置值"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute("SELECT value FROM config WHERE key = ?", (key,))
        result = cursor.fetchone()
        
        conn.close()
        
        if result is None:
            return default
        
        value = result[0]
        
        # 尝试转换为布尔值
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        
        # 尝试转换为整数
        if value.isdigit():
            return int(value)
        
        return value
    
    def set(self, key, value, description=None):
        """设置配置值"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute(
            '''INSERT OR REPLACE INTO config (key, value, description)
               VALUES (?, ?, COALESCE(?, 
                   (SELECT description FROM config WHERE key = ?)))''',
            (key, str(value), description, key)
        )
        
        conn.commit()
        conn.close()
        print(f"配置已更新: {key} = {value}")
    
    def list_all(self):
        """列出所有配置"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute("SELECT * FROM config ORDER BY key")
        configs = cursor.fetchall()
        
        conn.close()
        return configs

def demo_config_db():
    """演示配置数据库"""
    print("=== 应用配置数据库演示 ===\n")
    
    config = ConfigDB('demo_config.db')
    
    print("1. 获取默认配置:")
    print(f"   应用名称: {config.get('app.name')}")
    print(f"   调试模式: {config.get('app.debug')}")
    print(f"   服务器端口: {config.get('server.port')}")
    
    print("\n2. 更新配置:")
    config.set('app.debug', 'true')
    config.set('server.port', 9090)
    config.set('custom.setting', 'custom_value', '自定义设置')
    
    print("\n3. 获取更新后的配置:")
    print(f"   调试模式: {config.get('app.debug')} (类型: {type(config.get('app.debug'))})")
    print(f"   服务器端口: {config.get('server.port')} (类型: {type(config.get('server.port'))})")
    
    print("\n4. 所有配置:")
    configs = config.list_all()
    for key, value, desc in configs:
        print(f"   {key} = {value}  # {desc}")

if __name__ == "__main__":
    demo_config_db()

三、高级技巧

3.1 使用字典形式返回结果

默认返回元组,我们可以自定义返回字典:

# dict_row_factory.py
import sqlite3

def dict_factory(cursor, row):
    """将行转换为字典"""
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

def demo_dict_factory():
    """演示字典形式的返回结果"""
    conn = sqlite3.connect(':memory:')
    
    # 设置行工厂
    conn.row_factory = dict_factory
    cursor = conn.cursor()
    
    # 创建测试表
    cursor.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    ''')
    
    # 插入测试数据
    cursor.execute(
        "INSERT INTO users (name, email) VALUES ('张三', 'zhangsan@example.com')"
    )
    cursor.execute(
        "INSERT INTO users (name, email) VALUES ('李四', 'lisi@example.com')"
    )
    
    # 查询数据
    cursor.execute("SELECT * FROM users")
    rows = cursor.fetchall()
    
    print("查询结果(字典形式):")
    for row in rows:
        print(f"  ID: {row['id']}, 姓名: {row['name']}, 邮箱: {row['email']}")
    
    conn.close()

if __name__ == "__main__":
    demo_dict_factory()

3.2 使用事务

事务确保多个操作要么全部成功,要么全部失败:

# transaction_demo.py
import sqlite3

def demo_transaction():
    """演示事务使用"""
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()
    
    # 创建测试表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS accounts (
            id INTEGER PRIMARY KEY,
            name TEXT,
            balance REAL
        )
    ''')
    
    # 清空表
    cursor.execute("DELETE FROM accounts")
    
    # 插入测试数据
    cursor.execute("INSERT INTO accounts (name, balance) VALUES ('张三', 1000)")
    cursor.execute("INSERT INTO accounts (name, balance) VALUES ('李四', 500)")
    
    conn.commit()
    
    def transfer_money(from_name, to_name, amount):
        """转账函数"""
        try:
            # 开始事务
            cursor.execute("BEGIN TRANSACTION")
            
            # 检查余额
            cursor.execute(
                "SELECT balance FROM accounts WHERE name = ?",
                (from_name,)
            )
            from_balance = cursor.fetchone()[0]
            
            if from_balance < amount:
                raise ValueError("余额不足")
            
            # 扣款
            cursor.execute(
                "UPDATE accounts SET balance = balance - ? WHERE name = ?",
                (amount, from_name)
            )
            
            # 存款
            cursor.execute(
                "UPDATE accounts SET balance = balance + ? WHERE name = ?",
                (amount, to_name)
            )
            
            # 提交事务
            conn.commit()
            print(f"转账成功: {from_name} -> {to_name}{amount}")
            return True
            
        except Exception as e:
            # 回滚事务
            conn.rollback()
            print(f"转账失败: {e}")
            return False
    
    print("转账演示:")
    print("="*40)
    
    # 显示初始余额
    cursor.execute("SELECT * FROM accounts")
    print("\n初始余额:")
    for row in cursor.fetchall():
        print(f"  {row[1]}: ¥{row[2]}")
    
    print("\n" + "="*40)
    
    # 成功转账
    print("\n1. 成功转账:")
    transfer_money('张三', '李四', 200)
    
    cursor.execute("SELECT * FROM accounts")
    print("\n转账后余额:")
    for row in cursor.fetchall():
        print(f"  {row[1]}: ¥{row[2]}")
    
    print("\n" + "="*40)
    
    # 失败转账(余额不足)
    print("\n2. 失败转账(余额不足):")
    transfer_money('李四', '张三', 1000)
    
    cursor.execute("SELECT * FROM accounts")
    print("\n转账后余额(应保持不变):")
    for row in cursor.fetchall():
        print(f"  {row[1]}: ¥{row[2]}")
    
    conn.close()

if __name__ == "__main__":
    demo_transaction()

3.3 内存数据库

SQLite支持内存数据库,适合临时数据处理:

# memory_db.py
import sqlite3

def demo_memory_db():
    """演示内存数据库"""
    # 连接到内存数据库
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # 创建表
    cursor.execute('''
        CREATE TABLE temp_data (
            id INTEGER PRIMARY KEY,
            value TEXT
        )
    ''')
    
    # 插入数据
    data = [('数据1',), ('数据2',), ('数据3',)]
    cursor.executemany("INSERT INTO temp_data (value) VALUES (?)", data)
    
    # 查询数据
    cursor.execute("SELECT * FROM temp_data")
    rows = cursor.fetchall()
    
    print("内存数据库中的数据:")
    for row in rows:
        print(f"  ID: {row[0]}, 值: {row[1]}")
    
    # 关闭连接后,数据就消失了
    conn.close()
    
    print("\n注意: 内存数据库在连接关闭后数据就丢失了")
    print("适合临时计算、中间结果存储等场景")

if __name__ == "__main__":
    demo_memory_db()

四、实战案例

4.1 简易任务管理系统

创建一个完整的任务管理系统:

# task_manager_simple.py
import sqlite3
from datetime import datetime

class TaskManager:
    """简易任务管理器"""
    
    def __init__(self, db_file='tasks.db'):
        self.db_file = db_file
        self.init_database()
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT,
                status TEXT DEFAULT '待办',  -- 待办, 进行中, 完成
                priority INTEGER DEFAULT 1,  -- 1:低, 2:中, 3:高
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def add_task(self, title, description=''):
        """添加任务"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute(
            "INSERT INTO tasks (title, description) VALUES (?, ?)",
            (title, description)
        )
        
        conn.commit()
        conn.close()
        
        print(f"任务添加成功: {title}")
    
    def get_tasks(self, status=None):
        """获取任务列表"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        if status:
            cursor.execute(
                "SELECT * FROM tasks WHERE status = ? ORDER BY priority DESC, created_at",
                (status,)
            )
        else:
            cursor.execute("SELECT * FROM tasks ORDER BY priority DESC, created_at")
        
        tasks = cursor.fetchall()
        conn.close()
        return tasks
    
    def update_status(self, task_id, new_status):
        """更新任务状态"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute(
            "UPDATE tasks SET status = ? WHERE id = ?",
            (new_status, task_id)
        )
        
        if cursor.rowcount > 0:
            print(f"任务 {task_id} 状态更新为: {new_status}")
        else:
            print(f"任务 {task_id} 不存在")
        
        conn.commit()
        conn.close()
    
    def delete_task(self, task_id):
        """删除任务"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        
        if cursor.rowcount > 0:
            print(f"任务 {task_id} 已删除")
        else:
            print(f"任务 {task_id} 不存在")
        
        conn.commit()
        conn.close()
    
    def print_tasks(self, tasks):
        """打印任务列表"""
        if not tasks:
            print("没有任务")
            return
        
        print(f"\n{'ID':<3} {'状态':<6} {'优先级':<6} {'标题':<20}")
        print("-" * 40)
        
        for task in tasks:
            task_id, title, description, status, priority, created_at = task
            
            # 优先级显示
            priority_map = {1: '低', 2: '中', 3: '高'}
            priority_display = priority_map.get(priority, priority)
            
            print(f"{task_id:<3} {status:<6} {priority_display:<6} {title:<20}")

def demo_task_manager():
    """演示任务管理器"""
    print("=== 简易任务管理系统 ===\n")
    
    task_mgr = TaskManager('demo_tasks.db')
    
    # 添加任务
    print("1. 添加任务:")
    tasks = [
        ('完成项目报告', '编写季度总结'),
        ('购买日用品', '牛奶、鸡蛋、面包'),
        ('学习Python', '学习sqlite3模块'),
        ('会议准备', '准备会议材料'),
    ]
    
    for title, description in tasks:
        task_mgr.add_task(title, description)
    
    print("\n2. 显示所有任务:")
    all_tasks = task_mgr.get_tasks()
    task_mgr.print_tasks(all_tasks)
    
    print("\n3. 更新任务状态:")
    task_mgr.update_status(1, '进行中')
    task_mgr.update_status(2, '完成')
    
    print("\n4. 显示待办任务:")
    pending_tasks = task_mgr.get_tasks('待办')
    task_mgr.print_tasks(pending_tasks)
    
    print("\n5. 删除任务:")
    task_mgr.delete_task(3)
    
    print("\n6. 最终任务列表:")
    final_tasks = task_mgr.get_tasks()
    task_mgr.print_tasks(final_tasks)

if __name__ == "__main__":
    demo_task_manager()

五、注意事项

5.1 使用限制

  1. 并发写入
  • SQLite支持多个读操作

  • 但同一时间只支持一个写操作

  1. 数据量限制
  • 适合中小型应用

  • 大数据量时性能可能下降

  1. 网络访问
  • 不支持远程访问

  • 数据库文件必须在本地

5.2 常见问题

Q: SQLite能替代MySQL吗?

A: 对于小型应用、移动应用、开发原型,SQLite是很好的选择。但对于高并发、大数据量、需要远程访问的场景,应该使用MySQL等数据库。

Q: 数据库文件损坏怎么办?

A: 定期备份数据库文件。可以使用.dump命令导出SQL语句备份。

Q: 如何优化SQLite性能?

A: 1. 使用事务批量操作

  1. 创建合适的索引

  2. 使用PRAGMA命令优化

  3. 合理设计表结构

Q: 如何处理并发访问?

A: SQLite使用文件锁处理并发。如果应用需要高并发写入,考虑使用客户端/服务器数据库。

Q: 数据库可以加密吗?

A: 原生SQLite不支持加密。可以使用SQLCipher等第三方扩展。

5.3 替代方案

  1. 客户端/服务器数据库
  • MySQL、PostgreSQL:功能更强大
  • MongoDB:文档数据库
  1. Python ORM库
  • SQLAlchemy:功能强大的ORM

  • Peewee:轻量级ORM

  • Django ORM:Django框架内置

  1. 其他嵌入式数据库
  • DuckDB:分析型数据库

  • TinyDB:纯Python文档数据库

什么情况下用SQLite

  • 开发原型或小型应用

  • 需要单文件数据库

  • 本地存储应用数据

  • 移动应用或嵌入式设备

  • 不需要高并发写入

六、总结

通过本文的学习,你应该已经掌握了:

  • SQLite基础:连接数据库、执行SQL、获取结果

  • 数据操作:增删改查、事务处理

  • 高级特性:内存数据库、字典返回

  • 实战应用:任务管理、配置存储、API缓存

sqlite3的核心价值

  1. 简单易用:几行代码就能用上数据库

  2. 零配置:无需安装配置

  3. 快速开发:原型开发利器

  4. 学习友好:学习SQL的好起点

最佳实践建议

  1. 使用参数化查询:防止SQL注入

  2. 处理异常:数据库操作可能失败

  3. 定期备份:重要数据定期备份

  4. 合理设计:设计合适的表结构

  5. 使用事务:确保数据一致性


你在项目中使用过SQLite吗?有什么经验或问题?欢迎在评论区分享你的想法!

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐