第 16 篇:W55RP20-EVB-PicoCircuitPython 实战:Modbus 工业协议通信
前言
上一篇实战教程,我们已经完成了 W55RP20 芯片 MQTT 协议与各类云平台接入 功能开发,实现了设备远程联网、数据上云、平台监控与控制。
本篇内容我们进入工业自动化核心应用 ——Modbus 工业协议通信。
Modbus 是工业领域最通用、最稳定的标准通信协议,Modbus TCP 基于以太网实现,广泛应用于 PLC、传感器、仪表、变频器、伺服控制器等工业设备。W55RP20 作为硬件 TCP/IP 协议栈芯片,非常适合搭建高稳定性 Modbus TCP 从站(服务器),可被上位机、触摸屏、PLC 直接读取与控制,实现工业数据采集与远程控制。
本教程基于CircuitPython 实现完整 Modbus TCP Server,支持功能码 03/04/06/16,可直接对接 Modbus Poll、组态王、力控、西门子 / 三菱 PLC 等工业设备。
本文将带你学习:
- Modbus TCP 工业协议原理与报文结构
- W55RP20 搭建 Modbus TCP Server(从站)
- 保持寄存器、输入寄存器模拟与管理
- 功能码 03/04/06/16 完整解析与响应
- 工业级异常码处理与通信稳定性保障
- 硬件协议栈工业通信抗干扰设计
- 嵌入式工业设备标准 Modbus 接入方案
系列教程学习路径
本专栏共 16 篇,循序渐进覆盖 W55RP20-EVB-Pico 模块CircuitPython 开发全流程:
- 第 1 篇:静态 IP 配置与网络基础
- 第 2 篇:DHCP 自动联网与网络诊断
- 第 3 篇:TCP Client 客户端通信
- 第 4 篇:TCP Server 服务端通信
- 第 5 篇:UDP 单播数据通信
- 第 6 篇:UDP 组播/广播数据通信
- 第 7 篇:DNS 域名解析
- 第 8 篇:NTP 从网络获取时间
- 第 9 篇:HTTP Client 客户端请求
- 第 10 篇:HTTP Server 服务端搭建(本文)
- 第 11 篇:HTTP 协议与 OneNET 平台数据上云
- 第 12 篇:MQTT 协议基础通信验证
- 第 13 篇:MQTT 协议与阿里云平台对接
- 第 14 篇:MQTT 协议与 OneNET 平台对接
- 第 15 篇:MQTT 协议与 ThingSpeak 平台对接
- 第 16 篇:Modbus 工业协议通信
目录
2. 烧录 W55RP20-EVB-Pico 模块专属 CircuitPython 固件
1. 准备工作
1.1 软件准备
所需软件均为免费版本,按要求下载安装即可,无需额外付费。
表格
| 软件名称 | 版本要求 | 下载地址 | 说明 |
|---|---|---|---|
| Thonny | 4.0 及以上 | Thonny 官方下载 | 轻量级
CircuitPython IDE,支持代码编辑、烧录与串口调试 |
| W55RP20-EVB-Pico 模块
CircuitPython 固件 |
最新稳定版 | WIZnet 官方固件下载 | 专为 W55RP20-EVB-Pico 模块 编写,已集成 WIZnet 硬件驱动、协议栈与 HTTP 库 |
1.2 硬件准备

W55RP20-EVB-Pico 模块已集成以太网相关器件,无需额外焊接飞线,配合 RP2040 开发板可快速搭建开发环境,大幅降低接线错误和硬件故障概率。
- W55RP20-EVB-Pico × 1
- Micro USB 数据线(必须支持数据传输,不能使用纯充电线)× 1
- 标准网线 × 1
- 开启 DHCP 功能的路由器 / 交换机 × 1(用于获取网络参数,实现 DNS 解析)
2. 烧录 W55RP20-EVB-Pico 模块专属 CircuitPython 固件
W55RP20-EVB-Pico 模块 完全兼容树莓派 Pico 的 UF2 固件烧录方式,操作简单无需额外烧录器,新手可快速上手:
- 按住 RP2040 开发板上的 BOOTSEL 按键不放;
- 使用 Micro USB 数据线连接开发板与电脑;
- 待电脑识别出名为 RPI-RP2 的 U 盘后,松开 BOOTSEL 按键;
- 将下载好的 W5500_RP2040_firmware.uf2 固件文件拖拽到 U 盘中;
- 开发板会自动重启,固件烧录完成。
注意:如果电脑没有识别出 RPI-RP2 U 盘,请尝试更换 USB 数据线、重新插拔开发板,或更换电脑 USB 接口(优先使用 USB 2.0 接口)。
3. 硬件连接与开发环境配置
3.1 硬件连接
W55RP20-EVB-Pico 模块连接分为两步,分别实现供电/调试和以太网连接,操作简单,无需复杂接线:
3.1.1 基础连接(供电+调试)
使用 Micro USB 数据线连接 RP2040 开发板与电脑,用于开发板供电、代码烧录和串口调试。
3.1.2 以太网连接
使用网线连接 W55RP20-EVB-Pico 模块的以太网接口与路由器的 LAN 口(或直接连接电脑网口,需手动配置电脑 IP 与开发板同网段)。
3.1.3 模块与开发板接线
若使用分离式模块与开发板,需按以下引脚对应连接(SPI 通信):
【硬件预留】此处插入硬件连接示意图
3.2 Thonny 开发环境配置
打开 Thonny 软件,按以下步骤配置开发环境,确保代码能正常烧录和运行:
- 点击顶部菜单栏「运行」→「配置解释器」;
- 切换到「解释器」选项卡;
- 在「解释器」下拉列表中选择 CircuitPython (通用);
- 在「端口」下拉列表中选择开发板对应的串口(通常显示为 Board CDC @ COMx);
- 勾选「运行代码前先重启解释器」和「同步设备的实时时钟」;
- 点击「确定」完成配置。
如果端口列表中没有出现开发板,请尝试:
重新插拔 USB 数据线;
更换支持数据传输的 USB 数据线;
关闭其他占用串口的软件(如串口助手、Arduino IDE 等);
重新烧录CircuitPython 固件;
安装树莓派 Pico USB 驱动。
4. Modbus TCP 协议核心原理
4.1 Modbus TCP 简介
Modbus TCP 是工业以太网标准协议,基于 TCP 502 端口通信。设备分为:
- Server(从站):本设备(提供寄存器数据)
- Client(主站):PLC / 上位机 / 调试软件(主动读写)
4.2 核心功能码
- 03:读保持寄存器
- 04:读输入寄存器
- 06:写单个保持寄存器
- 16:写多个保持寄存器
4.3 报文结构
- MBAP 报文头(7 字节):事务 ID + 协议 ID + 长度 + 单元号
- PDU 数据体:功能码 + 数据内容
4.4 通信流程
- 设备启动 Modbus TCP Server,监听 502 端口
- 主站(电脑 / PLC)发起 TCP 连接
- 主站发送请求报文
- 从站解析功能码、寄存器地址
- 读取 / 写入寄存器数据
- 从站返回响应报文
- 连接保持,支持连续通信
5. WIZnet 硬件协议栈工业通信优势
相比于传统软件 TCP 方案,W5500 硬件协议栈在工业 Modbus 场景优势极强:
硬件原生处理 TCP 连接,0% 占用 MCU,不影响实时控制;工业级稳定性,7×24 小时不断线、不丢包;硬件自动处理重传、流控、校验,通信更可靠;支持多 Socket,可同时提供 Modbus + 上云服务;响应速度微秒级,满足工业实时性要求;抗干扰强,工厂复杂环境不掉线。
6. 核心代码解析
6.1 完整可运行代码
BOARD = "w55rp20-evb-pico"
USE_DHCP = False
# Static IP settings
NET_IP = "192.168.11.20"
NET_SN = "255.255.255.0"
NET_GW = "192.168.11.1"
NET_DNS = "8.8.8.8"
# Modbus TCP settings
SERVER_PORT = 502
UNIT_ID = 1
from usocket import socket, SOL_SOCKET, SO_REUSEADDR
import ustruct as struct
from wiznet_init import wiznet
# 寄存器映射(用于 Modbus Poll 调试)
HOLDING_REGISTERS = [0] * 32
INPUT_REGISTERS = [1000 + i for i in range(32)]
# 异常响应
def _exception_response(transaction_id, unit_id, function_code, exception_code):
pdu = bytes([function_code | 0x80, exception_code])
mbap = struct.pack(">HHHB", transaction_id, 0, len(pdu) + 1, unit_id)
return mbap + pdu
# 正常响应
def _normal_response(transaction_id, unit_id, pdu):
mbap = struct.pack(">HHHB", transaction_id, 0, len(pdu) + 1, unit_id)
return mbap + pdu
# 读寄存器
def _read_registers(registers, start_addr, quantity):
if quantity < 1 or quantity > 125:
return None, 3
if start_addr < 0 or start_addr + quantity > len(registers):
return None, 2
payload = bytearray()
payload.append(quantity * 2)
for value in registers[start_addr:start_addr + quantity]:
payload.extend(struct.pack(">H", value & 0xFFFF))
return payload, None
# 写单个寄存器
def _write_single_register(registers, reg_addr, reg_value):
if reg_addr < 0 or reg_addr >= len(registers):
return 2
registers[reg_addr] = reg_value & 0xFFFF
return None
# 写多个寄存器
def _write_multiple_registers(registers, start_addr, quantity, values):
if quantity < 1 or quantity > 123:
return 3
if start_addr < 0 or start_addr + quantity > len(registers):
return 2
if len(values) != quantity:
return 3
for i, value in enumerate(values):
registers[start_addr + i] = value & 0xFFFF
return None
# 处理 Modbus 请求
def handle_modbus_request(request):
if len(request) < 8:
return None
transaction_id, protocol_id, length, unit_id = struct.unpack(">HHHB", request[:7])
if protocol_id != 0:
return None
if unit_id != UNIT_ID:
return None
if len(request) < 7 + length - 1:
return None
pdu = request[7:7 + length - 1]
if len(pdu) < 1:
return None
function_code = pdu[0]
try:
# FC03 读保持寄存器
if function_code == 3:
if len(pdu) != 5:
return _exception_response(transaction_id, unit_id, function_code, 3)
start_addr, quantity = struct.unpack(">HH", pdu[1:5])
data, exc = _read_registers(HOLDING_REGISTERS, start_addr, quantity)
if exc is not None:
return _exception_response(transaction_id, unit_id, function_code, exc)
print("FC03 read holding registers: start={}, qty={}, values={}".format(
start_addr, quantity, HOLDING_REGISTERS[start_addr:start_addr + quantity]
))
return _normal_response(transaction_id, unit_id, bytes([function_code]) + data)
# FC04 读输入寄存器
if function_code == 4:
if len(pdu) != 5:
return _exception_response(transaction_id, unit_id, function_code, 3)
start_addr, quantity = struct.unpack(">HH", pdu[1:5])
data, exc = _read_registers(INPUT_REGISTERS, start_addr, quantity)
if exc is not None:
return _exception_response(transaction_id, unit_id, function_code, exc)
print("FC04 read input registers: start={}, qty={}, values={}".format(
start_addr, quantity, INPUT_REGISTERS[start_addr:start_addr + quantity]
))
return _normal_response(transaction_id, unit_id, bytes([function_code]) + data)
# FC06 写单个寄存器
if function_code == 6:
if len(pdu) != 5:
return _exception_response(transaction_id, unit_id, function_code, 3)
reg_addr, reg_value = struct.unpack(">HH", pdu[1:5])
exc = _write_single_register(HOLDING_REGISTERS, reg_addr, reg_value)
if exc is not None:
return _exception_response(transaction_id, unit_id, function_code, exc)
print("FC06 write single register: addr={}, value={}".format(
reg_addr, HOLDING_REGISTERS[reg_addr]
))
return _normal_response(transaction_id, unit_id, pdu)
# FC16 写多个寄存器
if function_code == 16:
if len(pdu) < 6:
return _exception_response(transaction_id, unit_id, function_code, 3)
start_addr, quantity, byte_count = struct.unpack(">HHB", pdu[1:6])
data = pdu[6:]
if byte_count != len(data) or byte_count != quantity * 2:
return _exception_response(transaction_id, unit_id, function_code, 3)
values = []
for i in range(quantity):
values.append(struct.unpack(">H", data[i * 2:(i * 2) + 2])[0])
exc = _write_multiple_registers(HOLDING_REGISTERS, start_addr, quantity, values)
if exc is not None:
return _exception_response(transaction_id, unit_id, function_code, exc)
print("FC16 write multiple registers: start={}, qty={}, values={}".format(
start_addr, quantity, HOLDING_REGISTERS[start_addr:start_addr + quantity]
))
response_pdu = struct.pack(">BHH", function_code, start_addr, quantity)
return _normal_response(transaction_id, unit_id, response_pdu)
return _exception_response(transaction_id, unit_id, function_code, 1)
except Exception as e:
print("Request handling error:", e)
return _exception_response(transaction_id, unit_id, function_code, 4)
# 处理客户端连接
def serve_client(conn, addr):
print("Client connected:", addr)
try:
while True:
request = conn.recv(260)
if not request:
print("Client disconnected")
break
response = handle_modbus_request(request)
if response:
conn.send(response)
except Exception as e:
print("Client error:", e)
finally:
try:
conn.close()
except Exception:
pass
def main():
if USE_DHCP:
nic = wiznet(BOARD, dhcp=True)
else:
nic = wiznet(BOARD, dhcp=False, ip=NET_IP, sn=NET_SN, gw=NET_GW, dns=NET_DNS)
print("Modbus TCP server IP:", nic.ifconfig()[0])
print("Unit ID:", UNIT_ID)
print("Holding registers[0:8]:", HOLDING_REGISTERS[:8])
print("Input registers[0:8]:", INPUT_REGISTERS[:8])
s = socket()
try:
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
except Exception:
pass
s.bind((nic.ifconfig()[0], SERVER_PORT))
s.listen(1)
print("Modbus TCP server listening on {}:{}".format(nic.ifconfig()[0], SERVER_PORT))
while True:
conn, addr = s.accept()
serve_client(conn, addr)
if __name__ == "__main__":
main()
6.2 代码功能说明
- 支持静态 IP / DHCP 双模式切换,适配工业现场部署;
- 标准 Modbus TCP 502 端口,兼容所有工业主站设备;
- 实现 32 点保持寄存器 + 32 点输入寄存器,可直接对接组态软件;
- 完整支持 FC03/04/06/16 四大核心功能码;
- 自带工业标准异常响应(地址错误、数据错误、功能码错误);
- 硬件协议栈处理 TCP 连接,通信稳定无延迟;实时打印通信日志,方便调试与故障定位;支持长连接、多包连续通信,满足工业自动化要求。
7. 运行结果与测试验证
将代码烧录进设备,串口输出如下:
MPY: soft reboot
Waiting for the network to connect ...
Waiting for the network to connect ...
MAC Address: 02:90:86:88:4d:56
IP Address: ('192.168.11.20', '255.255.255.0', '192.168.11.1', '8.8.8.8')
Modbus TCP server IP: 192.168.11.20
Unit ID: 1
Holding registers[0:8]: [0, 0, 0, 0, 0, 0, 0, 0]
Input registers [0:8]: [1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007]
Modbus TCP server listening on 192.168.11.20:502
使用 Modbus Poll 连接:
- IP:192.168.11.20
- Port:502
- ID:1
- 功能码:03/04/06/16
可实现:
屏幕录制 2026-04-30 110449
- 成功读取保持寄存器、输入寄存器
- 成功写入单个 / 多个寄存器
- 串口实时打印读写日志
- 无报错、无丢包、通信稳定
8. 常见问题一站式排查
8.1 烧录相关问题
|
问题现象 |
排查步骤 |
|---|---|
|
电脑无法识别RPI-RP2U 盘 |
1. 确认按住 BOOTSEL 按键再插 USB; 2. 更换支持数据传输的 USB 数据线; 3. 更换电脑 USB 接口(优先使用 USB 2.0 接口); 4. 尝试使用另一台电脑。 |
|
固件拖拽后开发板无反应 |
1. 确认下载的是 W55RP20 专属固件,不是通用树莓派 Pico 固件; 2. 重新烧录固件; 3. 检查 USB 供电是否稳定。 |
8.2 端口识别问题
|
问题现象 |
排查步骤 |
|---|---|
|
Thonny 中找不到开发板端口 |
1. 重新插拔 USB 数据线; 2. 关闭其他占用串口的软件; 3. 在设备管理器中查看是否有Board CDC设备; 4. 重新烧录固件; 5. 安装树莓派 Pico USB 驱动。 |
其他常见问题
|
问题现象 |
排查步骤 |
|---|---|
|
Modbus Poll 连接失败 |
1. 检查设备 IP 与电脑在同一网段; 2. 关闭电脑防火墙; 3. 确认设备监听 502 端口; 4. 重启设备与调试软件。 |
|
报异常码 02 |
1. 寄存器地址超出范围; 2. 地址从 0 开始,不要填写过大地址。 |
|
报异常码 03 |
1. 读取长度超出限制(最大 125); 2. 写入数量不符合规范。 |
|
通信中断 / 不稳定 |
1. 使用硬件协议栈不会掉线,多为网线 / 交换机问题; 2. 更换网线,重新插拔测试。 |
补充问题
|
问题现象 |
排查步骤 |
|---|---|
|
Thonny 无法识别开发板 |
1. 更换 USB 线; 2. 安装 Pico 串口驱动; 3. 关闭占用串口软件。 |
9. 典型应用场景
- 工业传感器数据采集(温湿度、压力、流量);
- PLC 从站模块扩展;触摸屏 / 组态软件数据监控;
- 工业设备远程控制与参数配置;工厂自动化产线数据交互;
- Modbus 网关、采集模块、远程 IO 模块开发。
10. 系列总结与资源获取
10.1 系列总结
本系列 16 篇教程已全部完成,覆盖:基础网络 → 时间同步 → HTTP 服务 → 云平台接入 → MQTT 物联网 → Modbus 工业控制W55RP20 硬件协议栈可同时实现物联网 + 工业网双网融合,是智能硬件、工业物联网、边缘采集的最佳选择。
10.1 资源获取
- 本文完整代码:WIZnet 官方 Gitee 仓库
- W55RP20 芯片手册:WIZnet 官方资料网址
如果本文对你有帮助,欢迎点赞、收藏、关注,你的支持是我们持续更新的动力!
如有任何问题,欢迎在评论区留言,我们会第一时间回复。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)