1 IMA-GLM助力

咨询提示词:给出Windows下使用Python开发的电商订单分布微服务系统:订单服务、支付服务、库存服务,NACOS服务注册发现,MongoDB数据存储,统一网关服务并托管兼作页面web服务器,HTML/CSS/JS页面设计[相互关联的订单、支付、库存]。

2 系统整体架构与设计原则

微服务架构的核心在于将单体应用拆分为独立部署的服务单元,通过轻量级通信机制协作,解决传统“烟囱式架构”带来的信息孤岛、维护成本高和响应速度慢的问题。

本系统采用以下设计原则:

1. 单一职责与服务自治:订单、支付、库存三个服务各自聚焦特定业务能力,拥有独立的数据存储,实现技术与数据的解耦。

2. 轻量级通信与统一入口:服务间通过REST API同步通信,网关作为唯一入口路由请求,隐藏内部架构复杂性。

3. 数据一致性(Saga模式):电商场景下,跨服务的分布式事务是难点。本系统采用Saga模式,将长事务拆分为多个本地事务,若下游服务(如支付失败、库存不足)出错,则通过补偿操作回滚上游服务,实现最终一致性。

3 微服务核心模块设计与实现 (Python + MongoDB)

选用 FastAPI 作为微服务开发框架,其基于Starlette支持异步,性能优异(QPS可达8000+),非常适合高并发的电商API服务。数据库选用 MongoDB,其灵活的BSON文档模型能原生支持商品SKU变体、订单主体与物流信息等多层级嵌套数据,避免了关系型数据库多表关联的性能损耗,查询响应时间大幅缩短。

3.1 库存服务

inventorSvc.py

from flask import Flask, jsonify, request; from pymongo import MongoClient

from bson.objectid import ObjectId     # 用于处理MongoDB_id字段

app = Flask(__name__)

# 1. 配置MongoDB连接

client = MongoClient('mongodb://localhost:27017/')

db = client['ecommerce_db']          # 连接本地MongoDB实例,数据库名为'ecommerce_db'

products_collection = db['products']

# 2. 辅助函数:将MongoDBObjectId转换为字符串

def product_serializer(product):

    """MongoDB文档序列化为JSON友好的字典"""

    return { "id": str(product['_id']), "name": product.get('name'),

        "price": product.get('price'), "stock": product.get('stock'), "specs": product.get('specs', {}) }

# 3. API接口:获取商品列表

@app.route('/products', methods=['GET'])

def get_products():   # 获取商品列表接口,支持简单的查询参数过滤,?name=手机

    try:      

        query_name = request.args.get('name')       # 获取查询参数

        query = {}                             # 构建查询条件

        if query_name:

            # 使用正则表达式进行模糊查询,类似于SQL LIKE

            query['name'] = {"$regex": query_name, "$options": "i"}      

        # 从数据库查询数据,参考:使用find()方法查询集合中的所有文档

        products_cursor = products_collection.find(query)      

        # 使用列表推导式处理数据,将游标转换为列表并序列化

        products_list = [product_serializer(product) for product in products_cursor]      

        return jsonify({ "code": 200, "message": "查询成功", "data": products_list }), 200

    except Exception as e:

        return jsonify({"code": 500, "message": f"服务器错误: {str(e)}"}), 500

# 4. API接口:检查并扣减库存 (供订单服务调用)

@app.route('/inventory/check', methods=['POST'])

def check_and_reduce_stock():

    data = request.json; product_id = data.get('product_id'); quantity = data.get('quantity')  

    if not product_id or quantity is None:

        return jsonify({"error": "参数缺失"}), 400

    try:      

        product = products_collection.find_one({"_id": ObjectId(product_id)}) # 查询商品      

        if product and product.get('stock', 0) >= quantity:                   # 执行库存扣减

            new_stock = product['stock'] - quantity

            products_collection.update_one( {"_id": ObjectId(product_id)}, {"$set": {"stock": new_stock}} )

            return jsonify({"available": True, "price": product.get('price', 0)}), 200      

        return jsonify({"available": False, "message": "库存不足"}), 200

    except Exception as e:

        return jsonify({"error": str(e)}), 500

# 5. 初始化数据功能(便于测试)

def init_db_data():

    """如果集合为空,插入示例数据"""

    if products_collection.count_documents({}) == 0:

        sample_products = [

            {"name": "华为手机", "price": 2999.00, "stock": 100, "specs": {"color": "黑色", "ram": "8GB"}},

            {"name": "vivo手机", "price": 2599.00, "stock": 50, "specs": {"color": "白色", "ram": "12GB"}},

            {"name": "Apple", "price": 1.2, "stock": 100} # 示例数据

        ]

        products_collection.insert_many(sample_products)

        print("数据库初始化完成,已插入示例商品。")

if __name__ == '__main__':  

    init_db_data()                     # 启动时初始化数据  

    app.run(port=5002, debug=True)     # 运行服务,端口设为5002

3.2 订单服务

orderSvc.py

from flask import Flask, jsonify, request; import requests; from pymongo import MongoClient

app = Flask(__name__)

client = MongoClient('mongodb://localhost:27017/')  # 连接MongoDB

db = client['ecommerce_db']; orders_collection = db['orders']

@app.route('/orders', methods=['POST'])

def create_order():

    data = request.json; user_id = data.get('user_id')

    product_id = data.get('product_id'); quantity = data.get('quantity', 1)

    # 1. 调用库存服务检查库存,此处简化为直接调用,实际生产中应通过Nacos发现服务地址

    inventory_url = "http://localhost:5002/inventory/check"

    inv_response = requests.post(inventory_url, json={"product_id": product_id, "quantity": quantity})   

    if inv_response.status_code != 200 or not inv_response.json().get('available'):

        return jsonify({"error": "库存不足或服务不可用"}), 400

    # 2. 创建订单 (简化逻辑,未包含支付调用)

    order_doc = { "user_id": user_id, "product_id": product_id,

        "quantity": quantity, "status": "CREATED",

        "total_price": inv_response.json().get('price', 0) * quantity

    }

    result = orders_collection.insert_one(order_doc)   

    return jsonify({"message": "订单创建成功", "order_id": str(result.inserted_id)}), 201

if __name__ == '__main__':

    app.run(port=5001)

3.3 支付服务

paymentSvc.py

from flask import Flask, jsonify, request; from pymongo import MongoClient; from bson.objectid import ObjectId

app = Flask(__name__)

client = MongoClient('mongodb://localhost:27017/')

db = client['ecommerce_db']

payments_collection = db['payments']

@app.route('/pay', methods=['POST'])

def process_payment():

    data = request.json; order_id = data.get('order_id'); amount = data.get('amount')  

    payment_doc = {             # 模拟支付处理

        "order_id": order_id, "amount": amount,

        "status": "SUCCESS",           # 模拟支付成功

        "transaction_id": "TRANS123456"

    }

    payments_collection.insert_one(payment_doc)   

    # 实际场景中,支付成功后可能通过消息队列异步通知订单服务更新状态

    return jsonify({"message": "支付成功", "status": "SUCCESS"}), 200

if __name__ == '__main__':

    app.run(port=5003)

4 Nacos服务注册与发现集成

设计思路:通过Nacos的Open API,Python微服务可以实现服务注册与心跳保活,实现与Spring Cloud生态的互联互通。

Python服务注册示例 (registerNacosAll.py):

import requests, time, threading

NACOS_SERVER = "http://localhost:8848"; SERVICE_IP = "192.168.0.102"  # 本机IP

def register_service(svrNm, svrPt):

    url = f"{NACOS_SERVER}/nacos/v1/ns/instance"

    params = { "serviceName": svrNm, "ip": SERVICE_IP, "port": svrPt, "weight": 1.0, "enable": True,

        "healthy": True, "metadata": '{"preserved.register.source":"PYTHON"}' }

    try:

        response = requests.post(url, params=params)

        if response.status_code == 200:

            print(f"服务 {svrNm} 注册成功")

        else:

            print(f"注册失败: {response.text}")

    except Exception as e:

        print(f"连接Nacos失败: {e}")

def start_heartbeat(svrNm, svrPt):   # 启动发送心跳

    url = f"{NACOS_SERVER}/nacos/v1/ns/instance/beat"

    params = { "serviceName": svrNm, "ip": SERVICE_IP, "port": svrPt,

        "beat": f'{{"ip":"{SERVICE_IP}","port":{svrPt},"serviceName":"{svrNm}"}}' }

    try:

        requests.put(url, params=params)

    except:

        pass

def send_heartbeat():            # 发送心跳维持注册状态

    while True:

        start_heartbeat("orderSvc", "5001"); start_heartbeat("inventorySvc", "5002")

        start_heartbeat("paymentSvc", "5003"); start_heartbeat("gateway", "8080")

        time.sleep(5)

if __name__ == '__main__':

    register_service("orderSvc", "5001");  register_service("inventorySvc", "5002")

    register_service("paymentSvc", "5003");  register_service("gateway", "8080")   

    threading.Thread(target=send_heartbeat, daemon=True).start() # 启动心跳线程

5 统一网关与前端网页设计

5.1 网关服务实现

gateway.py

from flask import Flask, request, jsonify, send_from_directory

import requests

app = Flask(__name__, static_folder='static')

SERVICE_MAP = {     # 简单的路由映射表(生产环境应从Nacos动态获取)

    "orderSvc": "http://localhost:5001", "paymentSvc": "http://localhost:5003",

    "inventorySvc": http://localhost:5002 }

@app.route('/api/<service_name>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])

def proxy(service_name, path):

    target_url = SERVICE_MAP.get(service_name)

    if not target_url:

        return jsonify({"error": "Service not found"}), 404   

    url = f"{target_url}/{path}"

    resp = requests.request( method=request.method, url=url,

        headers={k: v for k, v in request.headers if k != 'Host'},

        data=request.get_data(), params=request.args )

    return (resp.content, resp.status_code, resp.headers.items())

# 托管前端页面

@app.route('/')

def index():

    return send_from_directory('static', 'index.html')

@app.route('/<path:filename>')

def static_files(filename):

    return send_from_directory('static', filename)

if __name__ == '__main__':

    app.run(port=8080)

6.2 前端页面设计

前端页面通过AJAX调用网关API,实现订单、支付、库存的交互。

HTML结构 (static/index.html):

<!DOCTYPE html>

<html lang="zh-CN">

<head>

    <meta charset="UTF-8">

    <title>电商微服务系统</title>

    <!--link rel="stylesheet" href="style.css"-->

    <script src="script.js"></script>

</head>

<body>

    <div class="container">

        <h1>电商订单系统</h1>

        <div class="section">

            <h2>商品列表 (库存服务)</h2>

            <div id="product-list"></div>

        </div>       

        <div class="section">

            <h2>创建订单</h2>

            <input type="text" id="user-id" placeholder="用户ID">

            <input type="text" id="product-id" placeholder="商品ID">

            <button onclick="createOrder()">下单</button>

        </div>       

        <div class="section">

            <h2>支付订单</h2>

            <input type="text" id="order-id" placeholder="订单ID">

            <button onclick="payOrder()">支付</button>

        </div>

    </div>

</body>

</html>

JavaScript交互 (static/script.js):

const API_GATEWAY = 'http://localhost:8080/api'

async function loadProducts() {    // 加载商品列表

    const response = await fetch(`${API_GATEWAY}/inventorySvc/products`)

    const products = await response.json(); const listDiv = document.getElementById('product-list')

    listDiv.innerHTML = products.data.map(p =>

        `<div class="product">

            <span>${p.name} - 库存: ${p.stock}</span>

            <button onclick="document.getElementById('product-id').value='${p.id}'">选择</button>

        </div>`

    ).join('')

}

async function createOrder() {     // 创建订单

    const userId = document.getElementById('user-id').value

    const productId = document.getElementById('product-id').value   

    const response = await fetch(`${API_GATEWAY}/orderSvc/orders`, {

        method: 'POST', headers: {'Content-Type': 'application/json'},

        body: JSON.stringify({user_id: userId, product_id: productId, quantity: 1})

    })

    const result = await response.json(); alert(result.message || result.error)

async function payOrder() {        // 支付订单

    const orderId = document.getElementById('order-id').value

    // 简化:假设支付金额固定

    const response = await fetch(`${API_GATEWAY}/paymentSvc/pay`, {

        method: 'POST', headers: {'Content-Type': 'application/json'},

        body: JSON.stringify({order_id: orderId, amount: 100}) })

    const result = await response.json()

    alert(result.message)

}

window.onload = loadProducts       // 页面加载时初始化

7 部署运行测试

7.1 前提准备

安装Python

安装MongoDB及其Navicat管理器

安装JDK1.8以上版本

安装并运行Nacos

7.2 核心依赖库清单

针对该电商微服务系统(订单、支付、库存、网关),需要在 requirements.txt 中声明以下核心依赖库。这些库覆盖了Web服务构建、数据库连接、服务注册发现及HTTP请求处理。

# Web框架与网关基础

Flask==3.0.0          # 轻量级Web框架,用于构建各个微服务及网关

Werkzeug==3.0.1       # WSGI工具库,Flask的依赖

# 数据库连接

pymongo==4.6.0        # MongoDBPython驱动,用于连接MongoDB数据库

# 服务注册与发现

nacos-sdk-python==0.1.12  # NacosPython客户端,用于服务注册与配置管理

# HTTP请求与服务间调用

requests==2.31.0      # 用于服务间(如订单调用库存)HTTP请求

# 生产环境运行服务器 (可选,用于部署)

# gunicorn==21.2.0      # Linux/macOS下的WSGI HTTP服务器

# waitress==2.1.2       # Windows下的WSGI HTTP服务器,纯Python实现

Flask: 作为微服务的核心框架,负责路由分发与业务逻辑承载。

pymongo: 实现Python与MongoDB的交互,支持电商数据的文档存储。

nacos-sdk-python: 实现微服务向Nacos的注册、心跳发送及配置获取。

requests: 微服务架构中,服务间通信(如订单服务调用库存服务)的基石。

7.3 虚拟环境操作指南

1.创建虚拟环境:python -m venv venv

2. 激活虚拟环境:在CMD中,venv\Scripts\activate.bat

3.安装依赖:pip install -r requirements.txt

4.依赖管理与导出

7.4 部署运行与测试

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐