前言

上一篇实战教程,我们已经完成了 W55RP20 芯片 MQTT 协议与各类云平台接入 功能开发,实现了设备远程联网、数据上云、平台监控与控制。

本篇内容我们进入工业自动化核心应用 ——Modbus 工业协议通信

Modbus 是工业领域最通用、最稳定的标准通信协议,Modbus TCP 基于以太网实现,广泛应用于 PLC、传感器、仪表、变频器、伺服控制器等工业设备。W55RP20 作为硬件 TCP/IP 协议栈芯片,非常适合搭建高稳定性 Modbus TCP 从站(服务器),可被上位机、触摸屏、PLC 直接读取与控制,实现工业数据采集与远程控制。

本教程基于CircuitPython 实现完整 Modbus TCP Server,支持功能码 03/04/06/16,可直接对接 Modbus Poll、组态王、力控、西门子 / 三菱 PLC 等工业设备。

本文将带你学习:

  • Modbus TCP 工业协议原理与报文结构
  • W55RP20 搭建 Modbus TCP Server(从站)
  • 保持寄存器、输入寄存器模拟与管理
  • 功能码 03/04/06/16 完整解析与响应
  • 工业级异常码处理与通信稳定性保障
  • 硬件协议栈工业通信抗干扰设计
  • 嵌入式工业设备标准 Modbus 接入方案

系列教程学习路径

本专栏共 16 篇,循序渐进覆盖 W55RP20-EVB-Pico 模块CircuitPython 开发全流程:

  1. 第 1 篇:静态 IP 配置与网络基础
  2. 第 2 篇:DHCP 自动联网与网络诊断
  3. 第 3 篇:TCP Client 客户端通信
  4. 第 4 篇:TCP Server 服务端通信
  5. 第 5 篇:UDP 单播数据通信
  6. 第 6 篇:UDP 组播/广播数据通信
  7. 第 7 篇:DNS 域名解析
  8. 第 8 篇:NTP 从网络获取时间
  9. 第 9 篇:HTTP Client 客户端请求
  10. 第 10 篇:HTTP Server 服务端搭建(本文)
  11. 第 11 篇:HTTP 协议与 OneNET 平台数据上云
  12. 第 12 篇:MQTT 协议基础通信验证
  13. 第 13 篇:MQTT 协议与阿里云平台对接
  14. 第 14 篇:MQTT 协议与 OneNET 平台对接
  15. 第 15 篇:MQTT 协议与 ThingSpeak 平台对接
  16. 第 16 篇:Modbus 工业协议通信

目录

1. 准备工作

1.1 软件准备

1.2 硬件准备

2. 烧录 W55RP20-EVB-Pico 模块专属 CircuitPython 固件

3. 硬件连接与开发环境配置

3.1 硬件连接

3.1.1 基础连接(供电+调试)

3.1.2 以太网连接

3.1.3 模块与开发板接线

3.2 Thonny 开发环境配置

4. Modbus TCP 协议核心原理

4.1 Modbus TCP 简介

4.2 核心功能码

4.3 报文结构

4.4 通信流程

5. WIZnet 硬件协议栈工业通信优势

6. 核心代码解析

6.1 完整可运行代码

6.2 代码功能说明

7. 运行结果与测试验证

8. 常见问题一站式排查 

8.1 烧录相关问题

8.2 端口识别问题

其他常见问题

补充问题

9. 典型应用场景

10. 系列总结与资源获取

10.1 系列总结

10.1 资源获取

1. 准备工作

1.1 软件准备

所需软件均为免费版本,按要求下载安装即可,无需额外付费。

表格

软件名称 版本要求 下载地址 说明
Thonny 4.0 及以上 Thonny 官方下载 轻量级

CircuitPython

IDE,支持代码编辑、烧录与串口调试
W55RP20-EVB-Pico 模块

CircuitPython

固件
最新稳定版 WIZnet 官方固件下载 专为 W55RP20-EVB-Pico 模块 编写,已集成 WIZnet 硬件驱动、协议栈与 HTTP 库

1.2 硬件准备

W55RP20-EVB-Pico 模块已集成以太网相关器件,无需额外焊接飞线,配合 RP2040 开发板可快速搭建开发环境,大幅降低接线错误和硬件故障概率。

  • W55RP20-EVB-Pico × 1
  • Micro USB 数据线(必须支持数据传输,不能使用纯充电线)× 1
  • 标准网线 × 1
  • 开启 DHCP 功能的路由器 / 交换机 × 1(用于获取网络参数,实现 DNS 解析)

2. 烧录 W55RP20-EVB-Pico 模块专属 CircuitPython 固件

W55RP20-EVB-Pico 模块 完全兼容树莓派 Pico 的 UF2 固件烧录方式,操作简单无需额外烧录器,新手可快速上手:

  1. 按住 RP2040 开发板上的 BOOTSEL 按键不放;
  2. 使用 Micro USB 数据线连接开发板与电脑;
  3. 待电脑识别出名为 RPI-RP2 的 U 盘后,松开 BOOTSEL 按键;
  4. 将下载好的 W5500_RP2040_firmware.uf2 固件文件拖拽到 U 盘中;
  5. 开发板会自动重启,固件烧录完成。

注意:如果电脑没有识别出 RPI-RP2 U 盘,请尝试更换 USB 数据线、重新插拔开发板,或更换电脑 USB 接口(优先使用 USB 2.0 接口)。


3. 硬件连接与开发环境配置

3.1 硬件连接

W55RP20-EVB-Pico 模块连接分为两步,分别实现供电/调试和以太网连接,操作简单,无需复杂接线:

3.1.1 基础连接(供电+调试)

使用 Micro USB 数据线连接 RP2040 开发板与电脑,用于开发板供电、代码烧录和串口调试。

3.1.2 以太网连接

使用网线连接 W55RP20-EVB-Pico 模块的以太网接口与路由器的 LAN 口(或直接连接电脑网口,需手动配置电脑 IP 与开发板同网段)。

3.1.3 模块与开发板接线

若使用分离式模块与开发板,需按以下引脚对应连接(SPI 通信):

【硬件预留】此处插入硬件连接示意图

3.2 Thonny 开发环境配置

打开 Thonny 软件,按以下步骤配置开发环境,确保代码能正常烧录和运行:

  1. 点击顶部菜单栏「运行」→「配置解释器」;
  2. 切换到「解释器」选项卡;
  3. 在「解释器」下拉列表中选择 CircuitPython (通用);
  4. 在「端口」下拉列表中选择开发板对应的串口(通常显示为 Board CDC @ COMx);
  5. 勾选「运行代码前先重启解释器」和「同步设备的实时时钟」;
  6. 点击「确定」完成配置。

如果端口列表中没有出现开发板,请尝试:

  • 重新插拔 USB 数据线;

  • 更换支持数据传输的 USB 数据线;

  • 关闭其他占用串口的软件(如串口助手、Arduino IDE 等);

  • 重新烧录CircuitPython 固件;

  • 安装树莓派 Pico USB 驱动。


4. Modbus TCP 协议核心原理

4.1 Modbus TCP 简介

Modbus TCP 是工业以太网标准协议,基于 TCP 502 端口通信。设备分为:

  • Server(从站):本设备(提供寄存器数据)
  • Client(主站):PLC / 上位机 / 调试软件(主动读写)

4.2 核心功能码

  • 03:读保持寄存器
  • 04:读输入寄存器
  • 06:写单个保持寄存器
  • 16:写多个保持寄存器

4.3 报文结构

  • MBAP 报文头(7 字节):事务 ID + 协议 ID + 长度 + 单元号
  • PDU 数据体:功能码 + 数据内容

4.4 通信流程

  1. 设备启动 Modbus TCP Server,监听 502 端口
  2. 主站(电脑 / PLC)发起 TCP 连接
  3. 主站发送请求报文
  4. 从站解析功能码、寄存器地址
  5. 读取 / 写入寄存器数据
  6. 从站返回响应报文
  7. 连接保持,支持连续通信

5. WIZnet 硬件协议栈工业通信优势

相比于传统软件 TCP 方案,W5500 硬件协议栈在工业 Modbus 场景优势极强:

硬件原生处理 TCP 连接,0% 占用 MCU,不影响实时控制;工业级稳定性,7×24 小时不断线、不丢包;硬件自动处理重传、流控、校验,通信更可靠;支持多 Socket,可同时提供 Modbus + 上云服务;响应速度微秒级,满足工业实时性要求;抗干扰强,工厂复杂环境不掉线。


6. 核心代码解析

6.1 完整可运行代码

BOARD    = "w55rp20-evb-pico"
USE_DHCP = False

# Static IP settings
NET_IP   = "192.168.11.20"
NET_SN   = "255.255.255.0"
NET_GW   = "192.168.11.1"
NET_DNS  = "8.8.8.8"

# Modbus TCP settings
SERVER_PORT = 502
UNIT_ID     = 1

from usocket import socket, SOL_SOCKET, SO_REUSEADDR
import ustruct as struct
from wiznet_init import wiznet


# 寄存器映射(用于 Modbus Poll 调试)
HOLDING_REGISTERS = [0] * 32
INPUT_REGISTERS = [1000 + i for i in range(32)]


# 异常响应
def _exception_response(transaction_id, unit_id, function_code, exception_code):
    pdu = bytes([function_code | 0x80, exception_code])
    mbap = struct.pack(">HHHB", transaction_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu


# 正常响应
def _normal_response(transaction_id, unit_id, pdu):
    mbap = struct.pack(">HHHB", transaction_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu


# 读寄存器
def _read_registers(registers, start_addr, quantity):
    if quantity < 1 or quantity > 125:
        return None, 3
    if start_addr < 0 or start_addr + quantity > len(registers):
        return None, 2

    payload = bytearray()
    payload.append(quantity * 2)
    for value in registers[start_addr:start_addr + quantity]:
        payload.extend(struct.pack(">H", value & 0xFFFF))
    return payload, None


# 写单个寄存器
def _write_single_register(registers, reg_addr, reg_value):
    if reg_addr < 0 or reg_addr >= len(registers):
        return 2
    registers[reg_addr] = reg_value & 0xFFFF
    return None


# 写多个寄存器
def _write_multiple_registers(registers, start_addr, quantity, values):
    if quantity < 1 or quantity > 123:
        return 3
    if start_addr < 0 or start_addr + quantity > len(registers):
        return 2
    if len(values) != quantity:
        return 3

    for i, value in enumerate(values):
        registers[start_addr + i] = value & 0xFFFF
    return None


# 处理 Modbus 请求
def handle_modbus_request(request):
    if len(request) < 8:
        return None

    transaction_id, protocol_id, length, unit_id = struct.unpack(">HHHB", request[:7])
    if protocol_id != 0:
        return None
    if unit_id != UNIT_ID:
        return None
    if len(request) < 7 + length - 1:
        return None

    pdu = request[7:7 + length - 1]
    if len(pdu) < 1:
        return None

    function_code = pdu[0]

    try:
        # FC03 读保持寄存器
        if function_code == 3:
            if len(pdu) != 5:
                return _exception_response(transaction_id, unit_id, function_code, 3)
            start_addr, quantity = struct.unpack(">HH", pdu[1:5])
            data, exc = _read_registers(HOLDING_REGISTERS, start_addr, quantity)
            if exc is not None:
                return _exception_response(transaction_id, unit_id, function_code, exc)
            print("FC03 read holding registers: start={}, qty={}, values={}".format(
                start_addr, quantity, HOLDING_REGISTERS[start_addr:start_addr + quantity]
            ))
            return _normal_response(transaction_id, unit_id, bytes([function_code]) + data)

        # FC04 读输入寄存器
        if function_code == 4:
            if len(pdu) != 5:
                return _exception_response(transaction_id, unit_id, function_code, 3)
            start_addr, quantity = struct.unpack(">HH", pdu[1:5])
            data, exc = _read_registers(INPUT_REGISTERS, start_addr, quantity)
            if exc is not None:
                return _exception_response(transaction_id, unit_id, function_code, exc)
            print("FC04 read input registers: start={}, qty={}, values={}".format(
                start_addr, quantity, INPUT_REGISTERS[start_addr:start_addr + quantity]
            ))
            return _normal_response(transaction_id, unit_id, bytes([function_code]) + data)

        # FC06 写单个寄存器
        if function_code == 6:
            if len(pdu) != 5:
                return _exception_response(transaction_id, unit_id, function_code, 3)
            reg_addr, reg_value = struct.unpack(">HH", pdu[1:5])
            exc = _write_single_register(HOLDING_REGISTERS, reg_addr, reg_value)
            if exc is not None:
                return _exception_response(transaction_id, unit_id, function_code, exc)
            print("FC06 write single register: addr={}, value={}".format(
                reg_addr, HOLDING_REGISTERS[reg_addr]
            ))
            return _normal_response(transaction_id, unit_id, pdu)

        # FC16 写多个寄存器
        if function_code == 16:
            if len(pdu) < 6:
                return _exception_response(transaction_id, unit_id, function_code, 3)
            start_addr, quantity, byte_count = struct.unpack(">HHB", pdu[1:6])
            data = pdu[6:]
            if byte_count != len(data) or byte_count != quantity * 2:
                return _exception_response(transaction_id, unit_id, function_code, 3)

            values = []
            for i in range(quantity):
                values.append(struct.unpack(">H", data[i * 2:(i * 2) + 2])[0])

            exc = _write_multiple_registers(HOLDING_REGISTERS, start_addr, quantity, values)
            if exc is not None:
                return _exception_response(transaction_id, unit_id, function_code, exc)
            print("FC16 write multiple registers: start={}, qty={}, values={}".format(
                start_addr, quantity, HOLDING_REGISTERS[start_addr:start_addr + quantity]
            ))
            response_pdu = struct.pack(">BHH", function_code, start_addr, quantity)
            return _normal_response(transaction_id, unit_id, response_pdu)

        return _exception_response(transaction_id, unit_id, function_code, 1)

    except Exception as e:
        print("Request handling error:", e)
        return _exception_response(transaction_id, unit_id, function_code, 4)


# 处理客户端连接
def serve_client(conn, addr):
    print("Client connected:", addr)
    try:
        while True:
            request = conn.recv(260)
            if not request:
                print("Client disconnected")
                break

            response = handle_modbus_request(request)
            if response:
                conn.send(response)
    except Exception as e:
        print("Client error:", e)
    finally:
        try:
            conn.close()
        except Exception:
            pass


def main():
    if USE_DHCP:
        nic = wiznet(BOARD, dhcp=True)
    else:
        nic = wiznet(BOARD, dhcp=False, ip=NET_IP, sn=NET_SN, gw=NET_GW, dns=NET_DNS)

    print("Modbus TCP server IP:", nic.ifconfig()[0])
    print("Unit ID:", UNIT_ID)
    print("Holding registers[0:8]:", HOLDING_REGISTERS[:8])
    print("Input registers[0:8]:", INPUT_REGISTERS[:8])

    s = socket()
    try:
        s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    except Exception:
        pass

    s.bind((nic.ifconfig()[0], SERVER_PORT))
    s.listen(1)
    print("Modbus TCP server listening on {}:{}".format(nic.ifconfig()[0], SERVER_PORT))

    while True:
        conn, addr = s.accept()
        serve_client(conn, addr)


if __name__ == "__main__":
    main()

6.2 代码功能说明

  1. 支持静态 IP / DHCP 双模式切换,适配工业现场部署;
  2. 标准 Modbus TCP 502 端口,兼容所有工业主站设备;
  3. 实现 32 点保持寄存器 + 32 点输入寄存器,可直接对接组态软件;
  4. 完整支持 FC03/04/06/16 四大核心功能码;
  5. 自带工业标准异常响应(地址错误、数据错误、功能码错误);
  6. 硬件协议栈处理 TCP 连接,通信稳定无延迟;实时打印通信日志,方便调试与故障定位;支持长连接、多包连续通信,满足工业自动化要求。

7. 运行结果与测试验证

将代码烧录进设备,串口输出如下:

MPY: soft reboot
Waiting for the network to connect ...
Waiting for the network to connect ...
MAC Address: 02:90:86:88:4d:56
IP Address: ('192.168.11.20', '255.255.255.0', '192.168.11.1', '8.8.8.8')
Modbus TCP server IP: 192.168.11.20
Unit ID: 1
Holding registers[0:8]: [0, 0, 0, 0, 0, 0, 0, 0]
Input registers [0:8]: [1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007]
Modbus TCP server listening on 192.168.11.20:502

使用 Modbus Poll 连接:

  • IP:192.168.11.20
  • Port:502
  • ID:1
  • 功能码:03/04/06/16

可实现:

屏幕录制 2026-04-30 110449

  • 成功读取保持寄存器、输入寄存器
  • 成功写入单个 / 多个寄存器
  • 串口实时打印读写日志
  • 无报错、无丢包、通信稳定

8. 常见问题一站式排查 

8.1 烧录相关问题

问题现象

排查步骤

电脑无法识别RPI-RP2U 盘

1. 确认按住 BOOTSEL 按键再插 USB;

2. 更换支持数据传输的 USB 数据线;

3. 更换电脑 USB 接口(优先使用 USB 2.0 接口);

4. 尝试使用另一台电脑。

固件拖拽后开发板无反应

1. 确认下载的是 W55RP20 专属固件,不是通用树莓派 Pico 固件;

2. 重新烧录固件;

3. 检查 USB 供电是否稳定。

8.2 端口识别问题

问题现象

排查步骤

Thonny 中找不到开发板端口

1. 重新插拔 USB 数据线;

2. 关闭其他占用串口的软件;

3. 在设备管理器中查看是否有Board CDC设备;

4. 重新烧录固件;

5. 安装树莓派 Pico USB 驱动。

  

其他常见问题

问题现象

排查步骤

Modbus Poll 连接失败

1. 检查设备 IP 与电脑在同一网段;

2. 关闭电脑防火墙;

3. 确认设备监听 502 端口;

4. 重启设备与调试软件。

报异常码 02

1. 寄存器地址超出范围;

2. 地址从 0 开始,不要填写过大地址。

报异常码 03

1. 读取长度超出限制(最大 125);

2. 写入数量不符合规范。

通信中断 / 不稳定

1. 使用硬件协议栈不会掉线,多为网线 / 交换机问题;

2. 更换网线,重新插拔测试。

补充问题

问题现象

排查步骤

Thonny 无法识别开发板

1. 更换 USB 线;

2. 安装 Pico 串口驱动;

3. 关闭占用串口软件。


9. 典型应用场景

  1. 工业传感器数据采集(温湿度、压力、流量);
  2. PLC 从站模块扩展;触摸屏 / 组态软件数据监控;
  3. 工业设备远程控制与参数配置;工厂自动化产线数据交互;
  4. Modbus 网关、采集模块、远程 IO 模块开发。

10. 系列总结与资源获取

10.1 系列总结

本系列 16 篇教程已全部完成,覆盖:基础网络 → 时间同步 → HTTP 服务 → 云平台接入 → MQTT 物联网 → Modbus 工业控制W55RP20 硬件协议栈可同时实现物联网 + 工业网双网融合,是智能硬件、工业物联网、边缘采集的最佳选择。

10.1 资源获取

如果本文对你有帮助,欢迎点赞、收藏、关注,你的支持是我们持续更新的动力!

如有任何问题,欢迎在评论区留言,我们会第一时间回复。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐