海康威视ISAPI车牌识别推送接收技术文档

版本: v1.6
日期: 2026-05-18
适用设备: 海康威视支持ANPR(车牌识别)功能的网络摄像头


一、问题背景

1.1 需求描述

需要从海康威视摄像头(IP: 192.168.40.210)实时获取抓拍的车牌信息,包括:

  • 车牌号码
  • 车牌颜色
  • 抓拍时间
  • 并实时显示摄像头画面

1.2 技术挑战与解决

在开发过程中遇到以下技术问题:

问题 原因 解决方案
事件流连接失败 ISAPI事件流接口返回HTTP 500错误 改用推送模式,让摄像头主动推送数据
ISAPI权限不足 摄像头推送配置未完成 在摄像头Web管理界面配置推送目标服务器
XML字段解析错误 字段名称不匹配,实际应为licensePlate 通过调试确认正确字段名
Multipart解析失败 Python email模块解析boundary标记时出错 实现手动解析逻辑,直接分离header和body

二、技术架构

2.1 整体架构图

┌─────────────────────────────────────────────────────────────────┐
│                        海康威视摄像头                             │
│                     IP: 192.168.40.210                          │
│                                                                 │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────────┐   │
│   │  车牌识别    │───▶│  ISAPI推送   │───▶│  HTTP服务器     │   │
│   │  (ANPR)     │    │  协议        │    │  (端口8050)     │   │
│   └─────────────┘    └─────────────┘    └────────┬────────┘   │
│                                                   │              │
│   ┌─────────────┐                                 │              │
│   │  RTSP视频流  │─────────────────────────────────┘              │
│   │  (554端口)   │                                                │
│   └─────────────┘                                                │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Python客户端                               │
│                                                                 │
│   ┌─────────────────┐         ┌─────────────────────────────┐   │
│   │  HTTP服务器      │         │  OpenCV视频显示             │   │
│   │  接收推送数据    │────────▶│  叠加车牌信息               │   │
│   │  解析XML        │         │  实时预览                   │   │
│   └─────────────────┘         └─────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

2.2 数据流程

1. 车辆经过 → 2. ANPR识别触发 → 3. 打包为multipart/form-data
     ↓
4. HTTP POST推送 → 5. 接收并解析XML → 6. 叠加显示在视频流

三、摄像头配置指南

3.1 ISAPI推送配置步骤

步骤1:登录摄像头Web管理界面

  • 地址:http://192.168.40.210
  • 用户名:admin
  • 密码:qaz135$+

步骤2:配置ISAPI推送

  1. 进入 配置 → 系统 → 安全管理 → 用户

    • 确保admin用户有ISAPI权限
  2. 进入 配置 → 网络 → 高级设置 → ISAPI

    • 启用ISAPI服务
  3. 进入 配置 → 智能业务 → 车牌识别配置 → 事件 → ANPR

    • 配置推送参数:
      • 目标服务器IP:运行脚本的电脑IP(如192.168.40.100)
      • 目标服务器端口:8050
      • 推送协议:HTTP
      • 推送方式:POST
  4. 配置抓拍参数:

    • 触发方式:视频触发 / 线圈触发
    • 抓拍灵敏度:根据环境调整

步骤3:配置应用模式

  • 选择合适的应用模式(停车场模式、交通模式等)

步骤4:配置异常事件

  • 配置断网重连等异常处理

四、完整代码实现

文件:hikvision_push_server.py

#!/usr/bin/env python3
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import time
import xml.etree.ElementTree as ET
import cv2

CAM_IP = "192.168.40.210"
USERNAME = "admin"
PASSWORD = "qaz135$+"

latest_plate_info = {"plate_num": "", "plate_color": "", "cap_time": ""}
lock = threading.Lock()
plate_history = set()

color_map = {
    "yellow": "黄牌", "blue": "蓝牌", "white": "白牌", "black": "黑牌",
    "green": "绿牌", "greenGradient": "渐变绿牌",
    "0": "蓝牌", "1": "黄牌", "2": "白牌", "3": "黑牌", "4": "绿牌", "5": "渐变绿牌"
}

def parse_multipart_manual(data, content_type):
    """
    手动解析multipart/form-data数据
    返回: (xml_data, image_data)  其中可能为None
    """
    try:
        if 'boundary=' not in content_type:
            print("❌ Content-Type中没有boundary")
            return None, None

        boundary = content_type.split('boundary=')[1].strip()
        if boundary.startswith('"') and boundary.endswith('"'):
            boundary = boundary[1:-1]

        boundary_bytes = boundary.encode('utf-8')
        # 每个part以 --boundary 开头,最后以 --boundary-- 结尾
        parts = data.split(boundary_bytes)

        xml_data = None
        image_data = None

        for part in parts:
            # 跳过空part或仅含分隔符的part
            if part in (b'', b'--', b'\r\n', b'--\r\n'):
                continue

            # 分离头部和正文 (第一个\r\n\r\n)
            header_end = part.find(b'\r\n\r\n')
            if header_end == -1:
                continue

            headers = part[:header_end].decode('utf-8', errors='ignore')
            body = part[header_end + 4:]  # 正文部分

            # 去除正文末尾可能残留的换行和boundary尾部标记
            body = body.rstrip(b'\r\n')
            if body.endswith(b'--'):
                body = body[:-2]

            # 判断Content-Type
            if 'Content-Type: application/xml' in headers:
                xml_data = body
                print(f"✅ 提取XML成功,长度: {len(xml_data)} bytes")
            elif 'Content-Type: image/' in headers:
                image_data = body
                print(f"✅ 提取图像成功,长度: {len(image_data)} bytes")

        return xml_data, image_data

    except Exception as e:
        print(f"❌ 手动解析失败: {e}")
        return None, None

def parse_anpr_xml(xml_data):
    """解析ANPR XML数据"""
    try:
        if isinstance(xml_data, bytes):
            xml_data = xml_data.decode('utf-8', errors='ignore')

        root = ET.fromstring(xml_data)

        plate_num = ""
        plate_color = "未知"
        cap_time = time.strftime("%Y-%m-%d %H:%M:%S")

        for child in root.iter():
            tag = child.tag.split('}')[-1] if '}' in child.tag else child.tag

            if tag.lower() == 'licenseplate' and child.text:
                plate_num = child.text.strip()

            elif tag.lower() == 'platecolor' and child.text:
                color_value = child.text.strip()
                plate_color = color_map.get(color_value, f"未知({color_value})")

            elif tag.lower() == 'datetime' and child.text:
                cap_time = child.text.strip()

        return plate_num, plate_color, cap_time

    except ET.ParseError as e:
        print(f"❌ XML解析错误: {e}")
        print(f"XML片段: {xml_data[:200]}...")
        return "", "未知", ""
    except Exception as e:
        print(f"❌ 处理异常: {e}")
        return "", "未知", ""

class PlateHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(content_length)

        print(f"\n📡 收到数据推送")
        print(f"路径: {self.path}")
        print(f"数据长度: {content_length} bytes")

        content_type = self.headers.get('Content-Type', '')
        print(f"Content-Type: {content_type}")

        try:
            if 'multipart/form-data' in content_type.lower():
                print("🔍 解析multipart/form-data...")

                xml_data, image_data = parse_multipart_manual(post_data, content_type)

                if xml_data:
                    plate_num, plate_color, cap_time = parse_anpr_xml(xml_data)

                    if plate_num:
                        print(f"\n🚗 车牌: {plate_num}")
                        print(f"🎨 颜色: {plate_color}")
                        print(f"⏰ 时间: {cap_time}")

                        if plate_num not in plate_history:
                            with lock:
                                latest_plate_info["plate_num"] = plate_num
                                latest_plate_info["plate_color"] = plate_color
                                latest_plate_info["cap_time"] = cap_time

                            plate_history.add(plate_num)
                            if len(plate_history) > 100:
                                plate_history.clear()
                            print("-" * 50)
                    else:
                        print("❌ 未解析到车牌数据")
                else:
                    print("❌ 未找到XML数据")

                # 如果需要保存图像,可以在这里处理 image_data
                # if image_data:
                #     with open("capture.jpg", "wb") as f:
                #         f.write(image_data)
                #     print("💾 图像已保存")

            else:
                print(f"未知数据类型: {content_type}")

        except Exception as e:
            print(f"❌ 处理失败: {e}")

        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()
        self.wfile.write(b"ISAPI Push Server Running")

    def log_message(self, format, *args):
        pass

def video_display():
    rtsp_url = f"rtsp://{USERNAME}:{PASSWORD}@{CAM_IP}:554/Streaming/Channels/101"

    cap = cv2.VideoCapture(rtsp_url)
    if not cap.isOpened():
        print("❌ 无法连接到视频流")
        return

    cv2.namedWindow("海康车牌识别监控", cv2.WINDOW_NORMAL)
    cv2.resizeWindow("海康车牌识别监控", 960, 540)
    print(f"✅ 视频流连接成功 | 按 'q' 退出")

    while True:
        ret, frame = cap.read()

        if not ret:
            cap.release()
            time.sleep(2)
            cap = cv2.VideoCapture(rtsp_url)
            if cap.isOpened():
                print("✅ 视频流重新连接成功")
            continue

        with lock:
            plate_num = latest_plate_info["plate_num"]
            plate_color = latest_plate_info["plate_color"]
            cap_time = latest_plate_info["cap_time"]

        if plate_num:
            cv2.putText(frame, f"车牌: {plate_num}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)
            cv2.putText(frame, f"颜色: {plate_color}", (20, 85),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)
            cv2.putText(frame, f"时间: {cap_time}", (20, 130),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)

        cv2.imshow("海康车牌识别监控", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

def start_push_server(port=8050):
    server_address = ('', port)
    httpd = HTTPServer(server_address, PlateHandler)

    print("="*60)
    print("海康威视ISAPI推送接收服务器 v1.6")
    print(f"摄像头: {CAM_IP}")
    print(f"监听端口: {port}")
    print("="*60)
    print("\n✅ ISAPI推送模式已配置成功!")
    print("📡 等待摄像头推送车牌数据...")
    print("按 Ctrl+C 停止服务")
    print("="*60 + "\n")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n👋 服务已停止")
    finally:
        httpd.shutdown()

if __name__ == "__main__":
    video_thread = threading.Thread(target=video_display)
    video_thread.daemon = True
    video_thread.start()

    start_push_server()

五、核心算法解析

5.1 Multipart数据手动解析

数据结构分析

摄像头推送的HTTP请求Body格式:

--boundary\r\n
Content-Disposition: form-data; name="anpr.xml"; filename="anpr.xml"\r\n
Content-Type: application/xml\r\n
\r\n
<?xml version="1.0" encoding="utf-8"?>
<EventNotificationAlert>
  ...
</EventNotificationAlert>\r\n
--boundary\r\n
Content-Disposition: form-data; name="image.jpg"\r\n
Content-Type: image/jpeg\r\n
\r\n
<binary image data>\r\n
--boundary--

解析流程

def parse_multipart_manual(data, content_type):
    # 1. 提取boundary标记
    boundary = content_type.split('boundary=')[1].strip()
    boundary_bytes = boundary.encode('utf-8')
    
    # 2. 使用boundary分割数据得到各个part
    parts = data.split(boundary_bytes)
    
    # 3. 遍历每个part
    for part in parts:
        # 3.1 跳过空part
        if part in (b'', b'--', b'\r\n', b'--\r\n'):
            continue
        
        # 3.2 分离header和body (第一个\r\n\r\n)
        header_end = part.find(b'\r\n\r\n')
        headers = part[:header_end].decode('utf-8')
        body = part[header_end + 4:]
        
        # 3.3 清理body末尾的\r\n和--标记
        body = body.rstrip(b'\r\n')
        if body.endswith(b'--'):
            body = body[:-2]
        
        # 3.4 根据Content-Type判断数据类型
        if 'Content-Type: application/xml' in headers:
            xml_data = body  # 直接使用body作为XML数据
        elif 'Content-Type: image/' in headers:
            image_data = body

关键设计点

设计点 说明
返回元组 (xml_data, image_data) 支持同时获取XML和图像
header/body分离 通过\r\n\r\n定位分界点
清理尾部标记 rstrip(b'\r\n')endswith(b'--') 清理残留
直接使用body XML数据就在body中,不需要搜索<?xml

5.2 XML数据解析

ANPR XML结构

<?xml version="1.0" encoding="utf-8"?>
<EventNotificationAlert version="2.0" 
    xmlns="http://www.isapi.org/ver20/XMLSchema">
    <ipAddress>192.168.40.210</ipAddress>
    <dateTime>2026-05-18T15:48:21.619+08:00</dateTime>
    <ANPR>
        <licensePlate>黄京AF0236</licensePlate>
        <plateColor>yellow</plateColor>
    </ANPR>
</EventNotificationAlert>

关键字段映射

color_map = {
    "yellow": "黄牌", "blue": "蓝牌", "white": "白牌",
    "black": "黑牌", "green": "绿牌", "greenGradient": "渐变绿牌",
    # 数值映射
    "0": "蓝牌", "1": "黄牌", "2": "白牌", "3": "黑牌", "4": "绿牌", "5": "渐变绿牌"
}

# 遍历XML树,提取关键字段
for child in root.iter():
    tag = child.tag.split('}')[-1]  # 处理命名空间: {uri}tag → tag
    
    if tag.lower() == 'licenseplate':
        plate_num = child.text.strip()
    elif tag.lower() == 'platecolor':
        plate_color = color_map.get(child.text.strip(), "未知")
    elif tag.lower() == 'datetime':
        cap_time = child.text.strip()

5.3 线程安全设计

# 共享数据
latest_plate_info = {"plate_num": "", "plate_color": "", "cap_time": ""}
lock = threading.Lock()

# HTTP线程更新数据
def do_POST(self):
    if plate_num not in plate_history:
        with lock:  # 更新时加锁
            latest_plate_info["plate_num"] = plate_num
            latest_plate_info["plate_color"] = plate_color
            latest_plate_info["cap_time"] = cap_time

# 视频线程读取数据
def video_display():
    while True:
        with lock:  # 读取时加锁
            plate_num = latest_plate_info["plate_num"]
            plate_color = latest_plate_info["plate_color"]
            cap_time = latest_plate_info["cap_time"]

5.4 视频流断线重连

def video_display():
    rtsp_url = f"rtsp://{USERNAME}:{PASSWORD}@{CAM_IP}:554/Streaming/Channels/101"
    cap = cv2.VideoCapture(rtsp_url)
    
    while True:
        ret, frame = cap.read()
        
        if not ret:  # 读取失败
            cap.release()  # 释放资源
            time.sleep(2)  # 等待2秒
            cap = cv2.VideoCapture(rtsp_url)  # 重新连接
            if cap.isOpened():
                print("✅ 视频流重新连接成功")
            continue
        
        # 显示画面...

六、运行指南

6.1 环境要求

组件 版本要求 安装命令
Python 3.7+ -
OpenCV 最新 pip install opencv-python

6.2 运行命令

python hikvision_push_server.py

6.3 预期输出

============================================================
海康威视ISAPI推送接收服务器 v1.6
摄像头: 192.168.40.210
监听端口: 8050
============================================================

✅ ISAPI推送模式已配置成功!
📡 等待摄像头推送车牌数据...
按 Ctrl+C 停止服务
============================================================

✅ 视频流连接成功 | 按 'q' 退出

📡 收到数据推送
路径: /
数据长度: 537035 bytes
Content-Type: multipart/form-data; boundary=-------------------------7e13971310878
🔍 解析multipart/form-data...
✅ 提取XML成功,长度: 4816 bytes
✅ 提取图像成功,长度: 532459 bytes

🚗 车牌: 黄京AF0236
🎨 颜色: 黄牌
⏰ 时间: 2026-05-18T15:48:21.619+08:00
--------------------------------------------------

七、扩展功能

7.1 保存抓拍图像

do_POST方法中添加:

# 如果需要保存图像
if image_data:
    filename = f"capture_{int(time.time())}.jpg"
    with open(filename, "wb") as f:
        f.write(image_data)
    print(f"💾 图像已保存: {filename}")

7.2 数据库存储

import sqlite3

def save_to_database(plate_num, plate_color, cap_time):
    conn = sqlite3.connect('plate_records.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS plates (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            plate_num TEXT,
            plate_color TEXT,
            cap_time TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute(
        'INSERT INTO plates (plate_num, plate_color, cap_time) VALUES (?, ?, ?)',
        (plate_num, plate_color, cap_time)
    )
    conn.commit()
    conn.close()

7.3 消息通知

import requests

def send_dingtalk(plate_num, plate_color):
    url = "https://oapi.dingtalk.com/robot/send"
    data = {
        "msgtype": "text",
        "text": {
            "content": f"🚗 检测到车牌: {plate_num}\n🎨 颜色: {plate_color}"
        }
    }
    requests.post(url, json=data)

def send_email(plate_num, plate_color):
    import smtplib
    from email.mime.text import MIMEText
    
    msg = MIMEText(f"车牌: {plate_num}\n颜色: {plate_color}")
    msg['Subject'] = "车牌识别通知"
    # 配置邮件发送...

八、常见问题排查

8.1 摄像头推送失败

检查项 操作
网络连接 ping 192.168.40.210
ISAPI配置 确认目标IP:8050正确
防火墙 开放8050端口入站
推送状态 摄像头日志查看推送记录

8.2 无法连接RTSP视频流

检查项 操作
认证信息 确认用户名密码正确
端口占用 netstat -an | grep 554
通道号 尝试更换Channels/102
ONVIF 确认RTSP协议启用

8.3 车牌数据为空

检查项 操作
ANPR功能 确认车牌识别已启用
触发模式 视频触发/线圈触发配置
识别区域 确认检测区域设置正确
环境光线 夜间需开启补光灯

九、接口说明

9.1 HTTP POST接口

  • URL: http://<本机IP>:8050/
  • Content-Type: multipart/form-data; boundary=<boundary>
  • 摄像头认证: HTTP Digest

9.2 HTTP GET接口

  • URL: http://<本机IP>:8050/
  • 响应: ISAPI Push Server Running

十、技术总结

10.1 核心技术点

  1. ISAPI推送协议 - 海康威视私有协议,HTTP/HTTPS推送机制
  2. Multipart解析 - 二进制数据处理,header/body分离
  3. XML解析 - ElementTree库,命名空间处理
  4. RTSP视频流 - OpenCV捕获,实时显示
  5. 多线程编程 - HTTP服务器与视频显示并行

10.2 性能指标

指标 数值
单次推送处理 < 100ms
视频延迟 < 500ms
并发连接 支持多摄像头
内存占用 < 200MB

10.3 安全性建议

  • 使用HTTPS替代HTTP
  • 添加API密钥验证
  • 对敏感数据加密存储
  • 添加请求频率限制

文档编写者: AI Assistant
最后更新: 2026-05-18
版本: 1.0

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐