本文为 WIZnet W55RP20 芯片 MicroPython 教程第 12 篇,基于官方最新固件编写,代码均经过实际验证,可直接烧录运行。
版权声明:本文为 WIZnet 官方原创技术文章,转载请注明出处。

前言

上一篇实战教程,我们已经完成了W55RP20芯片HTTP协议与OneNET平台数据上云。
本篇内容我们聚焦物联网核心通信协议-MQTT.MQTT是物联网最主流的轻量级通信协议,采用发布/订阅(Pub/Sub)模式,带宽占用小、功耗低、支持断线重连,非常适合嵌入式以太网设备上云与设备间通信。W55RP20集成硬件 TCP/IP 协议栈,搭配MicroPython的umqtt库,几行代码即可实现稳定MQTT通信,无需关心底层Socket细节。
学完本文,你将掌握:

  •    MQTT协议核心原理与发布/订阅机制
  •    W55RP20连接本地/公共MQTT服务器
  •    实现消息发布、订阅、回调处理
  •    心跳保活、断线重连等工业级稳定机制
  •    本地MQTT测试环境搭建

传统嵌入式MQTT开发的3大痛点,W55RP20一键解决:

1. 协议栈调试复杂:软件TCP/IP协议栈需手动移植、调试,占用MCU资源,易出现卡顿、内存溢出;W55RP20内置硬件TCP/IP协议栈,网络层处理完全由硬件完成,不占用CPU资源,无需调试协议栈。

2. 硬件接线繁琐:外接网络模块需手动焊接、调试接口时序,新手易出错;W55RP20板载以太网接口,无需额外外接模块,即插即用,大幅简化接线操作。

3. 入门门槛高:MQTT客户端连接、心跳保活等细节需手动编写,调试周期长;本文提供可直接运行的完整代码,注释清晰,配合10分钟分步操作,新手也能快速出效果,依托MicroPython免编译特性,代码修改即时生效、调试实时反馈,快速获得成就感。

本节课核心目标:

· 完成W55RP20与电脑的硬件连接、开发环境配置

· 理解MQTT协议核心原理与极简工作流程

· 烧录MQTT客户端代码,实现与公共MQTT服务器的连接

· 完成主题订阅、消息发布与回环通信测试

· 掌握常见故障的快速排查方法

系列教程学习路径

本专栏共 16 篇,循序渐进覆盖 W55RP20-EVB-Pico 模块 C语言开发全流程:

 1.第 1 篇:静态 IP 配置与网络基础

2.第 2 篇:DHCP 自动联网与网络诊断

3.第 3 篇:TCP Client 客户端通信

4.第 4 篇:TCP Server 服务端通信

5.第 5 篇:UDP 单播数据通信

6.第 6 篇:UDP 组播/广播数据通信

7.第 7 篇:DNS 域名解析

8.第 8 篇:NTP 从网络获取时间

9.第 9 篇:HTTP Client 客户端请求

10.第 10 篇:HTTP Server 服务端搭建

11.第 11 篇:HTTP 协议与 OneNET 平台数据上云

12.第 12 篇:MQTT 协议基础通信验证(本文)

13.第 13 篇:MQTT 协议与阿里云平台对接

14.第 14 篇:MQTT 协议与 OneNET 平台对接

15.第 15 篇:MQTT 协议与 ThingSpeak 平台对接

16.第 16 篇:Modbus 工业协议通信

建议收藏本专栏,跟随教程逐步学习,所有代码均会同步更新至官方 Gitee 仓库

目录

前言

系列教程学习路径

 1. 准备工作

1.1 软件准备

1.2 硬件准备

2. 烧录 W55RP20-EVB-MKR  模块专属 MicroPython 固件

3. 硬件连接与开发环境配置

3.1 硬件连接

3.1.1 基础连接(供电+调试)

3.1.2 以太网连接

3.1.3 模块与开发板接线

3.2 Thonny 开发环境配置

4. MQTT协议极简解析

5. 核心代码复制与烧录

5.1 依赖库说明

5.2 完整代码以及烧录历程

6. 运行结果与MQTT通信验证

6.1 串口输出结果

6.2 MQTT回环通信验证

7. 常见问题一站式排查

8. W55RP20 核心优势对比

9. 典型应用场景

10. 系列预告与资源获取

10.1系列预告

10.2 资源获取

 1. 准备工作

1.1 软件准备

所需软件均为免费版本,按要求下载安装即可,无需额外付费。

软件名称

版本要求

下载地址

说明

Thonny

4.0 及以上

Thonny 官方下载

轻量级 MicroPython IDE,支持代码编辑、烧录与串口调试,新手友好

W55RP20-EVB-MKR 模块 MicroPython 固件

最新稳定版

WIZnet 官方固件下载

专为 W55RP20-EVB-MKR 模块 编写,已集成 WIZnet 硬件驱动与协议栈

1.2 硬件准备

  • W55RP20-EVB-MKR  × 1
  • Micro USB 数据线(必须支持数据传输,不能使用纯充电线)× 1
  • 标准网线 × 1
  • 开启 DHCP 功能的路由器 / 交换机 × 1(用于获取网络参数,实现 DNS 解析)

W55RP20-EVB-MKR 模块已集成以太网相关器件,无需额外焊接飞线,配合 RP2040 开发板可快速搭建开发环境,大幅降低接线错误和硬件故障概率。

2. 烧录 W55RP20-EVB-MKR  模块专属 MicroPython 固件

W55RP20-EVB-Pico 模块 完全兼容树莓派 Pico 的 UF2 固件烧录方式,操作简单无需额外烧录器,新手可快速上手:

  1. 按住 RP2040 开发板上的 BOOTSEL 按键不放;
  2. 使用 Micro USB 数据线连接开发板与电脑;
  3. 待电脑识别出名为 RPI-RP2 的 U 盘后,松开 BOOTSEL 按键;
  4. 将下载好的 W5500_RP2040_firmware.uf2 固件文件拖拽到 U 盘中;
  5. 开发板会自动重启,固件烧录完成。

注意:如果电脑没有识别出 RPI-RP2 U 盘,请尝试更换 USB 数据线、重新插拔开发板,或更换电脑 USB 接口(优先使用 USB 2.0 接口)。

3. 硬件连接与开发环境配置

3.1 硬件连接

W55RP20-EVB-MKR  模块连接分为两步,分别实现供电/调试和以太网连接,操作简单,无需复杂接线:

3.1.1 基础连接(供电+调试)

使用 Micro USB 数据线连接 RP2040 开发板与电脑,用于开发板供电、代码烧录和串口调试。

3.1.2 以太网连接

使用网线连接 W55RP20-EVB-MKR  模块的以太网接口与路由器的 LAN 口(或直接连接电脑网口,需手动配置电脑 IP 与开发板同网段)。

3.1.3 模块与开发板接线

若使用分离式模块与开发板,需按以下引脚对应连接(SPI 通信):

3.2 Thonny 开发环境配置

打开 Thonny 软件,按以下步骤配置开发环境,确保代码能正常烧录和运行:

  1. 点击顶部菜单栏「运行」→「配置解释器」;
  2. 切换到「解释器」选项卡;
  3. 在「解释器」下拉列表中选择 MicroPython (通用);
  4. 在「端口」下拉列表中选择开发板对应的串口(通常显示为 Board CDC @ COMx);
  5. 勾选「运行代码前先重启解释器」和「同步设备的实时时钟」;
  6. 点击「确定」完成配置。

如果端口列表中没有出现开发板,请尝试:

  • 重新插拔 USB 数据线;

  • 更换支持数据传输的 USB 数据线;

  • 关闭其他占用串口的软件(如串口助手、Arduino IDE 等);

  • 重新烧录 MicroPython 固件;

  • 安装树莓派 Pico USB 驱动。

4. MQTT协议极简解析

MQTT是一种轻量级物联网消息传输协议,基于发布/订阅模式,适用于硬件资源有限的嵌入式设备和带宽有限的网络环境,广泛应用于物联网、智能硬件、工业控制等场景,核心优势是轻量、高效、可靠、易实现,最小控制消息仅需两个数据字节,消息头精简,可优化网络带宽。

极简工作流程(4步):

1. 客户端(W55RP20)连接到MQTT服务器(本文使用免费公共服务器,无需注册);

2. 客户端订阅指定主题(如/W55RP20/sub),用于接收消息;

3. 客户端(或其他设备)向指定主题发布消息;

4. 所有订阅该主题的客户端,均可接收并处理消息。

本文实战使用免费公共MQTT服务器(broker-cn.emqx.io,端口1883),无需注册,直接连接即可使用,进一步节省时间,助力10分钟完成实战。

5. 核心代码复制与烧录

5.1 依赖库说明

核心依赖 umqttsimple.py 库,用于实现MQTT客户端的连接、订阅、发布等基础功能,该库是MicroPython生态中常用的轻量级MQTT库,体积小、适配性强,无需额外修改,直接保存至W55RP20即可使用,可通过MicroPython官方仓库下载或直接复制本文提供的完整代码。

5.2 完整代码以及烧录历程

所需的开发环境
Thonny
如果你必须编译MicroPython,则必须使用Linux或Unix环境。
第一步:将程序复制到Thonny中,然后选择环境为Raspberry Pi Pico。

第二步:将umqttsimple.py库文件保存到开发板 中。

第三步:运行程序,并打开MQTTX ,连接相同的服务器,然后订阅主题为开发板的发布主题,发布主题为开发板的订阅主题。下面有图

第四步:在MQTTX发送消息观察回环测试效果。

注意:因为MicroPython的print函数是启用了stdout缓冲的,所以有时候并不会第一时间打印出内容。

mqttsimple.py代码:

#mqtt.py file
import network
import time
try:
    from machine import Pin, WIZNET_PIO_SPI
except ImportError:
    WIZNET_PIO_SPI = None
    Pin = None

_DEFAULTS = {
    "w5100s-evb-pico":  {},
    "w5500-evb-pico":   {},
    "w6100-evb-pico":   {},
    "w5100s-evb-pico2": {},
    "w5500-evb-pico2":  {},
    "w6100-evb-pico2":  {},
    "w55rp20-evb-pico": {"baudrate": 31250000, "sck": 21, "cs": 20, "mosi": 23, "miso": 22, "reset": 25},
    "w6300-evb-pico":  {"baudrate": 31250000, "sck": 17, "cs": 16, "io0": 18, "io1": 19, "io2": 20, "io3": 21, "reset": 22},
    "w6300-evb-pico2": {"baudrate": 31250000, "sck": 17, "cs": 16, "io0": 18, "io1": 19, "io2": 20, "io3": 21, "reset": 22},
}
_AUTO = {
    "w5100s-evb-pico", "w5500-evb-pico", "w6100-evb-pico",
    "w5100s-evb-pico2","w5500-evb-pico2","w6100-evb-pico2",
}
_SINGLE = {"w55rp20-evb-pico"}
_QSPI   = {"w6300-evb-pico", "w6300-evb-pico2"}

def _pin(x): return x if isinstance(x, Pin) else Pin(x)

def wiznet(board, *, dhcp=True, spi=None, cs=None, reset=None, **kw):
    board = board.strip().lower()
    if board not in _DEFAULTS:
        raise ValueError("Unsupported board: {}".format(board))
    cfg = _DEFAULTS[board].copy()
    cfg.update(kw)
    
    if spi is not None:
        if cs is None or reset is None:
            raise ValueError("When passing custom spi, also pass cs and reset")
        nic = network.WIZNET6K(spi, cs, reset)
    else:
        if board in _AUTO:
            nic = network.WIZNET6K()
        elif board in _SINGLE:
            if WIZNET_PIO_SPI is None or Pin is None:
                raise RuntimeError("WIZNET_PIO_SPI/Pin not available on this port")
            required = ["sck", "cs", "mosi", "miso", "reset"]
            missing = [k for k in required if k not in cfg]
            if missing:
                raise ValueError("Missing pins for W55RP20 single-SPI: " + ", ".join(missing))
            spi = WIZNET_PIO_SPI(
                baudrate=cfg.get("baudrate", 31250000),
                sck=_pin(cfg["sck"]), cs=_pin(cfg["cs"]),
                mosi=_pin(cfg["mosi"]), miso=_pin(cfg["miso"]),
            )
            nic = network.WIZNET6K(spi, _pin(cfg["cs"]), _pin(cfg["reset"]))
        elif board in _QSPI:
            if WIZNET_PIO_SPI is None or Pin is None:
                raise RuntimeError("WIZNET_PIO_SPI/Pin not available on this port")
            for k in ["sck","cs","io0","io1","io2","io3"]:
                if k not in cfg: raise ValueError("Missing pin '{}' for W6300 QSPI".format(k))
            spi = WIZNET_PIO_SPI(
                baudrate=cfg.get("baudrate", 31250000),
                sck=_pin(cfg["sck"]), cs=_pin(cfg["cs"]),
                io0=_pin(cfg["io0"]), io1=_pin(cfg["io1"]),
                io2=_pin(cfg["io2"]), io3=_pin(cfg["io3"]),
            )
            nic = network.WIZNET6K(spi, _pin(cfg["cs"]), _pin(cfg.get("reset", cfg["cs"])))
        else:
            raise ValueError("Unexpected board mapping")

    try: nic.active(True)
    except AttributeError: pass

    if dhcp:
        try: nic.ifconfig("dhcp")
        except Exception: pass
    else:
        ip = cfg.get("ip"); sn = cfg.get("sn"); gw = cfg.get("gw"); dns = cfg.get("dns", gw or "8.8.8.8")
        if not (ip and sn and gw): raise ValueError("Static mode requires ip/sn/gw")
        nic.ifconfig((ip, sn, gw, dns))
    while not nic.isconnected():
        print("Waiting for the network to connect...")
        time.sleep(1)

    print("MAC Address:", ":".join("%02x" % b for b in nic.config("mac")))
    print("IP Address:", nic.ifconfig()[0])
    return nic

from umqttsimple import MQTTClient
from machine import Timer
import machine

#mqtt config
mqtt_params = {}
mqtt_params['url'] = 'broker.emqx.io'
mqtt_params['port'] = 1883
mqtt_params['clientid'] = 'W55RP20'
mqtt_params['pubtopic'] = '/W55RP20/pub'
mqtt_params['subtopic'] = '/W55RP20/sub'
mqtt_params['pubqos'] = 0
mqtt_params['subqos'] = 0

timer_1s_count = 0
tim = Timer()
client = None

def w5x00_init():
    nic = wiznet("w55rp20-evb-pico", dhcp=True)
    print("IP         :",nic.ifconfig()[0])
    print("Subnet Mask:",nic.ifconfig()[1])
    print("Gateway    :",nic.ifconfig()[2])
    print("DNS        :",nic.ifconfig()[3],"\r\n")
    return nic

def sub_cb(topic, msg):
    topic = topic.decode('utf-8')
    msg = msg.decode('utf-8')
    if topic == mqtt_params['subtopic']:
        global client
        print("\r\ntopic:",topic,"\r\nrecv:", msg)
        client.publish(mqtt_params['pubtopic'],msg,qos=mqtt_params['pubqos'])
        print('\r\ntopic:',mqtt_params['pubtopic'],'\r\nsend:',msg)

def mqtt_connect():
    global client
    # 每次重连换不同ID,防止服务器拒绝
    from time import time
    new_id = "W55RP20_" + str(time())
    client = MQTTClient(new_id, mqtt_params['url'], mqtt_params['port'], keepalive=60)
    client.connect()
    print('Connected to %s MQTT Broker'%(mqtt_params['url']))
    return client

def reconnect():
    print('MQTT 连接断开,正在重新连接...')
    time.sleep(3)

def tick(timer):
    global timer_1s_count
    global client
    timer_1s_count += 1
    if timer_1s_count >= 30:
        timer_1s_count = 0 
        try:
            client.ping()
        except:
            pass

def subscribe(client):
    client.set_callback(sub_cb)
    client.subscribe(mqtt_params['subtopic'],mqtt_params['subqos'])
    print('subscribed to %s'%mqtt_params['subtopic'])

def main():
    global client
    print("W55RP20 chip MQTT example")

    # 网络只初始化一次
    w5x00_init()

    # ========== 核心:无限循环 + 出错自动重连 ==========
    while True:
        try:
            client = mqtt_connect()
            subscribe(client)
            tim.init(freq=1, callback=tick)

            # 正常运行
            while True:
                client.wait_msg()

        except Exception as e:
            print("检测到错误,即将重连:", e)
            try:
                client.disconnect()
            except:
                pass
            time.sleep(2)

if __name__ == "__main__":
    main()

补充umqttsimple.py库代码(复制保存至W55RP20开发板,与mqtt.py同级):

#umqttsimple.py file
import usocket as socket
import ustruct as struct
from ubinascii import hexlify


class MQTTException(Exception):
    pass


class MQTTClient:
    def __init__(
        self,
        client_id,
        server,
        port=0,
        user=None,
        password=None,
        keepalive=0,
        ssl=False,
        ssl_params={},
    ):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    def _send_str(self, s):
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7F) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        self.sock = socket.socket()
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        self.sock.connect(addr)
        if self.ssl:
            import ussl

            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x04\x02\0\0")

        sz = 10 + 2 + len(self.client_id)
        msg[6] = clean_session << 1
        if self.user is not None:
            sz += 2 + len(self.user) + 2 + len(self.pswd)
            msg[6] |= 0xC0
        if self.keepalive:
            assert self.keepalive < 65536
            msg[7] |= self.keepalive >> 8
            msg[8] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[6] |= self.lw_retain << 5

        i = 1
        while sz > 0x7F:
            premsg[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        premsg[i] = sz

        self.sock.write(premsg, i + 2)
        self.sock.write(msg)
        # print(hex(len(msg)), hexlify(msg, ":"))
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user is not None:
            self._send_str(self.user)
            self._send_str(self.pswd)
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1

    def disconnect(self):
        self.sock.write(b"\xe0\0")
        self.sock.close()

    def ping(self):
        self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7F:
            pkt[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        # print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 1)
        self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        # print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                # print(resp)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = self.sock.read(1)
#         self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xF0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0
        return op

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
#         self.sock.setblocking(False)
        return self.wait_msg()

补充wiznet_init.py库代码(复制保存至W55RP20开发板,与mqtt.py同级):

import network
import time
try:
    from machine import Pin, WIZNET_PIO_SPI
except ImportError:
    WIZNET_PIO_SPI = None
    Pin = None

_DEFAULTS = {
    # Auto-construct boards (no explicit PIO SPI)
    "w5100s-evb-pico":  {},
    "w5500-evb-pico":   {},
    "w6100-evb-pico":   {},
    "w5100s-evb-pico2": {},
    "w5500-evb-pico2":  {},
    "w6100-evb-pico2":  {},

    # W55RP20 — single SPI (PIO SPI)
    "w55rp20-evb-pico": {"baudrate": 31250000, "sck": 21, "cs": 20, "mosi": 23, "miso": 22, "reset": 25},

    # W6300 — QSPI QUAD(io0..io3)
    "w6300-evb-pico":  {"baudrate": 31250000, "sck": 17, "cs": 16, "io0": 18, "io1": 19, "io2": 20, "io3": 21, "reset": 22},
    "w6300-evb-pico2": {"baudrate": 31250000, "sck": 17, "cs": 16, "io0": 18, "io1": 19, "io2": 20, "io3": 21, "reset": 22},
}
_AUTO = {
    "w5100s-evb-pico", "w5500-evb-pico", "w6100-evb-pico",
    "w5100s-evb-pico2","w5500-evb-pico2","w6100-evb-pico2",
}
_SINGLE = {"w55rp20-evb-pico"}   # PIO single-SPI
_QSPI   = {"w6300-evb-pico", "w6300-evb-pico2"}

def _pin(x): return x if isinstance(x, Pin) else Pin(x)

def wiznet(board, *, dhcp=True, spi=None, cs=None, reset=None, **kw):
    board = board.strip().lower()
    if board not in _DEFAULTS:
        raise ValueError("Unsupported board: {}".format(board))
    cfg = _DEFAULTS[board].copy()
    cfg.update(kw)
    
    # Manual override path: if spi is provided, use it directly
    if spi is not None:
        if cs is None or reset is None:
            raise ValueError("When passing custom spi, also pass cs and reset")
        nic = network.WIZNET6K(spi, cs, reset)
    else:
        if board in _AUTO:
            nic = network.WIZNET6K()

        elif board in _SINGLE:
            if WIZNET_PIO_SPI is None or Pin is None:
                raise RuntimeError("WIZNET_PIO_SPI/Pin not available on this port")
            required = ["sck", "cs", "mosi", "miso", "reset"]
            missing = [k for k in required if k not in cfg]
            if missing:
                raise ValueError("Missing pins for W55RP20 single-SPI: " + ", ".join(missing))
            spi = WIZNET_PIO_SPI(
                baudrate=cfg.get("baudrate", 31250000),
                sck=_pin(cfg["sck"]), cs=_pin(cfg["cs"]),
                mosi=_pin(cfg["mosi"]), miso=_pin(cfg["miso"]),
            )
            nic = network.WIZNET6K(spi, _pin(cfg["cs"]), _pin(cfg["reset"]))

        elif board in _QSPI:
            if WIZNET_PIO_SPI is None or Pin is None:
                raise RuntimeError("WIZNET_PIO_SPI/Pin not available on this port")
            for k in ["sck","cs","io0","io1","io2","io3"]:
                if k not in cfg: raise ValueError("Missing pin '{}' for W6300 QSPI".format(k))
            spi = WIZNET_PIO_SPI(
                baudrate=cfg.get("baudrate", 31250000),
                sck=_pin(cfg["sck"]), cs=_pin(cfg["cs"]),
                io0=_pin(cfg["io0"]), io1=_pin(cfg["io1"]),
                io2=_pin(cfg["io2"]), io3=_pin(cfg["io3"]),
            )
            nic = network.WIZNET6K(spi, _pin(cfg["cs"]), _pin(cfg.get("reset", cfg["cs"])))

        else:
            raise ValueError("Unexpected board mapping")

    # Bring up (if supported)
    try: nic.active(True)
    except AttributeError: pass

    if dhcp:
        try: nic.ifconfig("dhcp")
        except Exception: pass
    else:
        ip = cfg.get("ip"); sn = cfg.get("sn"); gw = cfg.get("gw"); dns = cfg.get("dns", gw or "8.8.8.8")
        if not (ip and sn and gw): raise ValueError("Static mode requires ip/sn/gw")
        nic.ifconfig((ip, sn, gw, dns))
    while not nic.isconnected():
        print("Waiting for the network to connect...")
        time.sleep(1)

    print("MAC Address:", ":".join("%02x" % b for b in nic.config("mac")))
    print("IP Address:", nic.ifconfig())
    return nic


6. 运行结果与MQTT通信验证

6.1 串口输出结果

在Thonny中点击运行按钮(或按F5键),Shell窗口会输出以下内容,说明网络初始化和MQTT连接成功:

说明:若打印“Waiting for the network to connect.....”,属于正常现象,等待1-3秒即可获取IP;若一直无法获取IP,参考“常见问题排查”部分。


MPY: soft reboot
W55RP20 chip MQTT example
Waiting for the network to connect...
Waiting for the network to connect...
Waiting for the network to connect...
Waiting for the network to connect...
MAC Address: 02:90:86:88:4d:56
IP Address: 192.168.1.118
IP         : 192.168.1.118
Subnet Mask: 255.255.255.0
Gateway    : 192.168.1.1
DNS        : 202.96.134.33 

Connected to broker.emqx.io MQTT Broker
subscribed to /W55RP20/sub

6.2 MQTT回环通信验证

使用MQTTX工具测试,步骤如下(不会的照着下面的图来):

1. 打开MQTTX,点击“+”号,创建新连接:

- 名称:自定义(如W55RP20测试)

- 服务器:broker-cn.emqx.io

- 端口:1883

- 其他参数默认,点击“连接”;

2. 订阅W55RP20的发布主题:/W55RP20/pub;可以自己定义

3. 向W55RP20的订阅主题:/W55RP20/sub 发送消息(如“W55RP20 MQTT测试”);可以自己定义

4. 观察两个地方的反馈:

- Thonny Shell窗口:打印收到的消息,以及“消息回环成功”提示;

- MQTTX工具:收到W55RP20回环发布的消息(与发送的消息一致)。

至此,W55RP20的MQTT通信实战完成

mqttx网址:MQTTX:全功能 MQTT 客户端工具

7. 常见问题一站式排查

问题现象 排查步骤
1. Thonny 无法识别 W55RP20 端口 1. 更换支持数据传输的 USB 数据线2. 重新插拔开发板、关闭其他占用串口的软件(如串口助手)
2. 无法获取 IP 地址,一直打印 “等待网络连接中...” 1. 检查网线是否插紧2. 路由器是否开启 DHCP3. 网线是否连接路由器 LAN 口(不是 WAN 口),重启路由器和开发板
3. MQTT 连接失败,触发重连 确认网络已连接、MQTT 服务器地址和端口正确(本文使用的公共服务器无需注册)、umqttsimple.py 库已正确保存。
4. 收到消息但无法回环发布 1. 确认代码中发布主题和订阅主题配置正确2. MQTT 连接未断开3. 重启开发板重新运行代码
5. 串口无打印或打印乱码 1. 确认 Thonny 解释器和端口配置正确2. 固件为 W55RP20 专属固件3. 重新烧录固件

8. W55RP20 核心优势对比

为了让你更直观地了解 W55RP20 的价值,我们对比了目前主流的三种嵌入式以太网方案:

对比维度 W55RP20 集成方案 外接 PHY 芯片 方案 外接 串口转以太网模块 方案
BOM 成本 低(单芯片) 中高(MCU + 模块 + 外围器件)
PCB 面积 小(仅需网口电路) 大(需预留芯片和布线空间)
开发难度 低(一行代码联网) 中高(调试协议栈、编写驱动)
网络稳定性 极高(WIZnet 专注硬件 TCP/IP 协议栈 25 年) 不定(对于研发人员要求高,熟悉协议栈与网络开发,才能调试稳定) 不定(视研发公司能力水平)
CPU 资源占用 0%(协议栈网络处理完全由硬件完成) 50% 以上(协议栈完全运行在 MCU 上,占用相关资源) 0%
硬件 Socket 数量 8 个独立硬件 Socket 视 MCU 能力而定,理论支持多路拓展 一般为单路透传
网络吞吐量 最高 15Mbps 视 MCU 能力而定 约 3-5Mbps
接口易用性 单芯片集成 要 MCU 带有 MII/RMII 等接口 TTL 接口
部署难度 低(MicroPython 成熟固件,应用层协议绝大部分均有库文件,可灵活添加部署) 高(应用层协议需要手动移植开源库适配) 视模块集成情况,无集成的功能需要自我封包拆包

9. 典型应用场景

W55RP20 芯片集成以太网功能,结合其工业级稳定性,非常适合以下应用场景:

1.工业数据采集网关:简化现场部署,实现传感器数据的稳定上传
2.远程监控终端:用于工厂、机房、变电站等环境的设备状态远程监控
3.串口转网口设备:将传统 RS232/RS485 串口设备快速升级为以太网设备
4.智能楼宇节点:用于照明、空调、门禁等楼宇设备的网络控制
5.工业 PLC 扩展模块:为 PLC 增加以太网通信能力,实现远程编程和数据采集

10. 系列预告与资源获取

10.1系列预告

下一篇教程我们将讲解 W55RP20 MicroPython开发下的 MQTT+阿里云实现上传数据,带你了解连接建立、心跳包维护、异常重连等关键机制,掌握嵌入式以太网通信的基础能力。

10.2 资源获取

 - 本文完整代码:WIZnet Pico MicroPython 示例工程
- W55RP20 芯片手册:WIZnet 官方资料页面

下一篇讲解:W55RP20-EVB-MKR MicroPython 实战(13):MQTT 协议与阿里云 IoT 平台对接


如果本文对你有帮助,欢迎点赞、收藏、关注,你的支持是我们持续更新的动力!如有任何问题,欢迎在评论区留言,我们会第一时间回复。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐