轻量级数据库引擎 —— sqlite3模块
SQLite不是一个独立的服务器进程,而是一个嵌入式数据库。它的整个数据库就是一个普通的文件,你可以像处理其他文件一样复制、移动、备份它。SQLite的特点零配置:不用安装,不用配置无服务器:没有单独的数据库服务单文件:整个数据库就是一个文件跨平台:文件在不同系统间通用功能完整:支持SQL标准的大多数功能通过本文的学习,你应该已经掌握了:✅SQLite基础:连接数据库、执行SQL、获取结果✅数据操
一、核心概念解析
1.1 基础定义:什么是SQLite?
SQLite不是一个独立的服务器进程,而是一个嵌入式数据库。它的整个数据库就是一个普通的文件,你可以像处理其他文件一样复制、移动、备份它。
SQLite的特点:
-
零配置:不用安装,不用配置
-
无服务器:没有单独的数据库服务
-
单文件:整个数据库就是一个文件
-
跨平台:文件在不同系统间通用
-
功能完整:支持SQL标准的大多数功能
1.2 基本语法:快速上手
用sqlite3只需4步:连接、创建游标、执行SQL、关闭连接。
# sqlite3_basic.py
import sqlite3
# 1. 连接到数据库(不存在则创建)
conn = sqlite3.connect('example.db')
# 2. 创建游标对象
cursor = conn.cursor()
# 3. 执行SQL语句
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER
)
''')
# 插入数据
cursor.execute("INSERT INTO users (name, age) VALUES ('张三', 25)")
cursor.execute("INSERT INTO users (name, age) VALUES ('李四', 30)")
# 提交更改
conn.commit()
# 4. 查询数据
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
print("用户列表:")
for row in rows:
print(f" ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}")
# 5. 关闭连接
conn.close()
运行这个脚本,你会看到两个用户信息,当前目录会生成example.db文件。
1.3 核心特点:为什么选择SQLite?
-
零依赖:Python自带,不用安装任何东西
-
简单易用:像操作文件一样简单
-
快速开发:原型开发的完美选择
-
学习友好:学习SQL的最佳起点
-
适用场景广:适合大多数小型应用
二、应用场景详解
2.1 用户数据存储
场景:存储用户信息,支持增删改查
# user_management.py
import sqlite3
def setup_database():
"""初始化数据库"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
print("数据库初始化完成")
def add_user(username, email, age):
"""添加用户"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
(username, email, age)
)
conn.commit()
print(f"用户 {username} 添加成功")
except sqlite3.IntegrityError:
print(f"用户 {username} 或邮箱 {email} 已存在")
finally:
conn.close()
def get_users():
"""获取所有用户"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM users ORDER BY id")
users = cursor.fetchall()
conn.close()
return users
def update_user_age(username, new_age):
"""更新用户年龄"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET age = ? WHERE username = ?",
(new_age, username)
)
if cursor.rowcount > 0:
print(f"用户 {username} 年龄更新为 {new_age}")
else:
print(f"用户 {username} 不存在")
conn.commit()
conn.close()
def delete_user(username):
"""删除用户"""
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM users WHERE username = ?", (username,))
if cursor.rowcount > 0:
print(f"用户 {username} 已删除")
else:
print(f"用户 {username} 不存在")
conn.commit()
conn.close()
# 演示
if __name__ == "__main__":
print("=== 用户管理系统演示 ===\n")
# 初始化数据库
setup_database()
# 添加用户
print("1. 添加用户:")
add_user("张三", "zhangsan@example.com", 25)
add_user("李四", "lisi@example.com", 30)
add_user("王五", "wangwu@example.com", 28)
print("\n2. 显示所有用户:")
users = get_users()
for user in users:
print(f" {user[1]} ({user[2]}), 年龄: {user[3]}")
print("\n3. 更新用户年龄:")
update_user_age("张三", 26)
print("\n4. 删除用户:")
delete_user("王五")
print("\n5. 最终用户列表:")
users = get_users()
for user in users:
print(f" {user[1]} ({user[2]}), 年龄: {user[3]}")
2.2 缓存API数据
场景:缓存网络请求结果,提高响应速度
# api_cache.py
import sqlite3
import time
import json
from datetime import datetime, timedelta
class APICache:
"""API缓存管理器"""
def __init__(self, db_file='api_cache.db'):
self.db_file = db_file
self.setup_database()
def setup_database(self):
"""设置数据库"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS cache (
key TEXT PRIMARY KEY,
data TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_expires ON cache (expires_at)
''')
conn.commit()
conn.close()
def get(self, key):
"""获取缓存数据"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute(
"SELECT data FROM cache WHERE key = ? AND expires_at > ?",
(key, datetime.now())
)
result = cursor.fetchone()
conn.close()
if result:
return json.loads(result[0])
return None
def set(self, key, data, ttl_seconds=300):
"""设置缓存数据"""
expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute(
'''INSERT OR REPLACE INTO cache (key, data, expires_at)
VALUES (?, ?, ?)''',
(key, json.dumps(data), expires_at)
)
conn.commit()
conn.close()
def cleanup_expired(self):
"""清理过期缓存"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute("DELETE FROM cache WHERE expires_at <= ?", (datetime.now(),))
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted > 0:
print(f"清理了 {deleted} 个过期缓存")
return deleted
def simulate_api_request(endpoint, params):
"""模拟API请求(实际中可能是requests.get)"""
print(f"请求API: {endpoint} {params}")
time.sleep(1) # 模拟网络延迟
# 模拟返回数据
return {
'endpoint': endpoint,
'params': params,
'data': f"{endpoint} 的返回数据",
'timestamp': datetime.now().isoformat()
}
def demo_api_cache():
"""演示API缓存"""
print("=== API缓存演示 ===\n")
cache = APICache()
def get_with_cache(endpoint, params, cache_ttl=10):
"""带缓存的API请求"""
cache_key = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
# 尝试从缓存获取
cached_data = cache.get(cache_key)
if cached_data:
print(f" [缓存命中] {endpoint}")
return cached_data
# 缓存未命中,请求API
print(f" [请求API] {endpoint}")
data = simulate_api_request(endpoint, params)
# 存入缓存
cache.set(cache_key, data, cache_ttl)
return data
print("1. 第一次请求(会调用API):")
data1 = get_with_cache('/api/users', {'page': 1})
print(f" 返回: {data1['data']}")
print("\n2. 立即再次请求(会从缓存读取):")
data2 = get_with_cache('/api/users', {'page': 1})
print(f" 返回: {data2['data']}")
print("\n3. 清理过期缓存:")
cache.cleanup_expired()
print("\n实际应用:")
print(" 1. 缓存API响应,减少网络请求")
print(" 2. 缓存计算结果,提高性能")
print(" 3. 设置合适的TTL,平衡实时性和性能")
if __name__ == "__main__":
demo_api_cache()
2.3 应用配置存储
场景:用数据库存储应用配置,替代配置文件
# app_config_db.py
import sqlite3
import json
class ConfigDB:
"""配置数据库"""
def __init__(self, db_file='config.db'):
self.db_file = db_file
self.init_database()
def init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
description TEXT
)
''')
# 插入默认配置
default_configs = [
('app.name', '我的应用', '应用名称'),
('app.debug', 'false', '调试模式'),
('server.port', '8080', '服务器端口'),
('database.host', 'localhost', '数据库主机'),
]
cursor.executemany(
"INSERT OR IGNORE INTO config VALUES (?, ?, ?)",
default_configs
)
conn.commit()
conn.close()
def get(self, key, default=None):
"""获取配置值"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute("SELECT value FROM config WHERE key = ?", (key,))
result = cursor.fetchone()
conn.close()
if result is None:
return default
value = result[0]
# 尝试转换为布尔值
if value.lower() in ('true', 'false'):
return value.lower() == 'true'
# 尝试转换为整数
if value.isdigit():
return int(value)
return value
def set(self, key, value, description=None):
"""设置配置值"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute(
'''INSERT OR REPLACE INTO config (key, value, description)
VALUES (?, ?, COALESCE(?,
(SELECT description FROM config WHERE key = ?)))''',
(key, str(value), description, key)
)
conn.commit()
conn.close()
print(f"配置已更新: {key} = {value}")
def list_all(self):
"""列出所有配置"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute("SELECT * FROM config ORDER BY key")
configs = cursor.fetchall()
conn.close()
return configs
def demo_config_db():
"""演示配置数据库"""
print("=== 应用配置数据库演示 ===\n")
config = ConfigDB('demo_config.db')
print("1. 获取默认配置:")
print(f" 应用名称: {config.get('app.name')}")
print(f" 调试模式: {config.get('app.debug')}")
print(f" 服务器端口: {config.get('server.port')}")
print("\n2. 更新配置:")
config.set('app.debug', 'true')
config.set('server.port', 9090)
config.set('custom.setting', 'custom_value', '自定义设置')
print("\n3. 获取更新后的配置:")
print(f" 调试模式: {config.get('app.debug')} (类型: {type(config.get('app.debug'))})")
print(f" 服务器端口: {config.get('server.port')} (类型: {type(config.get('server.port'))})")
print("\n4. 所有配置:")
configs = config.list_all()
for key, value, desc in configs:
print(f" {key} = {value} # {desc}")
if __name__ == "__main__":
demo_config_db()
三、高级技巧
3.1 使用字典形式返回结果
默认返回元组,我们可以自定义返回字典:
# dict_row_factory.py
import sqlite3
def dict_factory(cursor, row):
"""将行转换为字典"""
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def demo_dict_factory():
"""演示字典形式的返回结果"""
conn = sqlite3.connect(':memory:')
# 设置行工厂
conn.row_factory = dict_factory
cursor = conn.cursor()
# 创建测试表
cursor.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
''')
# 插入测试数据
cursor.execute(
"INSERT INTO users (name, email) VALUES ('张三', 'zhangsan@example.com')"
)
cursor.execute(
"INSERT INTO users (name, email) VALUES ('李四', 'lisi@example.com')"
)
# 查询数据
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
print("查询结果(字典形式):")
for row in rows:
print(f" ID: {row['id']}, 姓名: {row['name']}, 邮箱: {row['email']}")
conn.close()
if __name__ == "__main__":
demo_dict_factory()
3.2 使用事务
事务确保多个操作要么全部成功,要么全部失败:
# transaction_demo.py
import sqlite3
def demo_transaction():
"""演示事务使用"""
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 创建测试表
cursor.execute('''
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY,
name TEXT,
balance REAL
)
''')
# 清空表
cursor.execute("DELETE FROM accounts")
# 插入测试数据
cursor.execute("INSERT INTO accounts (name, balance) VALUES ('张三', 1000)")
cursor.execute("INSERT INTO accounts (name, balance) VALUES ('李四', 500)")
conn.commit()
def transfer_money(from_name, to_name, amount):
"""转账函数"""
try:
# 开始事务
cursor.execute("BEGIN TRANSACTION")
# 检查余额
cursor.execute(
"SELECT balance FROM accounts WHERE name = ?",
(from_name,)
)
from_balance = cursor.fetchone()[0]
if from_balance < amount:
raise ValueError("余额不足")
# 扣款
cursor.execute(
"UPDATE accounts SET balance = balance - ? WHERE name = ?",
(amount, from_name)
)
# 存款
cursor.execute(
"UPDATE accounts SET balance = balance + ? WHERE name = ?",
(amount, to_name)
)
# 提交事务
conn.commit()
print(f"转账成功: {from_name} -> {to_name} ¥{amount}")
return True
except Exception as e:
# 回滚事务
conn.rollback()
print(f"转账失败: {e}")
return False
print("转账演示:")
print("="*40)
# 显示初始余额
cursor.execute("SELECT * FROM accounts")
print("\n初始余额:")
for row in cursor.fetchall():
print(f" {row[1]}: ¥{row[2]}")
print("\n" + "="*40)
# 成功转账
print("\n1. 成功转账:")
transfer_money('张三', '李四', 200)
cursor.execute("SELECT * FROM accounts")
print("\n转账后余额:")
for row in cursor.fetchall():
print(f" {row[1]}: ¥{row[2]}")
print("\n" + "="*40)
# 失败转账(余额不足)
print("\n2. 失败转账(余额不足):")
transfer_money('李四', '张三', 1000)
cursor.execute("SELECT * FROM accounts")
print("\n转账后余额(应保持不变):")
for row in cursor.fetchall():
print(f" {row[1]}: ¥{row[2]}")
conn.close()
if __name__ == "__main__":
demo_transaction()
3.3 内存数据库
SQLite支持内存数据库,适合临时数据处理:
# memory_db.py
import sqlite3
def demo_memory_db():
"""演示内存数据库"""
# 连接到内存数据库
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE temp_data (
id INTEGER PRIMARY KEY,
value TEXT
)
''')
# 插入数据
data = [('数据1',), ('数据2',), ('数据3',)]
cursor.executemany("INSERT INTO temp_data (value) VALUES (?)", data)
# 查询数据
cursor.execute("SELECT * FROM temp_data")
rows = cursor.fetchall()
print("内存数据库中的数据:")
for row in rows:
print(f" ID: {row[0]}, 值: {row[1]}")
# 关闭连接后,数据就消失了
conn.close()
print("\n注意: 内存数据库在连接关闭后数据就丢失了")
print("适合临时计算、中间结果存储等场景")
if __name__ == "__main__":
demo_memory_db()
四、实战案例
4.1 简易任务管理系统
创建一个完整的任务管理系统:
# task_manager_simple.py
import sqlite3
from datetime import datetime
class TaskManager:
"""简易任务管理器"""
def __init__(self, db_file='tasks.db'):
self.db_file = db_file
self.init_database()
def init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
status TEXT DEFAULT '待办', -- 待办, 进行中, 完成
priority INTEGER DEFAULT 1, -- 1:低, 2:中, 3:高
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def add_task(self, title, description=''):
"""添加任务"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO tasks (title, description) VALUES (?, ?)",
(title, description)
)
conn.commit()
conn.close()
print(f"任务添加成功: {title}")
def get_tasks(self, status=None):
"""获取任务列表"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
if status:
cursor.execute(
"SELECT * FROM tasks WHERE status = ? ORDER BY priority DESC, created_at",
(status,)
)
else:
cursor.execute("SELECT * FROM tasks ORDER BY priority DESC, created_at")
tasks = cursor.fetchall()
conn.close()
return tasks
def update_status(self, task_id, new_status):
"""更新任务状态"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute(
"UPDATE tasks SET status = ? WHERE id = ?",
(new_status, task_id)
)
if cursor.rowcount > 0:
print(f"任务 {task_id} 状态更新为: {new_status}")
else:
print(f"任务 {task_id} 不存在")
conn.commit()
conn.close()
def delete_task(self, task_id):
"""删除任务"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
if cursor.rowcount > 0:
print(f"任务 {task_id} 已删除")
else:
print(f"任务 {task_id} 不存在")
conn.commit()
conn.close()
def print_tasks(self, tasks):
"""打印任务列表"""
if not tasks:
print("没有任务")
return
print(f"\n{'ID':<3} {'状态':<6} {'优先级':<6} {'标题':<20}")
print("-" * 40)
for task in tasks:
task_id, title, description, status, priority, created_at = task
# 优先级显示
priority_map = {1: '低', 2: '中', 3: '高'}
priority_display = priority_map.get(priority, priority)
print(f"{task_id:<3} {status:<6} {priority_display:<6} {title:<20}")
def demo_task_manager():
"""演示任务管理器"""
print("=== 简易任务管理系统 ===\n")
task_mgr = TaskManager('demo_tasks.db')
# 添加任务
print("1. 添加任务:")
tasks = [
('完成项目报告', '编写季度总结'),
('购买日用品', '牛奶、鸡蛋、面包'),
('学习Python', '学习sqlite3模块'),
('会议准备', '准备会议材料'),
]
for title, description in tasks:
task_mgr.add_task(title, description)
print("\n2. 显示所有任务:")
all_tasks = task_mgr.get_tasks()
task_mgr.print_tasks(all_tasks)
print("\n3. 更新任务状态:")
task_mgr.update_status(1, '进行中')
task_mgr.update_status(2, '完成')
print("\n4. 显示待办任务:")
pending_tasks = task_mgr.get_tasks('待办')
task_mgr.print_tasks(pending_tasks)
print("\n5. 删除任务:")
task_mgr.delete_task(3)
print("\n6. 最终任务列表:")
final_tasks = task_mgr.get_tasks()
task_mgr.print_tasks(final_tasks)
if __name__ == "__main__":
demo_task_manager()
五、注意事项
5.1 使用限制
- 并发写入:
-
SQLite支持多个读操作
-
但同一时间只支持一个写操作
- 数据量限制:
-
适合中小型应用
-
大数据量时性能可能下降
- 网络访问:
-
不支持远程访问
-
数据库文件必须在本地
5.2 常见问题
Q: SQLite能替代MySQL吗?
A: 对于小型应用、移动应用、开发原型,SQLite是很好的选择。但对于高并发、大数据量、需要远程访问的场景,应该使用MySQL等数据库。
Q: 数据库文件损坏怎么办?
A: 定期备份数据库文件。可以使用.dump命令导出SQL语句备份。
Q: 如何优化SQLite性能?
A: 1. 使用事务批量操作
-
创建合适的索引
-
使用
PRAGMA命令优化 -
合理设计表结构
Q: 如何处理并发访问?
A: SQLite使用文件锁处理并发。如果应用需要高并发写入,考虑使用客户端/服务器数据库。
Q: 数据库可以加密吗?
A: 原生SQLite不支持加密。可以使用SQLCipher等第三方扩展。
5.3 替代方案
- 客户端/服务器数据库:
- MySQL、PostgreSQL:功能更强大
- MongoDB:文档数据库
- Python ORM库:
-
SQLAlchemy:功能强大的ORM
-
Peewee:轻量级ORM
-
Django ORM:Django框架内置
- 其他嵌入式数据库:
-
DuckDB:分析型数据库
-
TinyDB:纯Python文档数据库
什么情况下用SQLite:
-
开发原型或小型应用
-
需要单文件数据库
-
本地存储应用数据
-
移动应用或嵌入式设备
-
不需要高并发写入
六、总结
通过本文的学习,你应该已经掌握了:
-
✅ SQLite基础:连接数据库、执行SQL、获取结果
-
✅ 数据操作:增删改查、事务处理
-
✅ 高级特性:内存数据库、字典返回
-
✅ 实战应用:任务管理、配置存储、API缓存
sqlite3的核心价值:
-
简单易用:几行代码就能用上数据库
-
零配置:无需安装配置
-
快速开发:原型开发利器
-
学习友好:学习SQL的好起点
最佳实践建议:
-
使用参数化查询:防止SQL注入
-
处理异常:数据库操作可能失败
-
定期备份:重要数据定期备份
-
合理设计:设计合适的表结构
-
使用事务:确保数据一致性
你在项目中使用过SQLite吗?有什么经验或问题?欢迎在评论区分享你的想法!
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)