WebSocket心跳与重连:HarmonyOS开发中连接保活策略

让你的长连接像小强一样顽强,断线重连、永不离线

一、为什么需要心跳和重连?

WebSocket虽然建立了持久连接,但这个"持久"并不等于"永久"。网络环境复杂多变,连接随时可能中断:

网络切换:用户从WiFi切到4G,或者走进电梯信号丢失,底层TCP连接断开,但应用层可能还没感知到。

运营商超时:某些运营商会对长时间无数据传输的连接进行清理,NAT映射也有超时时间,通常是几分钟。

服务器重启:服务器部署更新、崩溃重启,客户端连接被强制断开。

客户端休眠:移动设备进入休眠模式,网络被系统暂停,唤醒后连接已失效。

如果没有心跳机制,客户端可能长时间不知道连接已断,等到真正需要发消息时才发现——用户体验极差。

如果没有重连机制,连接断开后需要用户手动刷新页面才能恢复——用户会以为应用出bug了。

心跳和重连,就是让WebSocket连接"死而复生"的关键机制。

二、核心原理:心跳检测与重连策略

2.1 心跳机制工作流程

WebSocket连接建立

启动心跳定时器

等待心跳间隔

连接正常?

发送Ping帧

等待Pong响应

收到Pong?

重置心跳计时

未响应计数+1

超过阈值?

判定连接断开

触发重连流程

重连成功?

达到重试上限?

等待退避时间

连接失败

2.2 心跳策略对比

策略 实现方式 优点 缺点 适用场景
Ping/Pong帧 WebSocket协议原生支持 标准化、开销小 部分服务器不支持 推荐首选
应用层心跳 发送自定义消息 灵活可控 需要服务器配合 通用方案
双向心跳 客户端和服务器都发 检测更可靠 实现复杂 高可用场景

2.3 重连策略设计

重连不是简单的"失败了再连",需要考虑:

退避策略:立即重连可能失败(服务器还在重启),等待时间逐渐增加。

抖动因子:大量客户端同时重连会造成"惊群效应",加入随机抖动分散压力。

重试上限:无限重连浪费资源,达到上限后应通知用户或降级处理。

状态恢复:重连成功后,需要恢复之前的状态(如订阅关系、会话信息)。

三、代码实战:完整实现方案

场景一:标准心跳检测实现

实现一个健壮的心跳机制,支持Ping/Pong和应用层心跳两种模式。

import webSocket from '@ohos.net.webSocket';
import { BusinessError } from '@ohos.base';

// 心跳配置
interface HeartbeatConfig {
  interval: number;           // 心跳间隔(毫秒)
  timeout: number;            // 响应超时(毫秒)
  maxMissed: number;          // 最大未响应次数
  mode: 'ping_pong' | 'app_level';  // 心跳模式
  appLevelPing?: string;      // 应用层心跳消息
  appLevelPong?: string;      // 应用层心跳响应
}

// 默认心跳配置
const DEFAULT_HEARTBEAT_CONFIG: HeartbeatConfig = {
  interval: 30000,    // 30秒发送一次心跳
  timeout: 5000,      // 5秒内等待响应
  maxMissed: 3,       // 连续3次未响应判定断开
  mode: 'app_level',
  appLevelPing: '{"type":"ping"}',
  appLevelPong: '{"type":"pong"}'
};

// 带心跳的WebSocket客户端
class HeartbeatWebSocket {
  private ws: webSocket.WebSocket | null = null;
  private config: HeartbeatConfig;
  private url: string;
  
  // 心跳状态
  private heartbeatTimer: number = -1;
  private timeoutTimer: number = -1;
  private missedCount: number = 0;
  private lastPongTime: number = 0;
  
  // 连接状态
  private isConnected: boolean = false;
  private isReconnecting: boolean = false;
  
  // 回调
  private onMessage: ((data: any) => void) | null = null;
  private onConnectionChange: ((connected: boolean) => void) | null = null;
  private onHeartbeatMissed: ((count: number) => void) | null = null;
  
  constructor(url: string, config: Partial<HeartbeatConfig> = {}) {
    this.url = url;
    this.config = { ...DEFAULT_HEARTBEAT_CONFIG, ...config };
  }
  
  // 连接服务器
  async connect(): Promise<boolean> {
    if (this.isConnected) {
      console.warn('已连接,无需重复连接');
      return true;
    }
  
    this.ws = webSocket.createWebSocket();
  
    try {
      // 连接服务器
      await this.ws.connect(this.url);
    
      this.isConnected = true;
      this.missedCount = 0;
    
      // 设置消息监听
      this.setupMessageListener();
    
      // 启动心跳
      this.startHeartbeat();
    
      // 通知连接状态
      this.onConnectionChange?.(true);
    
      console.info('WebSocket连接成功');
      return true;
    
    } catch (error) {
      console.error('WebSocket连接失败:', error);
      this.ws = null;
      return false;
    }
  }
  
  // 设置消息监听
  private setupMessageListener(): void {
    if (!this.ws) return;
  
    this.ws.on('message', (err: BusinessError, value: string | ArrayBuffer) => {
      if (err) {
        console.error('消息接收错误:', err.message);
        return;
      }
    
      // 处理心跳响应
      if (this.config.mode === 'app_level') {
        let message = typeof value === 'string' ? value : new TextDecoder().decode(value);
      
        if (message === this.config.appLevelPong) {
          // 收到心跳响应
          this.handlePong();
          return;
        }
      }
    
      // 其他消息交给业务处理
      try {
        let data = typeof value === 'string' ? JSON.parse(value) : value;
        this.onMessage?.(data);
      } catch (error) {
        console.error('消息解析失败:', error);
      }
    });
  
    // 监听Pong帧(Ping/Pong模式)
    if (this.config.mode === 'ping_pong') {
      this.ws.on('pong', (err: BusinessError) => {
        if (!err) {
          this.handlePong();
        }
      });
    }
  
    // 监听连接关闭
    this.ws.on('close', (err: BusinessError, value: webSocket.CloseResult) => {
      console.info(`连接关闭: code=${value.code}, reason=${value.reason}`);
      this.handleDisconnect();
    });
  
    // 监听错误
    this.ws.on('error', (err: BusinessError) => {
      console.error('连接错误:', err.message);
      this.handleDisconnect();
    });
  }
  
  // 启动心跳
  private startHeartbeat(): void {
    this.stopHeartbeat();  // 先清理旧的
  
    console.info('启动心跳检测');
  
    this.heartbeatTimer = setInterval(() => {
      this.sendPing();
    }, this.config.interval);
  
    // 立即发送一次
    this.sendPing();
  }
  
  // 停止心跳
  private stopHeartbeat(): void {
    if (this.heartbeatTimer !== -1) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = -1;
    }
  
    if (this.timeoutTimer !== -1) {
      clearTimeout(this.timeoutTimer);
      this.timeoutTimer = -1;
    }
  }
  
  // 发送心跳
  private async sendPing(): Promise<void> {
    if (!this.ws || !this.isConnected) {
      return;
    }
  
    try {
      if (this.config.mode === 'ping_pong') {
        // 发送Ping帧
        await this.ws.ping();
      } else {
        // 发送应用层心跳
        await this.ws.send(this.config.appLevelPing || '{"type":"ping"}');
      }
    
      // 设置响应超时
      this.timeoutTimer = setTimeout(() => {
        this.handlePingTimeout();
      }, this.config.timeout);
    
    } catch (error) {
      console.error('发送心跳失败:', error);
      this.handleDisconnect();
    }
  }
  
  // 处理Pong响应
  private handlePong(): void {
    // 清除超时计时器
    if (this.timeoutTimer !== -1) {
      clearTimeout(this.timeoutTimer);
      this.timeoutTimer = -1;
    }
  
    // 重置未响应计数
    this.missedCount = 0;
    this.lastPongTime = Date.now();
  
    console.debug('心跳响应正常');
  }
  
  // 处理心跳超时
  private handlePingTimeout(): void {
    this.missedCount++;
  
    console.warn(`心跳未响应 (${this.missedCount}/${this.config.maxMissed})`);
  
    // 通知心跳丢失
    this.onHeartbeatMissed?.(this.missedCount);
  
    // 超过阈值,判定连接断开
    if (this.missedCount >= this.config.maxMissed) {
      console.error('心跳连续未响应,判定连接断开');
      this.handleDisconnect();
    }
  }
  
  // 处理连接断开
  private handleDisconnect(): void {
    if (!this.isConnected) {
      return;
    }
  
    this.isConnected = false;
    this.stopHeartbeat();
    this.onConnectionChange?.(false);
  
    console.info('连接已断开');
  }
  
  // 发送消息
  async send(data: string | ArrayBuffer): Promise<boolean> {
    if (!this.ws || !this.isConnected) {
      console.error('连接未建立,无法发送');
      return false;
    }
  
    try {
      await this.ws.send(data);
      return true;
    } catch (error) {
      console.error('发送失败:', error);
      return false;
    }
  }
  
  // 关闭连接
  async close(): Promise<void> {
    this.stopHeartbeat();
  
    if (this.ws) {
      try {
        await this.ws.close(1000, 'Client closing');
      } catch (error) {
        console.error('关闭连接失败:', error);
      }
      this.ws = null;
    }
  
    this.isConnected = false;
  }
  
  // 设置回调
  setOnMessage(callback: (data: any) => void): void {
    this.onMessage = callback;
  }
  
  setOnConnectionChange(callback: (connected: boolean) => void): void {
    this.onConnectionChange = callback;
  }
  
  setOnHeartbeatMissed(callback: (count: number) => void): void {
    this.onHeartbeatMissed = callback;
  }
  
  // 获取连接状态
  getConnectionStatus(): boolean {
    return this.isConnected;
  }
  
  // 获取心跳统计
  getHeartbeatStats(): { missedCount: number; lastPongTime: number } {
    return {
      missedCount: this.missedCount,
      lastPongTime: this.lastPongTime
    };
  }
}

场景二:智能重连机制

实现指数退避加随机抖动的重连策略。

import webSocket from '@ohos.net.webSocket';

// 重连配置
interface ReconnectConfig {
  maxAttempts: number;        // 最大重试次数
  initialDelay: number;       // 初始延迟(毫秒)
  maxDelay: number;           // 最大延迟(毫秒)
  backoffFactor: number;      // 退避因子
  jitterFactor: number;       // 抖动因子(0-1)
  resetAfter: number;         // 成功连接多久后重置重试计数(毫秒)
}

// 默认重连配置
const DEFAULT_RECONNECT_CONFIG: ReconnectConfig = {
  maxAttempts: 10,
  initialDelay: 1000,
  maxDelay: 30000,
  backoffFactor: 2,
  jitterFactor: 0.3,
  resetAfter: 60000  // 连接稳定1分钟后重置
};

// 带重连的WebSocket客户端
class ReconnectWebSocket {
  private ws: webSocket.WebSocket | null = null;
  private url: string;
  private reconnectConfig: ReconnectConfig;
  
  // 重连状态
  private reconnectAttempts: number = 0;
  private reconnectTimer: number = -1;
  private stableTimer: number = -1;
  private isReconnecting: boolean = false;
  
  // 连接状态
  private isConnected: boolean = false;
  
  // 回调
  private onMessage: ((data: any) => void) | null = null;
  private onConnectionChange: ((connected: boolean) => void) | null = null;
  private onReconnectAttempt: ((attempt: number, delay: number) => void) | null = null;
  private onReconnectFailed: (() => void) | null = null;
  
  // 状态恢复回调
  private onStateRestore: (() => Promise<void>) | null = null;
  
  constructor(url: string, config: Partial<ReconnectConfig> = {}) {
    this.url = url;
    this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
  }
  
  // 连接服务器
  async connect(): Promise<boolean> {
    // 清理之前的连接
    await this.cleanup();
  
    this.ws = webSocket.createWebSocket();
  
    try {
      await this.ws.connect(this.url);
    
      this.isConnected = true;
      this.isReconnecting = false;
    
      // 设置监听
      this.setupListeners();
    
      // 通知连接成功
      this.onConnectionChange?.(true);
    
      // 启动稳定计时器
      this.startStableTimer();
    
      console.info('WebSocket连接成功');
      return true;
    
    } catch (error) {
      console.error('WebSocket连接失败:', error);
    
      // 尝试重连
      this.scheduleReconnect();
    
      return false;
    }
  }
  
  // 设置监听
  private setupListeners(): void {
    if (!this.ws) return;
  
    this.ws.on('message', (err, value) => {
      if (err) return;
    
      try {
        let data = typeof value === 'string' ? JSON.parse(value) : value;
        this.onMessage?.(data);
      } catch (error) {
        // 忽略解析错误
      }
    });
  
    this.ws.on('close', (err, value) => {
      console.info(`连接关闭: code=${value.code}`);
      this.handleDisconnect();
    });
  
    this.ws.on('error', (err) => {
      console.error('连接错误:', err.message);
      this.handleDisconnect();
    });
  }
  
  // 处理断开
  private handleDisconnect(): void {
    if (!this.isConnected) {
      return;
    }
  
    this.isConnected = false;
    this.stopStableTimer();
    this.onConnectionChange?.(false);
  
    // 尝试重连
    this.scheduleReconnect();
  }
  
  // 调度重连
  private scheduleReconnect(): void {
    if (this.isReconnecting) {
      return;
    }
  
    // 检查重试次数
    if (this.reconnectAttempts >= this.reconnectConfig.maxAttempts) {
      console.error('达到最大重试次数,停止重连');
      this.onReconnectFailed?.();
      return;
    }
  
    this.isReconnecting = true;
    this.reconnectAttempts++;
  
    // 计算延迟时间
    let delay = this.calculateDelay();
  
    console.info(`将在 ${delay}ms 后进行第 ${this.reconnectAttempts} 次重连`);
  
    // 通知重连尝试
    this.onReconnectAttempt?.(this.reconnectAttempts, delay);
  
    // 设置重连定时器
    this.reconnectTimer = setTimeout(async () => {
      console.info(`开始第 ${this.reconnectAttempts} 次重连`);
    
      let success = await this.connect();
    
      if (success) {
        // 重连成功,恢复状态
        await this.restoreState();
      }
    
    }, delay);
  }
  
  // 计算重连延迟(指数退避 + 随机抖动)
  private calculateDelay(): number {
    // 指数退避
    let baseDelay = this.reconnectConfig.initialDelay * 
                    Math.pow(this.reconnectConfig.backoffFactor, this.reconnectAttempts - 1);
  
    // 随机抖动
    let jitter = 1 - this.reconnectConfig.jitterFactor + 
                 Math.random() * 2 * this.reconnectConfig.jitterFactor;
  
    let delay = baseDelay * jitter;
  
    // 限制最大延迟
    return Math.min(delay, this.reconnectConfig.maxDelay);
  }
  
  // 启动稳定计时器
  private startStableTimer(): void {
    this.stopStableTimer();
  
    this.stableTimer = setTimeout(() => {
      // 连接稳定一段时间后,重置重试计数
      console.info('连接稳定,重置重试计数');
      this.reconnectAttempts = 0;
    }, this.reconnectConfig.resetAfter);
  }
  
  // 停止稳定计时器
  private stopStableTimer(): void {
    if (this.stableTimer !== -1) {
      clearTimeout(this.stableTimer);
      this.stableTimer = -1;
    }
  }
  
  // 恢复状态
  private async restoreState(): Promise<void> {
    try {
      await this.onStateRestore?.();
      console.info('状态恢复成功');
    } catch (error) {
      console.error('状态恢复失败:', error);
    }
  }
  
  // 清理资源
  private async cleanup(): Promise<void> {
    // 清除重连定时器
    if (this.reconnectTimer !== -1) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = -1;
    }
  
    this.stopStableTimer();
  
    // 关闭旧连接
    if (this.ws) {
      try {
        await this.ws.close();
      } catch (error) {
        // 忽略关闭错误
      }
      this.ws = null;
    }
  }
  
  // 发送消息
  async send(data: string | ArrayBuffer): Promise<boolean> {
    if (!this.ws || !this.isConnected) {
      return false;
    }
  
    try {
      await this.ws.send(data);
      return true;
    } catch (error) {
      return false;
    }
  }
  
  // 关闭连接(不重连)
  async close(): Promise<void> {
    this.isReconnecting = true;  // 阻止重连
    await this.cleanup();
    this.isConnected = false;
  }
  
  // 手动触发重连
  async reconnect(): Promise<void> {
    this.reconnectAttempts = 0;  // 重置计数
    await this.cleanup();
    await this.connect();
  }
  
  // 设置回调
  setOnMessage(callback: (data: any) => void): void {
    this.onMessage = callback;
  }
  
  setOnConnectionChange(callback: (connected: boolean) => void): void {
    this.onConnectionChange = callback;
  }
  
  setOnReconnectAttempt(callback: (attempt: number, delay: number) => void): void {
    this.onReconnectAttempt = callback;
  }
  
  setOnReconnectFailed(callback: () => void): void {
    this.onReconnectFailed = callback;
  }
  
  setOnStateRestore(callback: () => Promise<void>): void {
    this.onStateRestore = callback;
  }
  
  // 获取状态
  getConnectionStatus(): boolean {
    return this.isConnected;
  }
  
  getReconnectAttempts(): number {
    return this.reconnectAttempts;
  }
}

场景三:完整的心跳+重连组合实现

将心跳检测和重连机制组合,实现生产级的WebSocket客户端。

import webSocket from '@ohos.net.webSocket';

// 完整配置
interface RobustWebSocketConfig {
  url: string;
  heartbeat: Partial<HeartbeatConfig>;
  reconnect: Partial<ReconnectConfig>;
}

// 生产级WebSocket客户端
class RobustWebSocket {
  private ws: webSocket.WebSocket | null = null;
  private config: {
    url: string;
    heartbeat: HeartbeatConfig;
    reconnect: ReconnectConfig;
  };
  
  // 心跳状态
  private heartbeatTimer: number = -1;
  private timeoutTimer: number = -1;
  private missedCount: number = 0;
  
  // 重连状态
  private reconnectAttempts: number = 0;
  private reconnectTimer: number = -1;
  private stableTimer: number = -1;
  
  // 连接状态
  private isConnected: boolean = false;
  private isManualClose: boolean = false;
  
  // 回调
  private callbacks = {
    onMessage: null as ((data: any) => void) | null,
    onConnectionChange: null as ((connected: boolean) => void) | null,
    onReconnectAttempt: null as ((attempt: number, delay: number) => void) | null,
    onReconnectFailed: null as (() => void) | null,
    onStateRestore: null as (() => Promise<void>) | null
  };
  
  constructor(config: RobustWebSocketConfig) {
    this.config = {
      url: config.url,
      heartbeat: { ...DEFAULT_HEARTBEAT_CONFIG, ...config.heartbeat },
      reconnect: { ...DEFAULT_RECONNECT_CONFIG, ...config.reconnect }
    };
  }
  
  // 连接
  async connect(): Promise<boolean> {
    this.isManualClose = false;
    await this.cleanup();
  
    this.ws = webSocket.createWebSocket();
  
    try {
      await this.ws.connect(this.config.url);
    
      this.isConnected = true;
      this.missedCount = 0;
      this.reconnectAttempts = 0;
    
      this.setupListeners();
      this.startHeartbeat();
      this.startStableTimer();
    
      this.callbacks.onConnectionChange?.(true);
    
      console.info('WebSocket连接成功');
      return true;
    
    } catch (error) {
      console.error('连接失败:', error);
      this.scheduleReconnect();
      return false;
    }
  }
  
  // 设置监听
  private setupListeners(): void {
    if (!this.ws) return;
  
    this.ws.on('message', (err, value) => {
      if (err) return;
    
      // 检查心跳响应
      let message = typeof value === 'string' ? value : new TextDecoder().decode(value);
      if (message === this.config.heartbeat.appLevelPong) {
        this.handlePong();
        return;
      }
    
      // 业务消息
      try {
        let data = JSON.parse(message);
        this.callbacks.onMessage?.(data);
      } catch (error) {
        // 忽略
      }
    });
  
    this.ws.on('close', (err, value) => {
      this.handleDisconnect();
    });
  
    this.ws.on('error', (err) => {
      this.handleDisconnect();
    });
  }
  
  // 心跳相关
  private startHeartbeat(): void {
    this.stopHeartbeat();
  
    this.heartbeatTimer = setInterval(() => {
      this.sendPing();
    }, this.config.heartbeat.interval);
  
    this.sendPing();
  }
  
  private stopHeartbeat(): void {
    if (this.heartbeatTimer !== -1) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = -1;
    }
    if (this.timeoutTimer !== -1) {
      clearTimeout(this.timeoutTimer);
      this.timeoutTimer = -1;
    }
  }
  
  private async sendPing(): Promise<void> {
    if (!this.ws || !this.isConnected) return;
  
    try {
      await this.ws.send(this.config.heartbeat.appLevelPing || '{"type":"ping"}');
    
      this.timeoutTimer = setTimeout(() => {
        this.missedCount++;
      
        if (this.missedCount >= this.config.heartbeat.maxMissed) {
          console.warn('心跳超时,判定断开');
          this.handleDisconnect();
        }
      }, this.config.heartbeat.timeout);
    
    } catch (error) {
      this.handleDisconnect();
    }
  }
  
  private handlePong(): void {
    if (this.timeoutTimer !== -1) {
      clearTimeout(this.timeoutTimer);
      this.timeoutTimer = -1;
    }
    this.missedCount = 0;
  }
  
  // 重连相关
  private handleDisconnect(): void {
    if (!this.isConnected || this.isManualClose) return;
  
    this.isConnected = false;
    this.stopHeartbeat();
    this.stopStableTimer();
    this.callbacks.onConnectionChange?.(false);
  
    this.scheduleReconnect();
  }
  
  private scheduleReconnect(): void {
    if (this.isManualClose) return;
    if (this.reconnectAttempts >= this.config.reconnect.maxAttempts) {
      console.error('重连失败');
      this.callbacks.onReconnectFailed?.();
      return;
    }
  
    this.reconnectAttempts++;
  
    let delay = Math.min(
      this.config.reconnect.initialDelay * 
      Math.pow(this.config.reconnect.backoffFactor, this.reconnectAttempts - 1) *
      (1 - this.config.reconnect.jitterFactor + Math.random() * 2 * this.config.reconnect.jitterFactor),
      this.config.reconnect.maxDelay
    );
  
    this.callbacks.onReconnectAttempt?.(this.reconnectAttempts, delay);
  
    this.reconnectTimer = setTimeout(async () => {
      let success = await this.connect();
      if (success) {
        await this.callbacks.onStateRestore?.();
      }
    }, delay);
  }
  
  private startStableTimer(): void {
    this.stopStableTimer();
  
    this.stableTimer = setTimeout(() => {
      this.reconnectAttempts = 0;
    }, this.config.reconnect.resetAfter);
  }
  
  private stopStableTimer(): void {
    if (this.stableTimer !== -1) {
      clearTimeout(this.stableTimer);
      this.stableTimer = -1;
    }
  }
  
  // 清理
  private async cleanup(): Promise<void> {
    this.stopHeartbeat();
    this.stopStableTimer();
  
    if (this.reconnectTimer !== -1) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = -1;
    }
  
    if (this.ws) {
      try {
        await this.ws.close();
      } catch (error) {
        // 忽略
      }
      this.ws = null;
    }
  }
  
  // 公共方法
  async send(data: string | ArrayBuffer): Promise<boolean> {
    if (!this.ws || !this.isConnected) return false;
  
    try {
      await this.ws.send(data);
      return true;
    } catch (error) {
      return false;
    }
  }
  
  async close(): Promise<void> {
    this.isManualClose = true;
    await this.cleanup();
    this.isConnected = false;
  }
  
  // 设置回调
  on(event: 'message' | 'connectionChange' | 'reconnectAttempt' | 'reconnectFailed' | 'stateRestore',
     callback: any): void {
    switch (event) {
      case 'message':
        this.callbacks.onMessage = callback;
        break;
      case 'connectionChange':
        this.callbacks.onConnectionChange = callback;
        break;
      case 'reconnectAttempt':
        this.callbacks.onReconnectAttempt = callback;
        break;
      case 'reconnectFailed':
        this.callbacks.onReconnectFailed = callback;
        break;
      case 'stateRestore':
        this.callbacks.onStateRestore = callback;
        break;
    }
  }
  
  getStatus(): { connected: boolean; attempts: number; missed: number } {
    return {
      connected: this.isConnected,
      attempts: this.reconnectAttempts,
      missed: this.missedCount
    };
  }
}

// 使用示例
@Entry
@Component
struct RobustWebSocketPage {
  @State connectionStatus: string = '未连接';
  @State reconnectInfo: string = '';
  @State messages: string[] = [];
  
  private wsClient: RobustWebSocket | null = null;
  
  async aboutToAppear() {
    this.wsClient = new RobustWebSocket({
      url: 'wss://api.example.com/ws',
      heartbeat: {
        interval: 25000,
        maxMissed: 3
      },
      reconnect: {
        maxAttempts: 10,
        initialDelay: 1000
      }
    });
  
    // 设置回调
    this.wsClient.on('connectionChange', (connected: boolean) => {
      this.connectionStatus = connected ? '已连接' : '未连接';
    });
  
    this.wsClient.on('reconnectAttempt', (attempt: number, delay: number) => {
      this.reconnectInfo = `${attempt}次重连,${delay}ms后执行`;
    });
  
    this.wsClient.on('message', (data: any) => {
      this.messages.unshift(JSON.stringify(data));
      if (this.messages.length > 10) {
        this.messages.pop();
      }
    });
  
    this.wsClient.on('stateRestore', async () => {
      // 重连后恢复订阅
      await this.wsClient?.send('{"action":"resubscribe"}');
    });
  
    // 连接
    await this.wsClient.connect();
  }
  
  async aboutToDisappear() {
    await this.wsClient?.close();
  }
  
  build() {
    Column() {
      // 状态显示
      Row() {
        Text(this.connectionStatus)
          .fontSize(16)
          .fontColor(this.connectionStatus === '已连接' ? '#7ED321' : '#D0021B')
      
        Blank().layoutWeight(1)
      
        Text(this.reconnectInfo)
          .fontSize(12)
          .fontColor('#666')
      }
      .width('100%')
      .padding(15)
    
      // 消息列表
      List() {
        ForEach(this.messages, (msg: string, index: number) => {
          ListItem() {
            Text(msg)
              .fontSize(14)
              .padding(10)
              .backgroundColor('#F5F5F5')
              .borderRadius(4)
          }
        })
      }
      .width('100%')
      .layoutWeight(1)
      .padding(10)
    }
    .width('100%')
    .height('100%')
  }
}

四、踩坑与注意事项

坑点一:心跳间隔设置不当

间隔太短浪费资源,太长无法及时检测断线。

// ❌ 错误:心跳间隔过短
const config = {
  interval: 1000,  // 每秒发一次,太频繁
  timeout: 500
};

// ❌ 错误:心跳间隔过长
const config = {
  interval: 300000,  // 5分钟,无法及时发现断线
  timeout: 5000
};

// ✅ 正确:根据场景设置
const config = {
  // 移动网络:NAT超时通常3-5分钟,建议25-30秒
  mobile: { interval: 25000, timeout: 5000 },
  
  // WiFi:可以稍长
  wifi: { interval: 45000, timeout: 10000 },
  
  // 实时性要求高:缩短间隔
  realtime: { interval: 15000, timeout: 3000 }
};

坑点二:重连导致消息丢失

重连期间发送的消息会丢失,需要消息队列。

// 带消息队列的WebSocket
class QueuedWebSocket extends RobustWebSocket {
  private messageQueue: Array<{ data: any; timestamp: number }> = [];
  private maxQueueSize: number = 100;
  
  // 发送消息(支持离线队列)
  async sendWithQueue(data: any): Promise<boolean> {
    if (this.isConnected) {
      return await this.send(JSON.stringify(data));
    } else {
      // 加入队列
      this.messageQueue.push({
        data: data,
        timestamp: Date.now()
      });
    
      // 限制队列大小
      if (this.messageQueue.length > this.maxQueueSize) {
        this.messageQueue.shift();
      }
    
      console.info('消息已加入队列');
      return false;
    }
  }
  
  // 重连成功后发送队列消息
  async flushQueue(): Promise<void> {
    while (this.messageQueue.length > 0 && this.isConnected) {
      let item = this.messageQueue.shift()!;
      await this.send(JSON.stringify(item.data));
    }
  }
}

坑点三:定时器未清理

页面销毁时定时器未清理,导致内存泄漏。

// ❌ 错误:忘记清理定时器
aboutToDisappear() {
  // 没有清理heartbeatTimer、reconnectTimer等
}

// ✅ 正确:统一清理
aboutToDisappear() {
  this.stopHeartbeat();
  this.stopStableTimer();
  
  if (this.reconnectTimer !== -1) {
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = -1;
  }
}

五、HarmonyOS 6适配指南

5.1 系统级心跳支持

HarmonyOS 6提供了系统级的Ping/Pong支持。

import webSocket from '@ohos.net.webSocket';

let ws = webSocket.createWebSocket();

await ws.connect('wss://api.example.com/ws', {
  // HarmonyOS 6: 系统级心跳配置
  pingInterval: 30000,   // 自动发送Ping
  pongTimeout: 5000      // Pong超时判定断开
});

// 系统会自动处理心跳,无需手动实现

5.2 网络状态感知重连

结合网络状态API,智能重连。

import connection from '@ohos.net.connection';

// 监听网络状态
let netConnection = connection.createNetConnection();

netConnection.on('netAvailable', async (data) => {
  console.info('网络可用:', data.netHandle.netId);
  
  // 网络恢复后立即重连
  if (!wsClient.isConnected) {
    await wsClient.reconnect();
  }
});

netConnection.on('netLost', (data) => {
  console.info('网络丢失');
  // 网络丢失时暂停重连
});

六、总结一下下

心跳和重连是WebSocket稳定运行的双重保障。本文从三个层面展开:

心跳检测:定期发送探测消息,及时发现连接异常。支持Ping/Pong帧和应用层心跳两种模式,根据服务器支持情况选择。

智能重连:指数退避加随机抖动,避免重连风暴。设置重试上限和稳定重置机制,平衡可靠性和资源消耗。

组合实现:将心跳和重连结合,实现生产级客户端。注意消息队列、状态恢复、定时器清理等细节。

HarmonyOS 6提供了系统级心跳支持,简化了实现难度。结合网络状态API,可以做到网络恢复时智能重连,用户体验更佳。

下一篇文章,我们将进入Socket TCP编程的世界,探索传输层的网络通信!

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐