Python电商-分布式微服务系统开发
咨询提示词:给出Windows下使用Python开发的电商订单分布微服务系统:订单服务、支付服务、库存服务,NACOS服务注册发现,MongoDB数据存储,统一网关服务并托管兼作页面web服务器,HTML/CSS/JS页面设计[相互关联的订单、支付、库存]。本系统采用Saga模式,将长事务拆分为多个本地事务,若下游服务(如支付失败、库存不足)出错,则通过补偿操作回滚上游服务,实现最终一致性。1.
1 IMA-GLM助力
咨询提示词:给出Windows下使用Python开发的电商订单分布微服务系统:订单服务、支付服务、库存服务,NACOS服务注册发现,MongoDB数据存储,统一网关服务并托管兼作页面web服务器,HTML/CSS/JS页面设计[相互关联的订单、支付、库存]。

2 系统整体架构与设计原则
微服务架构的核心在于将单体应用拆分为独立部署的服务单元,通过轻量级通信机制协作,解决传统“烟囱式架构”带来的信息孤岛、维护成本高和响应速度慢的问题。
本系统采用以下设计原则:
1. 单一职责与服务自治:订单、支付、库存三个服务各自聚焦特定业务能力,拥有独立的数据存储,实现技术与数据的解耦。
2. 轻量级通信与统一入口:服务间通过REST API同步通信,网关作为唯一入口路由请求,隐藏内部架构复杂性。
3. 数据一致性(Saga模式):电商场景下,跨服务的分布式事务是难点。本系统采用Saga模式,将长事务拆分为多个本地事务,若下游服务(如支付失败、库存不足)出错,则通过补偿操作回滚上游服务,实现最终一致性。

3 微服务核心模块设计与实现 (Python + MongoDB)
选用 FastAPI 作为微服务开发框架,其基于Starlette支持异步,性能优异(QPS可达8000+),非常适合高并发的电商API服务。数据库选用 MongoDB,其灵活的BSON文档模型能原生支持商品SKU变体、订单主体与物流信息等多层级嵌套数据,避免了关系型数据库多表关联的性能损耗,查询响应时间大幅缩短。
3.1 库存服务
inventorSvc.py
from flask import Flask, jsonify, request; from pymongo import MongoClient
from bson.objectid import ObjectId # 用于处理MongoDB的_id字段
app = Flask(__name__)
# 1. 配置MongoDB连接
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce_db'] # 连接本地MongoDB实例,数据库名为'ecommerce_db'
products_collection = db['products']
# 2. 辅助函数:将MongoDB的ObjectId转换为字符串
def product_serializer(product):
"""将MongoDB文档序列化为JSON友好的字典"""
return { "id": str(product['_id']), "name": product.get('name'),
"price": product.get('price'), "stock": product.get('stock'), "specs": product.get('specs', {}) }
# 3. API接口:获取商品列表
@app.route('/products', methods=['GET'])
def get_products(): # 获取商品列表接口,支持简单的查询参数过滤,如?name=手机
try:
query_name = request.args.get('name') # 获取查询参数
query = {} # 构建查询条件
if query_name:
# 使用正则表达式进行模糊查询,类似于SQL的 LIKE
query['name'] = {"$regex": query_name, "$options": "i"}
# 从数据库查询数据,参考:使用find()方法查询集合中的所有文档
products_cursor = products_collection.find(query)
# 使用列表推导式处理数据,将游标转换为列表并序列化
products_list = [product_serializer(product) for product in products_cursor]
return jsonify({ "code": 200, "message": "查询成功", "data": products_list }), 200
except Exception as e:
return jsonify({"code": 500, "message": f"服务器错误: {str(e)}"}), 500
# 4. API接口:检查并扣减库存 (供订单服务调用)
@app.route('/inventory/check', methods=['POST'])
def check_and_reduce_stock():
data = request.json; product_id = data.get('product_id'); quantity = data.get('quantity')
if not product_id or quantity is None:
return jsonify({"error": "参数缺失"}), 400
try:
product = products_collection.find_one({"_id": ObjectId(product_id)}) # 查询商品
if product and product.get('stock', 0) >= quantity: # 执行库存扣减
new_stock = product['stock'] - quantity
products_collection.update_one( {"_id": ObjectId(product_id)}, {"$set": {"stock": new_stock}} )
return jsonify({"available": True, "price": product.get('price', 0)}), 200
return jsonify({"available": False, "message": "库存不足"}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
# 5. 初始化数据功能(便于测试)
def init_db_data():
"""如果集合为空,插入示例数据"""
if products_collection.count_documents({}) == 0:
sample_products = [
{"name": "华为手机", "price": 2999.00, "stock": 100, "specs": {"color": "黑色", "ram": "8GB"}},
{"name": "vivo手机", "price": 2599.00, "stock": 50, "specs": {"color": "白色", "ram": "12GB"}},
{"name": "Apple", "price": 1.2, "stock": 100} # 示例数据
]
products_collection.insert_many(sample_products)
print("数据库初始化完成,已插入示例商品。")
if __name__ == '__main__':
init_db_data() # 启动时初始化数据
app.run(port=5002, debug=True) # 运行服务,端口设为5002
3.2 订单服务
orderSvc.py
from flask import Flask, jsonify, request; import requests; from pymongo import MongoClient
app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/') # 连接MongoDB
db = client['ecommerce_db']; orders_collection = db['orders']
@app.route('/orders', methods=['POST'])
def create_order():
data = request.json; user_id = data.get('user_id')
product_id = data.get('product_id'); quantity = data.get('quantity', 1)
# 1. 调用库存服务检查库存,此处简化为直接调用,实际生产中应通过Nacos发现服务地址
inventory_url = "http://localhost:5002/inventory/check"
inv_response = requests.post(inventory_url, json={"product_id": product_id, "quantity": quantity})
if inv_response.status_code != 200 or not inv_response.json().get('available'):
return jsonify({"error": "库存不足或服务不可用"}), 400
# 2. 创建订单 (简化逻辑,未包含支付调用)
order_doc = { "user_id": user_id, "product_id": product_id,
"quantity": quantity, "status": "CREATED",
"total_price": inv_response.json().get('price', 0) * quantity
}
result = orders_collection.insert_one(order_doc)
return jsonify({"message": "订单创建成功", "order_id": str(result.inserted_id)}), 201
if __name__ == '__main__':
app.run(port=5001)
3.3 支付服务
paymentSvc.py
from flask import Flask, jsonify, request; from pymongo import MongoClient; from bson.objectid import ObjectId
app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce_db']
payments_collection = db['payments']
@app.route('/pay', methods=['POST'])
def process_payment():
data = request.json; order_id = data.get('order_id'); amount = data.get('amount')
payment_doc = { # 模拟支付处理
"order_id": order_id, "amount": amount,
"status": "SUCCESS", # 模拟支付成功
"transaction_id": "TRANS123456"
}
payments_collection.insert_one(payment_doc)
# 实际场景中,支付成功后可能通过消息队列异步通知订单服务更新状态
return jsonify({"message": "支付成功", "status": "SUCCESS"}), 200
if __name__ == '__main__':
app.run(port=5003)
4 Nacos服务注册与发现集成
设计思路:通过Nacos的Open API,Python微服务可以实现服务注册与心跳保活,实现与Spring Cloud生态的互联互通。
Python服务注册示例 (registerNacosAll.py):
import requests, time, threading
NACOS_SERVER = "http://localhost:8848"; SERVICE_IP = "192.168.0.102" # 本机IP
def register_service(svrNm, svrPt):
url = f"{NACOS_SERVER}/nacos/v1/ns/instance"
params = { "serviceName": svrNm, "ip": SERVICE_IP, "port": svrPt, "weight": 1.0, "enable": True,
"healthy": True, "metadata": '{"preserved.register.source":"PYTHON"}' }
try:
response = requests.post(url, params=params)
if response.status_code == 200:
print(f"服务 {svrNm} 注册成功")
else:
print(f"注册失败: {response.text}")
except Exception as e:
print(f"连接Nacos失败: {e}")
def start_heartbeat(svrNm, svrPt): # 启动发送心跳
url = f"{NACOS_SERVER}/nacos/v1/ns/instance/beat"
params = { "serviceName": svrNm, "ip": SERVICE_IP, "port": svrPt,
"beat": f'{{"ip":"{SERVICE_IP}","port":{svrPt},"serviceName":"{svrNm}"}}' }
try:
requests.put(url, params=params)
except:
pass
def send_heartbeat(): # 发送心跳维持注册状态
while True:
start_heartbeat("orderSvc", "5001"); start_heartbeat("inventorySvc", "5002")
start_heartbeat("paymentSvc", "5003"); start_heartbeat("gateway", "8080")
time.sleep(5)
if __name__ == '__main__':
register_service("orderSvc", "5001"); register_service("inventorySvc", "5002")
register_service("paymentSvc", "5003"); register_service("gateway", "8080")
threading.Thread(target=send_heartbeat, daemon=True).start() # 启动心跳线程
5 统一网关与前端网页设计
5.1 网关服务实现
gateway.py
from flask import Flask, request, jsonify, send_from_directory
import requests
app = Flask(__name__, static_folder='static')
SERVICE_MAP = { # 简单的路由映射表(生产环境应从Nacos动态获取)
"orderSvc": "http://localhost:5001", "paymentSvc": "http://localhost:5003",
"inventorySvc": http://localhost:5002 }
@app.route('/api/<service_name>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(service_name, path):
target_url = SERVICE_MAP.get(service_name)
if not target_url:
return jsonify({"error": "Service not found"}), 404
url = f"{target_url}/{path}"
resp = requests.request( method=request.method, url=url,
headers={k: v for k, v in request.headers if k != 'Host'},
data=request.get_data(), params=request.args )
return (resp.content, resp.status_code, resp.headers.items())
# 托管前端页面
@app.route('/')
def index():
return send_from_directory('static', 'index.html')
@app.route('/<path:filename>')
def static_files(filename):
return send_from_directory('static', filename)
if __name__ == '__main__':
app.run(port=8080)
6.2 前端页面设计
前端页面通过AJAX调用网关API,实现订单、支付、库存的交互。
HTML结构 (static/index.html):
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>电商微服务系统</title>
<!--link rel="stylesheet" href="style.css"-->
<script src="script.js"></script>
</head>
<body>
<div class="container">
<h1>电商订单系统</h1>
<div class="section">
<h2>商品列表 (库存服务)</h2>
<div id="product-list"></div>
</div>
<div class="section">
<h2>创建订单</h2>
<input type="text" id="user-id" placeholder="用户ID">
<input type="text" id="product-id" placeholder="商品ID">
<button onclick="createOrder()">下单</button>
</div>
<div class="section">
<h2>支付订单</h2>
<input type="text" id="order-id" placeholder="订单ID">
<button onclick="payOrder()">支付</button>
</div>
</div>
</body>
</html>
JavaScript交互 (static/script.js):
const API_GATEWAY = 'http://localhost:8080/api'
async function loadProducts() { // 加载商品列表
const response = await fetch(`${API_GATEWAY}/inventorySvc/products`)
const products = await response.json(); const listDiv = document.getElementById('product-list')
listDiv.innerHTML = products.data.map(p =>
`<div class="product">
<span>${p.name} - 库存: ${p.stock}</span>
<button onclick="document.getElementById('product-id').value='${p.id}'">选择</button>
</div>`
).join('')
}
async function createOrder() { // 创建订单
const userId = document.getElementById('user-id').value
const productId = document.getElementById('product-id').value
const response = await fetch(`${API_GATEWAY}/orderSvc/orders`, {
method: 'POST', headers: {'Content-Type': 'application/json'},
body: JSON.stringify({user_id: userId, product_id: productId, quantity: 1})
})
const result = await response.json(); alert(result.message || result.error)
async function payOrder() { // 支付订单
const orderId = document.getElementById('order-id').value
// 简化:假设支付金额固定
const response = await fetch(`${API_GATEWAY}/paymentSvc/pay`, {
method: 'POST', headers: {'Content-Type': 'application/json'},
body: JSON.stringify({order_id: orderId, amount: 100}) })
const result = await response.json()
alert(result.message)
}
window.onload = loadProducts // 页面加载时初始化
7 部署运行测试
7.1 前提准备
安装Python
安装MongoDB及其Navicat管理器
安装JDK1.8以上版本
安装并运行Nacos

7.2 核心依赖库清单
针对该电商微服务系统(订单、支付、库存、网关),需要在 requirements.txt 中声明以下核心依赖库。这些库覆盖了Web服务构建、数据库连接、服务注册发现及HTTP请求处理。
# Web框架与网关基础
Flask==3.0.0 # 轻量级Web框架,用于构建各个微服务及网关
Werkzeug==3.0.1 # WSGI工具库,Flask的依赖
# 数据库连接
pymongo==4.6.0 # MongoDB的Python驱动,用于连接MongoDB数据库
# 服务注册与发现
nacos-sdk-python==0.1.12 # Nacos的Python客户端,用于服务注册与配置管理
# HTTP请求与服务间调用
requests==2.31.0 # 用于服务间(如订单调用库存)的HTTP请求
# 生产环境运行服务器 (可选,用于部署)
# gunicorn==21.2.0 # Linux/macOS下的WSGI HTTP服务器
# waitress==2.1.2 # Windows下的WSGI HTTP服务器,纯Python实现
Flask: 作为微服务的核心框架,负责路由分发与业务逻辑承载。
pymongo: 实现Python与MongoDB的交互,支持电商数据的文档存储。
nacos-sdk-python: 实现微服务向Nacos的注册、心跳发送及配置获取。
requests: 微服务架构中,服务间通信(如订单服务调用库存服务)的基石。
7.3 虚拟环境操作指南
1.创建虚拟环境:python -m venv venv
2. 激活虚拟环境:在CMD中,venv\Scripts\activate.bat
3.安装依赖:pip install -r requirements.txt
4.依赖管理与导出

7.4 部署运行与测试



openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)