Vue + MQTT 实时消息推送,前端开发必会的几种实战方案
Vue + MQTT 实时消息推送,前端开发必会的几种实战方案
前言
干前端的朋友应该都遇到过这种需求:页面上的数据要实时刷新,比如大屏监控、设备状态、告警通知这些场景。后端不可能让你每隔几秒轮询一次接口,那样服务器扛不住,用户体验也差。
这时候就需要实时推送技术了。常见的方案有 WebSocket、SSE、Socket.IO,但在物联网(IoT)、工业监控、智慧园区这类场景下,MQTT 才是主流选择。今天我就结合公司实际项目,把前端接入 MQTT 的几种做法给你讲清楚。
一、MQTT 是什么?
简单理解,MQTT(Message Queuing Telemetry Transport)是一种发布/订阅模式的轻量级消息协议。它有三个核心角色:
- Publisher(发布者):发送消息的设备或服务,比如一个温度传感器
- Broker(消息中间件):消息的中转站,比如 EMQX、Mosquitto、RabbitMQ
- Subscriber(订阅者):接收消息的一方,比如你的前端页面
工作方式也很直观:发布者往某个 Topic(主题) 发消息,所有订阅了这个 Topic 的订阅者都能收到。Topic 是分层的,比如 /building/floor1/temperature,你可以用通配符批量订阅。
为什么前端要用 MQTT 而不是普通 WebSocket?
- MQTT 本身就是基于 TCP 的,在浏览器端通过 WebSocket 传输,天生支持双向通信
- 自带 QoS(服务质量等级),0/1/2 三档可选,保证消息不丢不重
- 自带心跳保活和断线重连机制
- Topic 机制非常灵活,一条连接可以订阅多个主题
- 物联网设备几乎都支持 MQTT,前后端统一协议
二、方案一:前端直连 MQTT Broker(Paho.js)
这是最直接的做法,也是我们公司项目中实际采用的方案。前端通过 Paho MQTT JavaScript 库,直接用 WebSocket 连到 MQTT Broker。
2.1 整体架构
[物联网设备] ──MQTT──> [MQTT Broker] <──WebSocket── [Vue 前端页面]
↑
[后端服务] ── 提供 MQTT 连接配置(host/port/topic/账号密码)
2.2 后端做什么?
后端不参与消息转发,只负责提供连接配置。前端先调一个接口拿到配置信息:
// src/api/mqtt.js
// 后端接口:返回 MQTT 的连接参数
export function mqttInfoViews(data) {
return request({
url: '/Setting/mqttInfoViews',
method: 'get',
params: { moduleType: data.moduleType }
})
}
后端返回的数据大致长这样(经过 AES 解密后):
[
{
"id": 1,
"host": "192.168.1.100",
"port": 8083,
"clientId": "web_client_001",
"userName": "admin",
"passWord": "123456",
"topic": "/alarm/",
"qos": 1,
"messageType": 3,
"moduleType": 1
}
]
这样做的好处是:MQTT 的地址、账号密码都存后端,前端不硬编码,安全且灵活。
2.3 前端核心代码
封装连接工具函数:
// src/util/mqttConnect.js
import Paho from '@/assets/js/mqtt'
export function mqttConnect(config, params, onMessageArrived, onConnectionLost) {
if (!config) return
let { host, port, clientId, userName, passWord, qos, topic } = config
let isHttps = window.location.href.indexOf('https') !== -1
// 根据消息类型调整端口(适配 RabbitMQ / EMQX 等不同 Broker)
if (config.messageType == 3) {
port = isHttps ? 8084 : 8083 // WebSocket 端口
}
if (!host || !port) {
console.log('mqtt配置参数不完整')
return null
}
// 创建 MQTT 客户端
let client = new Paho.MQTT.Client(host, Number(port), clientId)
let options = {
userName: userName,
password: passWord,
useSSL: isHttps ? true : false,
keepAliveInterval: 30, // 心跳间隔 30 秒
onSuccess: () => {
// 连接成功后订阅 Topic
// 支持单个和批量订阅
if (params.indexOf(',') !== -1) {
let arr = params.split(',')
arr.forEach(item => {
client.subscribe(topic + item, { qos: Number(qos) })
})
} else {
client.subscribe(topic + params, { qos: Number(qos) })
}
},
onFailure: (e) => {
console.log('连接失败,15秒后重试')
setTimeout(() => {
mqttConnect(config, params, onMessageArrived, onConnectionLost)
}, 15000)
}
}
client.connect(options)
// 掉线回调
client.onConnectionLost = (e) => {
console.log('连接断开:', e)
onConnectionLost && onConnectionLost()
}
// 接收消息回调
client.onMessageArrived = onMessageArrived
return client
}
封装管理类(单例模式管理多连接):
// src/util/mqttUtil.js
import { mqttInfoViews } from "@/api/mqtt"
import { mqttConnect } from "@/util/mqttConnect"
import store from '@/vuex'
class MqttUtil {
constructor() {
this.clientObj = {} // 存储所有 MQTT 连接实例
}
// 从后端获取配置并建立连接
async getMqttInfoViews(type, cb) {
let data = await mqttInfoViews({ moduleType: type })
// AES 解密配置
let newData = JSON.parse(decryptAes(data, dayjs().format("YYYYMMDD") + 'ABCDEFGHIJKLMN0123456789'))
newData.forEach((config) => {
if (this.clientObj[config.id]) this.closeClient(config.id)
this.createClient(config, type, cb)
})
}
// 创建单个 MQTT 连接
createClient(config, type, successCB) {
let client = mqttConnect(
config,
config.pid,
(res) => {
// 收到消息,解析 payload
const payload = JSON.parse(res.payloadString)
// 根据模块类型分发到不同的 Vuex Store
if (config.moduleType == 1) {
// 告警消息
store.commit("isGetAlarm", new Date().getTime())
} else if (config.moduleType == 10) {
// 实时数据
store.commit("yzSgccMqttRealtimeData", payload)
} else if (config.moduleType == 11) {
// 控制指令回执
store.commit("controlData", payload)
} else {
successCB && successCB(res, type)
}
},
(error) => {
// 断线后定时重连
setInterval(() => {
this.getMqttInfoViews(config.moduleType)
}, 30000)
}
)
if (client) {
this.clientObj[config.id] = client
}
}
// 关闭连接
closeClient(type) {
if (this.clientObj[type] && this.clientObj[type].isConnected()) {
this.clientObj[type].disconnect()
this.clientObj[type] = null
}
}
}
Vue 组件中使用:
// 在大屏页面入口初始化
import MqttUtil from "@/util/mqttUtil"
setup() {
let mqttUtil = new MqttUtil()
onMounted(() => {
// 初始化告警 MQTT 连接
mqttUtil.getMqttInfoViews(1)
})
onUnmounted(() => {
// 页面销毁时断开所有 MQTT 连接
mqttUtil.closeClient()
})
}
2.4 这种方案的优缺点
优点:
- 架构简单,前端直连 Broker,延迟最低
- 后端压力小,不参与消息转发
- Paho 库成熟稳定,兼容性好
缺点:
- MQTT 连接信息(账号密码)需要通过其他方式安全下发
- 前端直接暴露在 MQTT Broker 面前,需要做好 Broker 的 ACL 权限控制
- 安全性依赖 HTTPS/WSS 加密传输
三、方案二:后端代理转发(推荐生产环境使用)
在很多企业级项目中,不会让前端直接连 MQTT Broker,而是通过后端做一层代理。
3.1 整体架构
[物联网设备] ──MQTT──> [MQTT Broker] <──TCP── [后端服务] <──WebSocket── [Vue 前端]
↑
[认证/鉴权/过滤]
3.2 后端负责什么?
后端自己作为 MQTT 的客户端连接 Broker,然后起一个 WebSocket 服务暴露给前端。后端在这中间可以做:
- 统一的认证鉴权:验证前端 token,决定是否允许建立 WebSocket
- Topic 权限控制:不同角色的用户,只能订阅他们有权访问的 Topic
- 消息过滤和脱敏:敏感字段在后端处理后再下发
- 消息持久化:重要的消息写入数据库,方便追溯
- 消息聚合:合并多个设备的数据再推给前端,减少前端压力
3.3 后端伪代码(Node.js 示例)
// 后端 - Node.js + Express + mqtt.js + ws
const mqtt = require('mqtt')
const WebSocket = require('ws')
const jwt = require('jsonwebtoken')
// 连接 MQTT Broker
const mqttClient = mqtt.connect('mqtt://192.168.1.100:1883', {
username: 'backend_service',
password: 'xxx'
})
mqttClient.on('connect', () => {
// 订阅所有设备数据
mqttClient.subscribe('/device/#')
})
// 创建 WebSocket 服务
const wss = new WebSocket.Server({ port: 8080 })
wss.on('connection', (ws, req) => {
// 从 URL 参数中取 token 鉴权
const token = new URL(req.url, 'http://localhost').searchParams.get('token')
try {
const user = jwt.verify(token, 'SECRET_KEY')
// 根据用户角色,决定其可订阅的 Topic
const allowedTopics = getUserTopics(user.role)
// 转发 MQTT 消息到前端
const messageHandler = (topic, message) => {
if (allowedTopics.some(t => topic.startsWith(t))) {
// 可以在这里做数据脱敏
const safeData = sanitizeData(JSON.parse(message.toString()))
ws.send(JSON.stringify({ topic, data: safeData }))
}
}
mqttClient.on('message', messageHandler)
// 前端发来的消息(如控制指令),转发到 MQTT
ws.on('message', (data) => {
const { topic, payload } = JSON.parse(data)
// 校验用户是否有权限往这个 Topic 发消息
if (canPublish(user, topic)) {
mqttClient.publish(topic, JSON.stringify(payload))
}
})
ws.on('close', () => {
mqttClient.removeListener('message', messageHandler)
})
} catch (e) {
ws.close(4001, '认证失败')
}
})
3.4 前端代码
// 前端只需要连后端 WebSocket,简单很多
class MqttProxyClient {
constructor(token) {
this.ws = new WebSocket(`ws://localhost:8080?token=${token}`)
this.callbacks = {}
this.ws.onmessage = (event) => {
const { topic, data } = JSON.parse(event.data)
// 触发对应的回调
if (this.callbacks[topic]) {
this.callbacks[topic].forEach(cb => cb(data))
}
}
this.ws.onclose = () => {
console.log('WebSocket 断开,5秒后重连')
setTimeout(() => new MqttProxyClient(token), 5000)
}
}
// 订阅主题
subscribe(topic, callback) {
if (!this.callbacks[topic]) {
this.callbacks[topic] = []
// 通知后端订阅
this.ws.send(JSON.stringify({ action: 'subscribe', topic }))
}
this.callbacks[topic].push(callback)
}
// 发布消息(控制指令)
publish(topic, payload) {
this.ws.send(JSON.stringify({ action: 'publish', topic, payload }))
}
close() {
this.ws.close()
}
}
// 使用
const client = new MqttProxyClient(userToken)
client.subscribe('/building/floor1/temperature', (data) => {
console.log('收到温度数据:', data)
})
这种方案是生产环境的首选,虽然多了一层转发,但换来了安全性和可控性。
四、方案三:使用 MQTT.js 库(纯前端方案)
除了 Paho,还有一个更现代的库:mqtt.js(npm 包名 mqtt)。
4.1 安装和使用
npm install mqtt
import mqtt from 'mqtt'
// 直接在浏览器中连接(mqtt.js 内置了 ws 支持)
const client = mqtt.connect('ws://192.168.1.100:8083/mqtt', {
clientId: 'web_' + Math.random().toString(16).substr(2, 8),
username: 'admin',
password: '123456',
keepalive: 30,
clean: true
})
client.on('connect', () => {
console.log('MQTT 已连接')
// 订阅多个主题
client.subscribe(['/sensor/temperature', '/sensor/humidity'], { qos: 1 })
})
client.on('message', (topic, message) => {
// message 是 Buffer,需要转字符串
const data = JSON.parse(message.toString())
console.log(`收到消息 [${topic}]:`, data)
})
client.on('error', (err) => {
console.error('MQTT 连接错误:', err)
})
client.on('close', () => {
console.log('MQTT 连接已关闭')
})
// 发布消息
client.publish('/device/control', JSON.stringify({ cmd: 'open', deviceId: '001' }))
4.2 mqtt.js vs Paho.js 对比
| 对比维度 | Paho.js | mqtt.js |
|---|---|---|
| 协议版本 | MQTT 3.1 / 3.1.1 | MQTT 3.1.1 / 5.0 |
| 包体积 | 较大(完整实现) | 较小,按需引入 |
| API 风格 | 回调风格 | EventEmitter + Promise |
| 社区活跃度 | Eclipse 官方,稳定但不活跃 | npm 社区活跃,更新频繁 |
| TypeScript | 需额外类型定义 | 自带类型定义 |
如果你的项目是全新开发的,我建议用 mqtt.js,API 更现代,也支持 MQTT 5.0。
五、方案四:EMQX + Vue 全家桶
EMQX 是目前最流行的开源 MQTT Broker,它内置了 WebSocket 支持和一个强大的规则引擎。
5.1 EMQX 的优势
[设备] ──MQTT──> [EMQX Broker]
├── WebSocket 直连前端
├── 规则引擎 → 写入 MySQL/InfluxDB
├── 规则引擎 → 转发到 Kafka
└── HTTP 认证 → 后端用户系统
- 内置 WebSocket:开箱即用,不需要 Nginx 反代
- HTTP 认证:连接时回调你的后端接口做鉴权
- ACL 权限:细粒度控制哪个客户端能订阅哪些 Topic
- 规则引擎:消息来了自动写数据库、转发 Kafka,后端开发量小很多
5.2 前端配合方式
跟前两种方案一样,你依然可以选择是直连 EMQX 还是通过后端代理。EMQX 的好处是它帮你解决了 Broker 层的安全和管理问题。
// 直连 EMQX(EMQX 默认 WebSocket 端口 8083)
const client = mqtt.connect('ws://emqx-server:8083/mqtt', {
username: 'frontend_user',
password: 'jwt_token_here',
clientId: 'vue_app_' + Date.now()
})
重点:EMQX 的 HTTP 认证机制可以把 token 验证交给你的后端:
前端 --WebSocket连接(username=token)--> EMQX --HTTP回调--> 后端 /auth 接口 --> 返回允许/拒绝
六、在 Vue 组件中优雅地使用 MQTT
不管你用哪种方案,到了 Vue 组件层面,使用方式其实差不多。我总结几个最佳实践:
6.1 Composable 封装(Vue 3)
// composables/useMqtt.js
import { ref, onUnmounted, watch } from 'vue'
import mqtt from 'mqtt'
export function useMqtt(brokerUrl, options = {}) {
const connected = ref(false)
const messageMap = ref({}) // { topic: latestData }
let client = null
const connect = () => {
client = mqtt.connect(brokerUrl, {
keepalive: 30,
...options
})
client.on('connect', () => {
connected.value = true
})
client.on('message', (topic, message) => {
try {
const data = JSON.parse(message.toString())
messageMap.value[topic] = data
} catch (e) {
messageMap.value[topic] = message.toString()
}
})
client.on('close', () => {
connected.value = false
})
}
const subscribe = (topic, qos = 1) => {
if (client && connected.value) {
client.subscribe(topic, { qos })
}
}
const publish = (topic, payload, qos = 1) => {
if (client && connected.value) {
client.publish(topic, JSON.stringify(payload), { qos })
}
}
const disconnect = () => {
if (client) {
client.end()
client = null
}
}
onUnmounted(() => {
disconnect()
})
connect()
return {
connected,
messageMap,
subscribe,
publish,
disconnect
}
}
<!-- 组件中使用 -->
<template>
<div>
<span :class="{ online: connected }">
{{ connected ? '已连接' : '已断开' }}
</span>
<div>当前温度:{{ temperature }}°C</div>
</div>
</template>
<script setup>
import { computed, watch } from 'vue'
import { useMqtt } from '@/composables/useMqtt'
const { connected, messageMap, subscribe } = useMqtt('ws://192.168.1.100:8083/mqtt', {
username: 'admin',
password: '123456'
})
// 组件挂载后订阅
subscribe('/sensor/+/temperature')
// 计算温度数据
const temperature = computed(() => {
// 取最新的一条温度数据
const topics = Object.keys(messageMap.value).filter(t => t.includes('temperature'))
if (topics.length) return messageMap.value[topics[topics.length - 1]].value
return '--'
})
</script>
6.2 配合 Vuex/Pinia 做全局状态
大屏项目通常有多个组件需要同一份 MQTT 数据,这时候最好走状态管理:
// store/mqtt.js (Pinia)
import { defineStore } from 'pinia'
import mqtt from 'mqtt'
export const useMqttStore = defineStore('mqtt', {
state: () => ({
connected: false,
realtimeData: {}, // 实时数据
alarms: [], // 告警列表
deviceStatus: {} // 设备状态
}),
actions: {
initConnection(config) {
this.client = mqtt.connect(config.url, config.options)
this.client.on('connect', () => {
this.connected = true
this.client.subscribe(['/alarm/#', '/data/#', '/status/#'])
})
this.client.on('message', (topic, message) => {
const data = JSON.parse(message.toString())
// 按 Topic 前缀分发给不同状态
if (topic.startsWith('/alarm/')) {
this.alarms.unshift(data)
} else if (topic.startsWith('/data/')) {
this.realtimeData[topic] = data
} else if (topic.startsWith('/status/')) {
this.deviceStatus[topic.split('/')[2]] = data
}
})
this.client.on('close', () => {
this.connected = false
})
},
sendCommand(deviceId, command) {
if (this.client && this.connected) {
this.client.publish(`/cmd/${deviceId}`, JSON.stringify(command))
}
},
disconnect() {
this.client?.end()
}
}
})
七、前后端配合的几种典型场景
场景1:设备告警实时推送
这是最常见的场景,设备产生告警后,前端要弹窗或播放声音。
设备 --告警消息--> MQTT Broker --> 前端接收 --> 弹窗/声音/列表更新
关键点:前端收到消息后,一般通过 vuex.commit 来驱动界面变化。多个浏览器 Tab 页可以用 BroadcastChannel 同步告警状态。
场景2:远程控制设备
前端点击"开/关",指令通过 MQTT 下发到设备:
前端 publish --> MQTT Topic --> 设备收到指令 --> 执行 --> 回传状态 --> 前端更新状态
这里有个重要细节:先乐观更新 UI(界面立刻显示开关状态),等设备回执后再修正。这样用户感觉响应很快。
场景3:大屏实时数据展示
大屏上几十个图表都在刷新,如果每个图表都建一条 MQTT 连接,太浪费了。正确做法是:
一条 MQTT 连接 --> 订阅多个 Topic --> 收到消息后分发到 Vuex --> 各图表 watch 对应 state
八、总结:方案怎么选?
| 场景 | 推荐方案 |
|---|---|
| 内部工具 / 演示项目 | 方案一:Paho.js 直连 Broker |
| 新项目 / 需要 MQTT 5.0 | 方案一(升级版):mqtt.js 直连 |
| 生产环境 / 有安全要求 | 方案二:后端 WebSocket 代理转发 |
| 设备量大 / 数据需要持久化 | 方案四:EMQX + 规则引擎 |
| Vue 3 项目 | 组合式 API(composable)封装 |
| Vue 2 项目 | Mixin 或 原型挂载 + Vuex |
最后提醒几个容易踩的坑:
- HTTPS 页面必须用 WSS(WebSocket Secure),不能用 WS,浏览器会直接拦截
- clientId 必须唯一,多个页面同时打开要用不同 clientId,否则会互相踢下线
- 页面销毁时一定要断开连接,不然会内存泄漏,时间长了页面卡死
- QoS 不要盲目设 2,大部分场景 QoS 1 就够了,2 会增加很多开销
- 大屏长时间运行要处理断线重连,MQTT 连接不是一劳永逸的
好了,以上就是前端接入 MQTT 的完整实战经验。从公司的智慧园区大屏项目提炼出来的,希望对你有帮助。有问题欢迎评论区交流。
本文代码基于实际生产项目简化,完整项目包含了多类型 Broker 适配(RabbitMQ MQTT 插件 / EMQX / Mosquitto)、AES 配置加密、BroadcastChannel 多 Tab 同步、防抖处理等生产级细节。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)