接下来,我们将用pyhton编写代码,向订阅主题发送 “Hello World from Raspberry Pi!” (梦回 hello world  = ̄ω ̄=   ),做个简单的测试,试试上次部署的emqx能不能成功接收数据。  

前言:        

        树莓派刚刚开机后,终端输入sudo docker start emqx ,使emqx运行起来。

        本想着直接安装 Python 的 MQTT 客户端库 paho-mqtt(在终端输入pip install paho-mqtt),但是输入后就报错了,安装不了。

        这个错误是因为我的树莓派系统可能是较新的 Raspberry Pi OS Bookworm为了保护系统稳定性,禁止直接使用 pip 向全局环境安装 Python 包。这是 PEP 668 规范引入的安全机制,目的是防止 pip 安装的包与系统自带的 apt 包管理器发生冲突。

        而且对于后续如果你把所有包都装在全局系统里,升级一个库可能会搞坏另一个项目。

        所以,这里我们使用虚拟环境。虚拟环境可实现一台设备运行多个系统,具备环境隔离防护作用,风险程序不会影响本机。

        不错吧。对于后续的开发后很友好。O(∩_∩)O!

一、创建并激活虚拟环境

        这一步相当于给你的项目创建一个独立的“工作间”。

1、创建虚拟环境

        在终端输入

python3 -m venv my_mqtt_env

解释:这会在当前目录下生成一个名为 my_mqtt_env 的文件夹

2、激活虚拟环境

        输入

source my_mqtt_env/bin/activate

注意:如果成功,你的命令行提示符前面会出现 (my_mqtt_env) 字样,说明你已经进入这个独立环境了。

二、安装MQTT库

        现在可以在这个独立环境里安全地安装第三方库了。

        输入

pip install paho-mqtt

三、编写测试代码

1、创建文件

        输入命令,打开nano编译器

nano mqtt_test.py

2、编写代码

import paho.mqtt.client as mqtt
import time

# config info
BROKER_IP = "1*.***.***.**"#这里编写自己树莓派的ip,自己记得改
PORT = 1883
TOPIC = "test/hello"
USERNAME = "admin"
PASSWORD = "pipi1234"


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"Connected successfully({BROKER_IP})")
        client.subscribe(TOPIC)
    else:
        print(f"Connected failed with code:{rc}")


def on_message(client, userdata, msg):
    print(f"Received message[topic:{msg.topic}]:{msg.payload.decode()}")


try:
    client = mqtt.Client()
    client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message

    print("Trying to connect...")
    client.connect(BROKER_IP, PORT, 60)

    client.loop_start()

    time.sleep(1)
    print(f"Publishing message to topic '{TOPIC}'...")
    client.publish(TOPIC, "Hello World from Raspberry Pi!")

    while True:
        time.sleep(1)

except KeyboardInterrupt:
    print("\nProgram stopped")
    client.loop_stop()
    client.disconnect()

        这里大家注意python编写的格式,我就是在缩进部分报了n个错误了,改死我了( ̄ε(# ̄)。

3、保存并退出

        按Ctrl + O ,然后回车 确定保存。

        按Ctrl + X ,退出编译器。

四、运行测试

        确保你还在虚拟环境中(提示符前有 (my_mqtt_env)),输入:

python mqtt_test.py

五、结果

        EMQX 默认配置是完全够用的,不需要做任何额外修改。只需要确保树莓派的防火墙放行了 MQTT 端口,并在 EMQX 后台确认服务状态即可。

        我是下了一个MQTTX 的软件,所以可以在那里便捷的看到可视化信息。

(此乃官方出的图形化测试工具)

        在mqttx中订阅这个test/hello主题,接收到树莓派发来的 “Hello World from Raspberry Pi!”。这里的{“msg”:"hello"}是我在mqttx上发过去的,树莓派接收到了,并且在终端显示。

        发送hello world,树莓派接收到了

        按Ctrl + X 停止哦。

六、搞点其他的玩

        在上述代码中,树莓派是只发送了一次信息,然后一直循环1s等待主题那边的信息来接收。

        那我就是想让树莓派一直发信息,像瀑布一样,一秒一个才测试起来爽。

        行。那就改改代码。

        在之前代码的基础上,就改了上图的部分。

        然后就得到了:

        爽了爽了。。d=====( ̄▽ ̄*)b

        按Ctrl + X 停止哦。

最后小贴士:

        下次你再开机想运行这个项目时,记得先执行这一句来“唤醒”环境:

source my_mqtt_env/bin/activate

        然后再运行 python mqtt_test.py 即可。

输入 deactivate 可以退出虚拟系统。

接下来,可以尝试用树莓派外设接收的信息来发送吧。ο(=•ω<=)ρ⌒☆

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐