Vue + MQTT 实时消息推送,前端开发必会的几种实战方案

前言

干前端的朋友应该都遇到过这种需求:页面上的数据要实时刷新,比如大屏监控、设备状态、告警通知这些场景。后端不可能让你每隔几秒轮询一次接口,那样服务器扛不住,用户体验也差。

这时候就需要实时推送技术了。常见的方案有 WebSocket、SSE、Socket.IO,但在物联网(IoT)、工业监控、智慧园区这类场景下,MQTT 才是主流选择。今天我就结合公司实际项目,把前端接入 MQTT 的几种做法给你讲清楚。


一、MQTT 是什么?

简单理解,MQTT(Message Queuing Telemetry Transport)是一种发布/订阅模式的轻量级消息协议。它有三个核心角色:

  • Publisher(发布者):发送消息的设备或服务,比如一个温度传感器
  • Broker(消息中间件):消息的中转站,比如 EMQX、Mosquitto、RabbitMQ
  • Subscriber(订阅者):接收消息的一方,比如你的前端页面

工作方式也很直观:发布者往某个 Topic(主题) 发消息,所有订阅了这个 Topic 的订阅者都能收到。Topic 是分层的,比如 /building/floor1/temperature,你可以用通配符批量订阅。

为什么前端要用 MQTT 而不是普通 WebSocket?

  • MQTT 本身就是基于 TCP 的,在浏览器端通过 WebSocket 传输,天生支持双向通信
  • 自带 QoS(服务质量等级),0/1/2 三档可选,保证消息不丢不重
  • 自带心跳保活断线重连机制
  • Topic 机制非常灵活,一条连接可以订阅多个主题
  • 物联网设备几乎都支持 MQTT,前后端统一协议

二、方案一:前端直连 MQTT Broker(Paho.js)

这是最直接的做法,也是我们公司项目中实际采用的方案。前端通过 Paho MQTT JavaScript 库,直接用 WebSocket 连到 MQTT Broker。

2.1 整体架构

[物联网设备] ──MQTT──> [MQTT Broker] <──WebSocket── [Vue 前端页面]
                              ↑
                         [后端服务] ── 提供 MQTT 连接配置(host/port/topic/账号密码)

2.2 后端做什么?

后端不参与消息转发,只负责提供连接配置。前端先调一个接口拿到配置信息:

// src/api/mqtt.js
// 后端接口:返回 MQTT 的连接参数
export function mqttInfoViews(data) {
    return request({
        url: '/Setting/mqttInfoViews',
        method: 'get',
        params: { moduleType: data.moduleType }
    })
}

后端返回的数据大致长这样(经过 AES 解密后):

[
  {
    "id": 1,
    "host": "192.168.1.100",
    "port": 8083,
    "clientId": "web_client_001",
    "userName": "admin",
    "passWord": "123456",
    "topic": "/alarm/",
    "qos": 1,
    "messageType": 3,
    "moduleType": 1
  }
]

这样做的好处是:MQTT 的地址、账号密码都存后端,前端不硬编码,安全且灵活。

2.3 前端核心代码

封装连接工具函数:

// src/util/mqttConnect.js
import Paho from '@/assets/js/mqtt'

export function mqttConnect(config, params, onMessageArrived, onConnectionLost) {
    if (!config) return
    
    let { host, port, clientId, userName, passWord, qos, topic } = config
    let isHttps = window.location.href.indexOf('https') !== -1
    
    // 根据消息类型调整端口(适配 RabbitMQ / EMQX 等不同 Broker)
    if (config.messageType == 3) {
        port = isHttps ? 8084 : 8083  // WebSocket 端口
    }
    
    if (!host || !port) {
        console.log('mqtt配置参数不完整')
        return null
    }
    
    // 创建 MQTT 客户端
    let client = new Paho.MQTT.Client(host, Number(port), clientId)
    
    let options = {
        userName: userName,
        password: passWord,
        useSSL: isHttps ? true : false,
        keepAliveInterval: 30,  // 心跳间隔 30 秒
        onSuccess: () => {
            // 连接成功后订阅 Topic
            // 支持单个和批量订阅
            if (params.indexOf(',') !== -1) {
                let arr = params.split(',')
                arr.forEach(item => {
                    client.subscribe(topic + item, { qos: Number(qos) })
                })
            } else {
                client.subscribe(topic + params, { qos: Number(qos) })
            }
        },
        onFailure: (e) => {
            console.log('连接失败,15秒后重试')
            setTimeout(() => {
                mqttConnect(config, params, onMessageArrived, onConnectionLost)
            }, 15000)
        }
    }
    
    client.connect(options)
    
    // 掉线回调
    client.onConnectionLost = (e) => {
        console.log('连接断开:', e)
        onConnectionLost && onConnectionLost()
    }
    
    // 接收消息回调
    client.onMessageArrived = onMessageArrived
    
    return client
}

封装管理类(单例模式管理多连接):

// src/util/mqttUtil.js
import { mqttInfoViews } from "@/api/mqtt"
import { mqttConnect } from "@/util/mqttConnect"
import store from '@/vuex'

class MqttUtil {
    constructor() {
        this.clientObj = {}  // 存储所有 MQTT 连接实例
    }
    
    // 从后端获取配置并建立连接
    async getMqttInfoViews(type, cb) {
        let data = await mqttInfoViews({ moduleType: type })
        // AES 解密配置
        let newData = JSON.parse(decryptAes(data, dayjs().format("YYYYMMDD") + 'ABCDEFGHIJKLMN0123456789'))
        
        newData.forEach((config) => {
            if (this.clientObj[config.id]) this.closeClient(config.id)
            this.createClient(config, type, cb)
        })
    }
    
    // 创建单个 MQTT 连接
    createClient(config, type, successCB) {
        let client = mqttConnect(
            config,
            config.pid,
            (res) => {
                // 收到消息,解析 payload
                const payload = JSON.parse(res.payloadString)
                
                // 根据模块类型分发到不同的 Vuex Store
                if (config.moduleType == 1) {
                    // 告警消息
                    store.commit("isGetAlarm", new Date().getTime())
                } else if (config.moduleType == 10) {
                    // 实时数据
                    store.commit("yzSgccMqttRealtimeData", payload)
                } else if (config.moduleType == 11) {
                    // 控制指令回执
                    store.commit("controlData", payload)
                } else {
                    successCB && successCB(res, type)
                }
            },
            (error) => {
                // 断线后定时重连
                setInterval(() => {
                    this.getMqttInfoViews(config.moduleType)
                }, 30000)
            }
        )
        
        if (client) {
            this.clientObj[config.id] = client
        }
    }
    
    // 关闭连接
    closeClient(type) {
        if (this.clientObj[type] && this.clientObj[type].isConnected()) {
            this.clientObj[type].disconnect()
            this.clientObj[type] = null
        }
    }
}

Vue 组件中使用:

// 在大屏页面入口初始化
import MqttUtil from "@/util/mqttUtil"

setup() {
    let mqttUtil = new MqttUtil()
    
    onMounted(() => {
        // 初始化告警 MQTT 连接
        mqttUtil.getMqttInfoViews(1)
    })
    
    onUnmounted(() => {
        // 页面销毁时断开所有 MQTT 连接
        mqttUtil.closeClient()
    })
}

2.4 这种方案的优缺点

优点:

  • 架构简单,前端直连 Broker,延迟最低
  • 后端压力小,不参与消息转发
  • Paho 库成熟稳定,兼容性好

缺点:

  • MQTT 连接信息(账号密码)需要通过其他方式安全下发
  • 前端直接暴露在 MQTT Broker 面前,需要做好 Broker 的 ACL 权限控制
  • 安全性依赖 HTTPS/WSS 加密传输

三、方案二:后端代理转发(推荐生产环境使用)

在很多企业级项目中,不会让前端直接连 MQTT Broker,而是通过后端做一层代理

3.1 整体架构

[物联网设备] ──MQTT──> [MQTT Broker] <──TCP── [后端服务] <──WebSocket── [Vue 前端]
                                                      ↑
                                                  [认证/鉴权/过滤]

3.2 后端负责什么?

后端自己作为 MQTT 的客户端连接 Broker,然后起一个 WebSocket 服务暴露给前端。后端在这中间可以做:

  1. 统一的认证鉴权:验证前端 token,决定是否允许建立 WebSocket
  2. Topic 权限控制:不同角色的用户,只能订阅他们有权访问的 Topic
  3. 消息过滤和脱敏:敏感字段在后端处理后再下发
  4. 消息持久化:重要的消息写入数据库,方便追溯
  5. 消息聚合:合并多个设备的数据再推给前端,减少前端压力

3.3 后端伪代码(Node.js 示例)

// 后端 - Node.js + Express + mqtt.js + ws
const mqtt = require('mqtt')
const WebSocket = require('ws')
const jwt = require('jsonwebtoken')

// 连接 MQTT Broker
const mqttClient = mqtt.connect('mqtt://192.168.1.100:1883', {
    username: 'backend_service',
    password: 'xxx'
})

mqttClient.on('connect', () => {
    // 订阅所有设备数据
    mqttClient.subscribe('/device/#')
})

// 创建 WebSocket 服务
const wss = new WebSocket.Server({ port: 8080 })

wss.on('connection', (ws, req) => {
    // 从 URL 参数中取 token 鉴权
    const token = new URL(req.url, 'http://localhost').searchParams.get('token')
    
    try {
        const user = jwt.verify(token, 'SECRET_KEY')
        
        // 根据用户角色,决定其可订阅的 Topic
        const allowedTopics = getUserTopics(user.role)
        
        // 转发 MQTT 消息到前端
        const messageHandler = (topic, message) => {
            if (allowedTopics.some(t => topic.startsWith(t))) {
                // 可以在这里做数据脱敏
                const safeData = sanitizeData(JSON.parse(message.toString()))
                ws.send(JSON.stringify({ topic, data: safeData }))
            }
        }
        
        mqttClient.on('message', messageHandler)
        
        // 前端发来的消息(如控制指令),转发到 MQTT
        ws.on('message', (data) => {
            const { topic, payload } = JSON.parse(data)
            // 校验用户是否有权限往这个 Topic 发消息
            if (canPublish(user, topic)) {
                mqttClient.publish(topic, JSON.stringify(payload))
            }
        })
        
        ws.on('close', () => {
            mqttClient.removeListener('message', messageHandler)
        })
        
    } catch (e) {
        ws.close(4001, '认证失败')
    }
})

3.4 前端代码

// 前端只需要连后端 WebSocket,简单很多
class MqttProxyClient {
    constructor(token) {
        this.ws = new WebSocket(`ws://localhost:8080?token=${token}`)
        this.callbacks = {}
        
        this.ws.onmessage = (event) => {
            const { topic, data } = JSON.parse(event.data)
            // 触发对应的回调
            if (this.callbacks[topic]) {
                this.callbacks[topic].forEach(cb => cb(data))
            }
        }
        
        this.ws.onclose = () => {
            console.log('WebSocket 断开,5秒后重连')
            setTimeout(() => new MqttProxyClient(token), 5000)
        }
    }
    
    // 订阅主题
    subscribe(topic, callback) {
        if (!this.callbacks[topic]) {
            this.callbacks[topic] = []
            // 通知后端订阅
            this.ws.send(JSON.stringify({ action: 'subscribe', topic }))
        }
        this.callbacks[topic].push(callback)
    }
    
    // 发布消息(控制指令)
    publish(topic, payload) {
        this.ws.send(JSON.stringify({ action: 'publish', topic, payload }))
    }
    
    close() {
        this.ws.close()
    }
}

// 使用
const client = new MqttProxyClient(userToken)
client.subscribe('/building/floor1/temperature', (data) => {
    console.log('收到温度数据:', data)
})

这种方案是生产环境的首选,虽然多了一层转发,但换来了安全性和可控性。


四、方案三:使用 MQTT.js 库(纯前端方案)

除了 Paho,还有一个更现代的库:mqtt.js(npm 包名 mqtt)。

4.1 安装和使用

npm install mqtt
import mqtt from 'mqtt'

// 直接在浏览器中连接(mqtt.js 内置了 ws 支持)
const client = mqtt.connect('ws://192.168.1.100:8083/mqtt', {
    clientId: 'web_' + Math.random().toString(16).substr(2, 8),
    username: 'admin',
    password: '123456',
    keepalive: 30,
    clean: true
})

client.on('connect', () => {
    console.log('MQTT 已连接')
    
    // 订阅多个主题
    client.subscribe(['/sensor/temperature', '/sensor/humidity'], { qos: 1 })
})

client.on('message', (topic, message) => {
    // message 是 Buffer,需要转字符串
    const data = JSON.parse(message.toString())
    console.log(`收到消息 [${topic}]:`, data)
})

client.on('error', (err) => {
    console.error('MQTT 连接错误:', err)
})

client.on('close', () => {
    console.log('MQTT 连接已关闭')
})

// 发布消息
client.publish('/device/control', JSON.stringify({ cmd: 'open', deviceId: '001' }))

4.2 mqtt.js vs Paho.js 对比

对比维度 Paho.js mqtt.js
协议版本 MQTT 3.1 / 3.1.1 MQTT 3.1.1 / 5.0
包体积 较大(完整实现) 较小,按需引入
API 风格 回调风格 EventEmitter + Promise
社区活跃度 Eclipse 官方,稳定但不活跃 npm 社区活跃,更新频繁
TypeScript 需额外类型定义 自带类型定义

如果你的项目是全新开发的,我建议用 mqtt.js,API 更现代,也支持 MQTT 5.0。


五、方案四:EMQX + Vue 全家桶

EMQX 是目前最流行的开源 MQTT Broker,它内置了 WebSocket 支持和一个强大的规则引擎

5.1 EMQX 的优势

[设备] ──MQTT──> [EMQX Broker]
                    ├── WebSocket 直连前端
                    ├── 规则引擎 → 写入 MySQL/InfluxDB
                    ├── 规则引擎 → 转发到 Kafka
                    └── HTTP 认证 → 后端用户系统
  • 内置 WebSocket:开箱即用,不需要 Nginx 反代
  • HTTP 认证:连接时回调你的后端接口做鉴权
  • ACL 权限:细粒度控制哪个客户端能订阅哪些 Topic
  • 规则引擎:消息来了自动写数据库、转发 Kafka,后端开发量小很多

5.2 前端配合方式

跟前两种方案一样,你依然可以选择是直连 EMQX 还是通过后端代理。EMQX 的好处是它帮你解决了 Broker 层的安全和管理问题。

// 直连 EMQX(EMQX 默认 WebSocket 端口 8083)
const client = mqtt.connect('ws://emqx-server:8083/mqtt', {
    username: 'frontend_user',
    password: 'jwt_token_here',
    clientId: 'vue_app_' + Date.now()
})

重点:EMQX 的 HTTP 认证机制可以把 token 验证交给你的后端:

前端 --WebSocket连接(username=token)--> EMQX --HTTP回调--> 后端 /auth 接口 --> 返回允许/拒绝

六、在 Vue 组件中优雅地使用 MQTT

不管你用哪种方案,到了 Vue 组件层面,使用方式其实差不多。我总结几个最佳实践:

6.1 Composable 封装(Vue 3)

// composables/useMqtt.js
import { ref, onUnmounted, watch } from 'vue'
import mqtt from 'mqtt'

export function useMqtt(brokerUrl, options = {}) {
    const connected = ref(false)
    const messageMap = ref({})  // { topic: latestData }
    let client = null
    
    const connect = () => {
        client = mqtt.connect(brokerUrl, {
            keepalive: 30,
            ...options
        })
        
        client.on('connect', () => {
            connected.value = true
        })
        
        client.on('message', (topic, message) => {
            try {
                const data = JSON.parse(message.toString())
                messageMap.value[topic] = data
            } catch (e) {
                messageMap.value[topic] = message.toString()
            }
        })
        
        client.on('close', () => {
            connected.value = false
        })
    }
    
    const subscribe = (topic, qos = 1) => {
        if (client && connected.value) {
            client.subscribe(topic, { qos })
        }
    }
    
    const publish = (topic, payload, qos = 1) => {
        if (client && connected.value) {
            client.publish(topic, JSON.stringify(payload), { qos })
        }
    }
    
    const disconnect = () => {
        if (client) {
            client.end()
            client = null
        }
    }
    
    onUnmounted(() => {
        disconnect()
    })
    
    connect()
    
    return {
        connected,
        messageMap,
        subscribe,
        publish,
        disconnect
    }
}
<!-- 组件中使用 -->
<template>
  <div>
    <span :class="{ online: connected }">
      {{ connected ? '已连接' : '已断开' }}
    </span>
    <div>当前温度:{{ temperature }}°C</div>
  </div>
</template>

<script setup>
import { computed, watch } from 'vue'
import { useMqtt } from '@/composables/useMqtt'

const { connected, messageMap, subscribe } = useMqtt('ws://192.168.1.100:8083/mqtt', {
    username: 'admin',
    password: '123456'
})

// 组件挂载后订阅
subscribe('/sensor/+/temperature')

// 计算温度数据
const temperature = computed(() => {
    // 取最新的一条温度数据
    const topics = Object.keys(messageMap.value).filter(t => t.includes('temperature'))
    if (topics.length) return messageMap.value[topics[topics.length - 1]].value
    return '--'
})
</script>

6.2 配合 Vuex/Pinia 做全局状态

大屏项目通常有多个组件需要同一份 MQTT 数据,这时候最好走状态管理:

// store/mqtt.js (Pinia)
import { defineStore } from 'pinia'
import mqtt from 'mqtt'

export const useMqttStore = defineStore('mqtt', {
    state: () => ({
        connected: false,
        realtimeData: {},    // 实时数据
        alarms: [],          // 告警列表
        deviceStatus: {}     // 设备状态
    }),
    
    actions: {
        initConnection(config) {
            this.client = mqtt.connect(config.url, config.options)
            
            this.client.on('connect', () => {
                this.connected = true
                this.client.subscribe(['/alarm/#', '/data/#', '/status/#'])
            })
            
            this.client.on('message', (topic, message) => {
                const data = JSON.parse(message.toString())
                
                // 按 Topic 前缀分发给不同状态
                if (topic.startsWith('/alarm/')) {
                    this.alarms.unshift(data)
                } else if (topic.startsWith('/data/')) {
                    this.realtimeData[topic] = data
                } else if (topic.startsWith('/status/')) {
                    this.deviceStatus[topic.split('/')[2]] = data
                }
            })
            
            this.client.on('close', () => {
                this.connected = false
            })
        },
        
        sendCommand(deviceId, command) {
            if (this.client && this.connected) {
                this.client.publish(`/cmd/${deviceId}`, JSON.stringify(command))
            }
        },
        
        disconnect() {
            this.client?.end()
        }
    }
})

七、前后端配合的几种典型场景

场景1:设备告警实时推送

这是最常见的场景,设备产生告警后,前端要弹窗或播放声音。

设备 --告警消息--> MQTT Broker --> 前端接收 --> 弹窗/声音/列表更新

关键点:前端收到消息后,一般通过 vuex.commit 来驱动界面变化。多个浏览器 Tab 页可以用 BroadcastChannel 同步告警状态。

场景2:远程控制设备

前端点击"开/关",指令通过 MQTT 下发到设备:

前端 publish --> MQTT Topic --> 设备收到指令 --> 执行 --> 回传状态 --> 前端更新状态

这里有个重要细节:先乐观更新 UI(界面立刻显示开关状态),等设备回执后再修正。这样用户感觉响应很快。

场景3:大屏实时数据展示

大屏上几十个图表都在刷新,如果每个图表都建一条 MQTT 连接,太浪费了。正确做法是:

一条 MQTT 连接 --> 订阅多个 Topic --> 收到消息后分发到 Vuex --> 各图表 watch 对应 state

八、总结:方案怎么选?

场景 推荐方案
内部工具 / 演示项目 方案一:Paho.js 直连 Broker
新项目 / 需要 MQTT 5.0 方案一(升级版):mqtt.js 直连
生产环境 / 有安全要求 方案二:后端 WebSocket 代理转发
设备量大 / 数据需要持久化 方案四:EMQX + 规则引擎
Vue 3 项目 组合式 API(composable)封装
Vue 2 项目 Mixin 或 原型挂载 + Vuex

最后提醒几个容易踩的坑:

  1. HTTPS 页面必须用 WSS(WebSocket Secure),不能用 WS,浏览器会直接拦截
  2. clientId 必须唯一,多个页面同时打开要用不同 clientId,否则会互相踢下线
  3. 页面销毁时一定要断开连接,不然会内存泄漏,时间长了页面卡死
  4. QoS 不要盲目设 2,大部分场景 QoS 1 就够了,2 会增加很多开销
  5. 大屏长时间运行要处理断线重连,MQTT 连接不是一劳永逸的

好了,以上就是前端接入 MQTT 的完整实战经验。从公司的智慧园区大屏项目提炼出来的,希望对你有帮助。有问题欢迎评论区交流。


本文代码基于实际生产项目简化,完整项目包含了多类型 Broker 适配(RabbitMQ MQTT 插件 / EMQX / Mosquitto)、AES 配置加密、BroadcastChannel 多 Tab 同步、防抖处理等生产级细节。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐