Java百万在线用户消息推送:彻底解决积压、延迟,新手也能看懂
其实100万在线用户推送,核心逻辑并不复杂,记住这3个关键点:拆分任务:把100万拆成小批量,匀速推送,不集中压垮服务器;缓冲减压:用RabbitMQ做消息队列,解耦业务和推送,避免消息丢失、积压;兜底保障:加重试机制和监控告警,解决漏推、积压问题。上面的代码的都是可直接复制套用的,新手只要跟着步骤来,就能搞定,不用怕复杂。最后互动一下:你在做消息推送时,还遇到过哪些问题?比如WebSocket推
Java百万在线用户消息推送:彻底解决积压、延迟,新手也能看懂
原创 柯临 智汇魔盒 2026年4月20日 20:19 广东 听全文
大家好,我是柯临,做了8年Java开发。今天咱们聊个核心问题:向100万在线用户同时推送通知,怎么设计才能避免消息积压、推送延迟?
遇到这个需求就慌了——100万用户啊,同时发,服务器会不会崩?消息发不出去怎么办?其实不用怕,核心就一个逻辑:不“硬刚”,靠“拆分+缓冲+兜底”,把压力分摊掉。
全程不堆砌晦涩术语,每一步都讲透、给实操方法,甚至附简单代码,新手跟着做也能搞定,看完直接复制就能用。
一、核心设计思路(先记这4步,不慌)
先给大家一个清晰的步骤,不用急着抠细节,先把整体逻辑理顺,跟着这4步走,基本不会出大问题:
第一步:拒绝“一次性群发”,拆分推送任务
100万用户同时推送,就像100万人同时过一座桥,直接堵死。咱们要做的是“分批次、分通道”,把100万拆成无数个小批量,比如每批1000人,分1000批慢慢发,降低单批次压力。
第二步:用“消息队列”做缓冲,避免直接压垮业务服务器
简单说,消息队列就是“临时存消息的容器”,就像快递驿站——你不用直接把100万条消息全扔给推送服务器,先扔到驿站(消息队列),推送服务器再慢慢从驿站取,哪怕推送慢一点,消息也不会丢、不会积压。
第三步:多线程+线程池,提高推送效率
单线程推送太慢,100万条消息要推到天荒地老。咱们用多线程,就像多个人同时取快递、送快递,效率翻倍;但线程不能乱建,用线程池管理,避免线程过多拖垮服务器。
第四步:兜底机制,解决“漏推、积压”的收尾问题
哪怕前面做得再好,也可能出现个别消息推送失败、队列积压的情况。所以要加“重试机制”(失败了再试几次)和“监控告警”(积压、延迟了及时提醒),确保每一条消息都能送到位。
二、具体实现逻辑(附案例+代码,直接套用)
核心思路懂了,接下来拆解具体实现,每一步都给实操方法,结合Java代码片段(标注详细注释),还有场景类比,帮大家快速理解。
1. 任务拆分:批量推送(核心:拆小、匀速)
类比场景:你要给100万个客户发短信,不会一次性复制100万个号码,而是每1000个一组,分1000组发,每组间隔100毫秒,避免短信平台被压垮。
具体实现:
① 先获取所有在线用户ID(这里假设从Redis获取,因为Redis查在线用户最快,新手记住:在线用户存Redis,key是用户ID,value是在线状态);
② 把用户ID分成批量,每批1000个(批量大小可调整,建议1000-2000,太大容易压垮,太小效率低);
③ 匀速推送,每批间隔100-200毫秒,避免集中压力。
代码片段(Java,带注释):
// 1. 从Redis获取所有在线用户ID(新手:Redis的SMEMBERS命令,获取集合中的所有元素)Set<String> onlineUserIds = redisTemplate.opsForSet().members("online:user:ids");if (onlineUserIds == null || onlineUserIds.isEmpty()) {System.out.println("没有在线用户,无需推送");return;}// 2. 批量拆分,每批1000个用户List<String> userList = new ArrayList<>(onlineUserIds);int batchSize = 1000; // 每批大小,可根据服务器性能调整int totalBatch = (int) Math.ceil((double) userList.size() / batchSize);// 3. 循环推送每一批,每批间隔100毫秒for (int i = 0; i < totalBatch; i++) {// 计算当前批次的用户范围int start = i * batchSize;int end = Math.min((i + 1) * batchSize, userList.size());List<String> batchUserIds = userList.subList(start, end);// 推送当前批次(后面会讲推送方法)pushBatchMessage(batchUserIds, "推送内容");// 匀速推送,避免集中压力(新手:Thread.sleep不要放在多线程里,这里是主线程控制批次间隔)Thread.sleep(100);}
2. 消息缓冲:用RabbitMQ做消息队列
为什么用RabbitMQ?简单说,它稳定、易用,Java集成起来简单,新手容易上手(比Kafka简单,Kafka适合超大规模,100万用户用RabbitMQ足够)。
核心作用:业务服务器(生成推送消息)不用直接调用推送接口,而是把消息发送到RabbitMQ队列,推送服务器从队列里“消费”消息(取消息、推消息),实现“解耦”——哪怕推送服务器卡了,消息也会存在队列里,不会丢失、不会积压(只要队列配置合理)。
具体实现(Java集成RabbitMQ,核心代码,带注释):
① 配置RabbitMQ队列(新手:复制到项目配置类即可)
@Configurationpublic class RabbitMQConfig {// 消息队列名称(自定义,好记就行)public static final String PUSH_QUEUE = "user:message:push:queue";// 声明队列(核心: durable=true 表示队列持久化,避免服务器重启后队列丢失)@Beanpublic Queue pushQueue() {// durable: 持久化;exclusive: 不独占;autoDelete: 不自动删除;arguments: 额外参数return QueueBuilder.durable(PUSH_QUEUE).exclusive(false).autoDelete(false).build();}}
② 业务服务器:发送消息到队列
@Servicepublic class MessageSendService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送推送消息到队列(参数:用户ID列表、推送内容)public void sendPushMessage(List<String> userIds, String content) {// 封装消息内容(新手:用Map存,方便后面解析)Map<String, Object> message = new HashMap<>();message.put("userIds", userIds);message.put("content", content);// 发送消息到队列(参数:队列名称、消息内容)rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_QUEUE, message);System.out.println("消息发送到队列成功,用户数:" + userIds.size());}}
③ 推送服务器:从队列消费消息(核心:多线程消费)
@Servicepublic class MessageConsumerService {@Autowiredprivate PushService pushService; // 自己写的推送服务(比如调用极光、个推,或自己的推送接口)// 消费队列消息(@RabbitListener:监听指定队列)@RabbitListener(queues = RabbitMQConfig.PUSH_QUEUE)public void consumePushMessage(Map<String, Object> message) {try {// 解析消息中的用户ID和推送内容List<String> userIds = (List<String>) message.get("userIds");String content = (String) message.get("content");// 调用推送方法,推送当前批次(这里会用多线程,后面讲)pushService.pushBatch(userIds, content);} catch (Exception e) {// 异常处理:消息消费失败,后面讲重试机制System.out.println("消息消费失败,原因:" + e.getMessage());throw new AmqpRejectAndDontRequeueException("消费失败,进入重试队列");}}}
3. 提高效率:多线程+线程池(核心:控制线程数量)
类比场景:你一个人送1000个快递,要送很久;找10个人一起送,效率翻倍,但也不能找100个人(人太多会乱,还会占用太多资源),找10-20个人最合适。
具体实现:用Java的ThreadPoolExecutor创建线程池,控制线程数量(根据服务器CPU核心数调整,新手建议:核心线程数=CPU核心数*2,最大线程数=CPU核心数*4),避免线程过多导致服务器卡顿。
代码片段(线程池配置+多线程推送):
// 1. 线程池配置(新手:复制到配置类,直接用)@Configurationpublic class ThreadPoolConfig {// CPU核心数(自动获取,不用手动改)private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;private int maxPoolSize = Runtime.getRuntime().availableProcessors() * 4;@Beanpublic ExecutorService pushThreadPool() {return new ThreadPoolExecutor(corePoolSize, // 核心线程数(常驻线程)maxPoolSize, // 最大线程数(最多能创建的线程数)60L, // 空闲线程存活时间(超过这个时间,空闲线程会被销毁)TimeUnit.SECONDS, // 时间单位new ArrayBlockingQueue<>(1000), // 任务队列(存等待执行的任务)Executors.defaultThreadFactory(), // 线程工厂(创建线程)new ThreadPoolExecutor.AbortPolicy() // 拒绝策略(任务满了就拒绝,避免内存溢出));}}// 2. 推送服务(多线程推送)@Servicepublic class PushService {@Autowiredprivate ExecutorService pushThreadPool;// 批量推送(多线程)public void pushBatch(List<String> userIds, String content) {// 把每一个用户的推送,封装成一个任务,交给线程池执行for (String userId : userIds) {pushThreadPool.submit(() -> {// 具体推送逻辑(比如调用极光推送API,或自己的WebSocket推送)// 新手:这里替换成自己的推送代码,下面是模拟推送sendMessageToUser(userId, content);});}}// 模拟推送(实际开发中替换成真实推送逻辑)private void sendMessageToUser(String userId, String content) {// 比如:WebSocket推送、极光推送、个推等System.out.println("向用户[" + userId + "]推送消息:" + content);}}
4. 兜底机制:重试+监控(避免漏推、积压)
哪怕前面做得再好,也会有意外:比如用户网络波动、推送接口临时故障,导致消息推送失败;或者队列消息太多,推送不及时,出现积压。这时候就需要“兜底”。
① 重试机制(失败了再试几次)
核心:推送失败后,不要直接放弃,重试2-3次(最多3次,多了反而会加重压力),每次重试间隔1秒(避免频繁重试)。
代码片段(给推送方法加重试):
// 带重试的推送方法(核心:最多重试3次)private void sendMessageToUser(String userId, String content) {int maxRetry = 3; // 最大重试次数int retryCount = 0; // 当前重试次数while (retryCount < maxRetry) {try {// 真实推送逻辑(比如调用极光API)// 这里模拟推送失败(新手:实际开发中替换成真实代码)if (Math.random() < 0.1) { // 10%的失败概率,模拟异常throw new Exception("推送接口异常");}System.out.println("向用户[" + userId + "]推送消息:" + content + ",成功");return; // 推送成功,退出循环} catch (Exception e) {retryCount++;System.out.println("向用户[" + userId + "]推送失败,第" + retryCount + "次重试");if (retryCount >= maxRetry) {// 重试次数用完,记录失败日志(后面手动处理)System.out.println("向用户[" + userId + "]推送失败,已达到最大重试次数,记录日志");// 可选:把失败的用户ID存到数据库,后续手动重试saveFailedPush(userId, content);}try {Thread.sleep(1000); // 每次重试间隔1秒} catch (InterruptedException ex) {Thread.currentThread().interrupt();}}}}
② 监控告警(及时发现问题)
核心:实时监控消息队列的积压情况、推送延迟、失败率,一旦超过阈值,及时告警(比如发邮件、钉钉消息给开发人员),避免问题扩大。
新手实操:用Spring Boot Admin监控线程池状态,用RabbitMQ的管理界面(默认端口15672)监控队列积压;也可以简单点,写个定时任务,每1分钟检查一次队列消息数,超过10000条就告警。
简单监控代码(定时任务):
@Component@EnableSchedulingpublic class PushMonitorTask {@Autowiredprivate RabbitTemplate rabbitTemplate;// 每1分钟检查一次队列积压情况@Scheduled(cron = "0 0/1 * * * ?")public void monitorQueue() {// 获取队列中的消息数(新手:RabbitMQ的queueDeclarePassive方法,获取队列信息)QueueInfo queueInfo = rabbitTemplate.execute(channel -> {AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(RabbitMQConfig.PUSH_QUEUE);return new QueueInfo(declareOk.getMessageCount(), declareOk.getConsumerCount());});long messageCount = queueInfo.getMessageCount(); // 队列积压消息数int consumerCount = queueInfo.getConsumerCount(); // 消费线程数// 阈值:积压超过10000条,或者没有消费线程,就告警if (messageCount > 10000 || consumerCount == 0) {// 这里替换成告警逻辑(比如发钉钉、邮件)System.out.println("告警:消息队列积压严重!积压数:" + messageCount + ",消费线程数:" + consumerCount);// sendAlarm("消息队列积压严重,积压数:" + messageCount);}}// 封装队列信息(内部类,不用管,直接用)static class QueueInfo {private long messageCount;private int consumerCount;public QueueInfo(long messageCount, int consumerCount) {this.messageCount = messageCount;this.consumerCount = consumerCount;}// getter方法(省略,IDE自动生成即可)public long getMessageCount() { return messageCount; }public int getConsumerCount() { return consumerCount; }}}
三、必避漏洞(新手最容易踩的坑,附规避方法)
这部分是重点!很多开发者设计得看似没问题,但一上线就出问题,都是因为踩了这些坑。每个坑都给具体的规避方法,照着做就能避开。
漏洞1:批量太大,导致推送服务器卡顿
容易犯的错:觉得“批量越大,效率越高”,把每批设置成10000个,结果推送服务器一次性处理10000个请求,CPU、内存飙升,直接卡顿,反而导致延迟。
规避方法:批量大小控制在1000-2000个,根据自己服务器性能调整;同时每批间隔100-200毫秒,匀速推送,不要集中压测服务器。
漏洞2:线程池配置不合理,导致线程泄漏、内存溢出
容易犯的错:直接用Executors.newCachedThreadPool()创建线程池(无界线程池),推送高峰期会创建大量线程,导致服务器内存溢出;或者线程池队列太小,任务满了直接拒绝,导致消息丢失。
规避方法:用ThreadPoolExecutor手动配置线程池(参考前面的代码),核心线程数=CPU核心数*2,最大线程数=CPU核心数*4,队列大小设置为1000-2000,拒绝策略用AbortPolicy(任务满了拒绝,避免内存溢出),同时监控线程池状态。
漏洞3:消息队列不持久化,服务器重启后消息丢失
容易犯的错:RabbitMQ队列配置时,durable=false(不持久化),一旦服务器重启,队列和里面的消息就全没了,导致大量消息丢失。
规避方法:队列配置时,设置durable=true(持久化),同时消息发送时,设置deliveryMode=2(消息持久化),确保服务器重启后,队列和消息都不会丢失。
补充代码(消息持久化):
// 发送消息时,设置消息持久化rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_QUEUE,message,messagePostProcessor -> {// deliveryMode=2 表示消息持久化messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return messagePostProcessor;});
漏洞4:没有重试机制,推送失败直接放弃
容易犯的错:推送失败后,直接打印日志,不做任何处理,导致部分用户漏推,用户投诉。
规避方法:给推送方法加重试机制,最多重试3次,每次间隔1秒;重试失败后,把失败的用户ID和消息存到数据库,后续手动重试或定时重试,确保不遗漏任何一个用户。
漏洞5:不监控队列,积压、延迟发现不及时
容易犯的错:上线后不管不顾,直到用户反馈“没收到消息”,才发现队列积压了几十万条消息,推送延迟了几十分钟。
规避方法:加监控告警(参考前面的定时任务代码),监控队列积压数、推送延迟、失败率;设置合理阈值,比如积压超过10000条、延迟超过5分钟,就及时告警,快速处理。
四、总结
其实100万在线用户推送,核心逻辑并不复杂,记住这3个关键点:
-
拆分任务:把100万拆成小批量,匀速推送,不集中压垮服务器;
-
缓冲减压:用RabbitMQ做消息队列,解耦业务和推送,避免消息丢失、积压;
-
兜底保障:加重试机制和监控告警,解决漏推、积压问题。
上面的代码的都是可直接复制套用的,新手只要跟着步骤来,就能搞定,不用怕复杂。
最后互动一下:你在做消息推送时,还遇到过哪些问题?比如WebSocket推送不稳定、队列积压处理不了?欢迎评论区留言~
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐



所有评论(0)