Java百万在线用户消息推送:彻底解决积压、延迟,新手也能看懂

原创 柯临 智汇魔盒 2026年4月20日 20:19 广东 听全文

大家好,我是柯临,做了8年Java开发。今天咱们聊个核心问题:向100万在线用户同时推送通知,怎么设计才能避免消息积压、推送延迟?

遇到这个需求就慌了——100万用户啊,同时发,服务器会不会崩?消息发不出去怎么办?其实不用怕,核心就一个逻辑:不“硬刚”,靠“拆分+缓冲+兜底”,把压力分摊掉。

全程不堆砌晦涩术语,每一步都讲透、给实操方法,甚至附简单代码,新手跟着做也能搞定,看完直接复制就能用。

一、核心设计思路(先记这4步,不慌)

先给大家一个清晰的步骤,不用急着抠细节,先把整体逻辑理顺,跟着这4步走,基本不会出大问题:

第一步:拒绝“一次性群发”,拆分推送任务

100万用户同时推送,就像100万人同时过一座桥,直接堵死。咱们要做的是“分批次、分通道”,把100万拆成无数个小批量,比如每批1000人,分1000批慢慢发,降低单批次压力。

第二步:用“消息队列”做缓冲,避免直接压垮业务服务器

简单说,消息队列就是“临时存消息的容器”,就像快递驿站——你不用直接把100万条消息全扔给推送服务器,先扔到驿站(消息队列),推送服务器再慢慢从驿站取,哪怕推送慢一点,消息也不会丢、不会积压。

第三步:多线程+线程池,提高推送效率

单线程推送太慢,100万条消息要推到天荒地老。咱们用多线程,就像多个人同时取快递、送快递,效率翻倍;但线程不能乱建,用线程池管理,避免线程过多拖垮服务器。

第四步:兜底机制,解决“漏推、积压”的收尾问题

哪怕前面做得再好,也可能出现个别消息推送失败、队列积压的情况。所以要加“重试机制”(失败了再试几次)和“监控告警”(积压、延迟了及时提醒),确保每一条消息都能送到位。

二、具体实现逻辑(附案例+代码,直接套用)

核心思路懂了,接下来拆解具体实现,每一步都给实操方法,结合Java代码片段(标注详细注释),还有场景类比,帮大家快速理解。

1. 任务拆分:批量推送(核心:拆小、匀速)

类比场景:你要给100万个客户发短信,不会一次性复制100万个号码,而是每1000个一组,分1000组发,每组间隔100毫秒,避免短信平台被压垮。

具体实现:

① 先获取所有在线用户ID(这里假设从Redis获取,因为Redis查在线用户最快,新手记住:在线用户存Redis,key是用户ID,value是在线状态);

② 把用户ID分成批量,每批1000个(批量大小可调整,建议1000-2000,太大容易压垮,太小效率低);

③ 匀速推送,每批间隔100-200毫秒,避免集中压力。

代码片段(Java,带注释):

// 1. 从Redis获取所有在线用户ID(新手:Redis的SMEMBERS命令,获取集合中的所有元素)Set<String> onlineUserIds = redisTemplate.opsForSet().members("online:user:ids");if (onlineUserIds == null || onlineUserIds.isEmpty()) {    System.out.println("没有在线用户,无需推送");    return;}// 2. 批量拆分,每批1000个用户List<String> userList = new ArrayList<>(onlineUserIds);int batchSize = 1000; // 每批大小,可根据服务器性能调整int totalBatch = (int) Math.ceil((double) userList.size() / batchSize);// 3. 循环推送每一批,每批间隔100毫秒for (int i = 0; i < totalBatch; i++) {    // 计算当前批次的用户范围    int start = i * batchSize;    int end = Math.min((i + 1) * batchSize, userList.size());    List<String> batchUserIds = userList.subList(start, end);    // 推送当前批次(后面会讲推送方法)    pushBatchMessage(batchUserIds, "推送内容");    // 匀速推送,避免集中压力(新手:Thread.sleep不要放在多线程里,这里是主线程控制批次间隔)    Thread.sleep(100);}

2. 消息缓冲:用RabbitMQ做消息队列

为什么用RabbitMQ?简单说,它稳定、易用,Java集成起来简单,新手容易上手(比Kafka简单,Kafka适合超大规模,100万用户用RabbitMQ足够)。

核心作用:业务服务器(生成推送消息)不用直接调用推送接口,而是把消息发送到RabbitMQ队列,推送服务器从队列里“消费”消息(取消息、推消息),实现“解耦”——哪怕推送服务器卡了,消息也会存在队列里,不会丢失、不会积压(只要队列配置合理)。

具体实现(Java集成RabbitMQ,核心代码,带注释):

① 配置RabbitMQ队列(新手:复制到项目配置类即可)

@Configurationpublic class RabbitMQConfig {    // 消息队列名称(自定义,好记就行)    public static final String PUSH_QUEUE = "user:message:push:queue";    // 声明队列(核心: durable=true 表示队列持久化,避免服务器重启后队列丢失)    @Bean    public Queue pushQueue() {        // durable: 持久化;exclusive: 不独占;autoDelete: 不自动删除;arguments: 额外参数        return QueueBuilder.durable(PUSH_QUEUE).exclusive(false).autoDelete(false).build();    }}

② 业务服务器:发送消息到队列

@Servicepublic class MessageSendService {    @Autowired    private RabbitTemplate rabbitTemplate;    // 发送推送消息到队列(参数:用户ID列表、推送内容)    public void sendPushMessage(List<String> userIds, String content) {        // 封装消息内容(新手:用Map存,方便后面解析)        Map&lt;String, Object&gt; message = new HashMap<>();        message.put("userIds", userIds);        message.put("content", content);        // 发送消息到队列(参数:队列名称、消息内容)        rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_QUEUE, message);        System.out.println("消息发送到队列成功,用户数:" + userIds.size());    }}

③ 推送服务器:从队列消费消息(核心:多线程消费)

@Servicepublic class MessageConsumerService {    @Autowired    private PushService pushService; // 自己写的推送服务(比如调用极光、个推,或自己的推送接口)    // 消费队列消息(@RabbitListener:监听指定队列)    @RabbitListener(queues = RabbitMQConfig.PUSH_QUEUE)    public void consumePushMessage(Map<String, Object> message) {        try {            // 解析消息中的用户ID和推送内容            List<String> userIds = (List<String>) message.get("userIds");            String content = (String) message.get("content");            // 调用推送方法,推送当前批次(这里会用多线程,后面讲)            pushService.pushBatch(userIds, content);        } catch (Exception e) {            // 异常处理:消息消费失败,后面讲重试机制            System.out.println("消息消费失败,原因:" + e.getMessage());            throw new AmqpRejectAndDontRequeueException("消费失败,进入重试队列");        }    }}

3. 提高效率:多线程+线程池(核心:控制线程数量)

类比场景:你一个人送1000个快递,要送很久;找10个人一起送,效率翻倍,但也不能找100个人(人太多会乱,还会占用太多资源),找10-20个人最合适。

具体实现:用Java的ThreadPoolExecutor创建线程池,控制线程数量(根据服务器CPU核心数调整,新手建议:核心线程数=CPU核心数*2,最大线程数=CPU核心数*4),避免线程过多导致服务器卡顿。

代码片段(线程池配置+多线程推送):

// 1. 线程池配置(新手:复制到配置类,直接用)@Configurationpublic class ThreadPoolConfig {    // CPU核心数(自动获取,不用手动改)    private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;    private int maxPoolSize = Runtime.getRuntime().availableProcessors() * 4;    @Bean    public ExecutorService pushThreadPool() {        return new ThreadPoolExecutor(                corePoolSize, // 核心线程数(常驻线程)                maxPoolSize, // 最大线程数(最多能创建的线程数)                60L, // 空闲线程存活时间(超过这个时间,空闲线程会被销毁)                TimeUnit.SECONDS, // 时间单位                new ArrayBlockingQueue<>(1000), // 任务队列(存等待执行的任务)                Executors.defaultThreadFactory(), // 线程工厂(创建线程)                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略(任务满了就拒绝,避免内存溢出)        );    }}// 2. 推送服务(多线程推送)@Servicepublic class PushService {    @Autowired    private ExecutorService pushThreadPool;    // 批量推送(多线程)    public void pushBatch(List<String> userIds, String content) {        // 把每一个用户的推送,封装成一个任务,交给线程池执行        for (String userId : userIds) {            pushThreadPool.submit(() -> {                // 具体推送逻辑(比如调用极光推送API,或自己的WebSocket推送)                // 新手:这里替换成自己的推送代码,下面是模拟推送                sendMessageToUser(userId, content);            });        }    }    // 模拟推送(实际开发中替换成真实推送逻辑)    private void sendMessageToUser(String userId, String content) {        // 比如:WebSocket推送、极光推送、个推等        System.out.println("向用户[" + userId + "]推送消息:" + content);    }}

4. 兜底机制:重试+监控(避免漏推、积压)

哪怕前面做得再好,也会有意外:比如用户网络波动、推送接口临时故障,导致消息推送失败;或者队列消息太多,推送不及时,出现积压。这时候就需要“兜底”。

① 重试机制(失败了再试几次)

核心:推送失败后,不要直接放弃,重试2-3次(最多3次,多了反而会加重压力),每次重试间隔1秒(避免频繁重试)。

代码片段(给推送方法加重试):

// 带重试的推送方法(核心:最多重试3次)private void sendMessageToUser(String userId, String content) {    int maxRetry = 3; // 最大重试次数    int retryCount = 0; // 当前重试次数    while (retryCount < maxRetry) {        try {            // 真实推送逻辑(比如调用极光API)            // 这里模拟推送失败(新手:实际开发中替换成真实代码)            if (Math.random() < 0.1) { // 10%的失败概率,模拟异常                throw new Exception("推送接口异常");            }            System.out.println("向用户[" + userId + "]推送消息:" + content + ",成功");            return; // 推送成功,退出循环        } catch (Exception e) {            retryCount++;            System.out.println("向用户[" + userId + "]推送失败,第" + retryCount + "次重试");            if (retryCount >= maxRetry) {                // 重试次数用完,记录失败日志(后面手动处理)                System.out.println("向用户[" + userId + "]推送失败,已达到最大重试次数,记录日志");                // 可选:把失败的用户ID存到数据库,后续手动重试                saveFailedPush(userId, content);            }            try {                Thread.sleep(1000); // 每次重试间隔1秒            } catch (InterruptedException ex) {                Thread.currentThread().interrupt();            }        }    }}

② 监控告警(及时发现问题)

核心:实时监控消息队列的积压情况、推送延迟、失败率,一旦超过阈值,及时告警(比如发邮件、钉钉消息给开发人员),避免问题扩大。

新手实操:用Spring Boot Admin监控线程池状态,用RabbitMQ的管理界面(默认端口15672)监控队列积压;也可以简单点,写个定时任务,每1分钟检查一次队列消息数,超过10000条就告警。

简单监控代码(定时任务):

@Component@EnableSchedulingpublic class PushMonitorTask {    @Autowired    private RabbitTemplate rabbitTemplate;    // 每1分钟检查一次队列积压情况    @Scheduled(cron = "0 0/1 * * * ?")    public void monitorQueue() {        // 获取队列中的消息数(新手:RabbitMQ的queueDeclarePassive方法,获取队列信息)        QueueInfo queueInfo = rabbitTemplate.execute(channel -> {            AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(RabbitMQConfig.PUSH_QUEUE);            return new QueueInfo(declareOk.getMessageCount(), declareOk.getConsumerCount());        });        long messageCount = queueInfo.getMessageCount(); // 队列积压消息数        int consumerCount = queueInfo.getConsumerCount(); // 消费线程数        // 阈值:积压超过10000条,或者没有消费线程,就告警        if (messageCount > 10000 || consumerCount == 0) {            // 这里替换成告警逻辑(比如发钉钉、邮件)            System.out.println("告警:消息队列积压严重!积压数:" + messageCount + ",消费线程数:" + consumerCount);            // sendAlarm("消息队列积压严重,积压数:" + messageCount);        }    }    // 封装队列信息(内部类,不用管,直接用)    static class QueueInfo {        private long messageCount;        private int consumerCount;        public QueueInfo(long messageCount, int consumerCount) {            this.messageCount = messageCount;            this.consumerCount = consumerCount;        }        // getter方法(省略,IDE自动生成即可)        public long getMessageCount() { return messageCount; }        public int getConsumerCount() { return consumerCount; }    }}

三、必避漏洞(新手最容易踩的坑,附规避方法)

这部分是重点!很多开发者设计得看似没问题,但一上线就出问题,都是因为踩了这些坑。每个坑都给具体的规避方法,照着做就能避开。

漏洞1:批量太大,导致推送服务器卡顿

容易犯的错:觉得“批量越大,效率越高”,把每批设置成10000个,结果推送服务器一次性处理10000个请求,CPU、内存飙升,直接卡顿,反而导致延迟。

规避方法:批量大小控制在1000-2000个,根据自己服务器性能调整;同时每批间隔100-200毫秒,匀速推送,不要集中压测服务器。

漏洞2:线程池配置不合理,导致线程泄漏、内存溢出

容易犯的错:直接用Executors.newCachedThreadPool()创建线程池(无界线程池),推送高峰期会创建大量线程,导致服务器内存溢出;或者线程池队列太小,任务满了直接拒绝,导致消息丢失。

规避方法:用ThreadPoolExecutor手动配置线程池(参考前面的代码),核心线程数=CPU核心数*2,最大线程数=CPU核心数*4,队列大小设置为1000-2000,拒绝策略用AbortPolicy(任务满了拒绝,避免内存溢出),同时监控线程池状态。

漏洞3:消息队列不持久化,服务器重启后消息丢失

容易犯的错:RabbitMQ队列配置时,durable=false(不持久化),一旦服务器重启,队列和里面的消息就全没了,导致大量消息丢失。

规避方法:队列配置时,设置durable=true(持久化),同时消息发送时,设置deliveryMode=2(消息持久化),确保服务器重启后,队列和消息都不会丢失。

补充代码(消息持久化):

// 发送消息时,设置消息持久化rabbitTemplate.convertAndSend(    RabbitMQConfig.PUSH_QUEUE,    message,    messagePostProcessor -> {        // deliveryMode=2 表示消息持久化        messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);        return messagePostProcessor;    });

漏洞4:没有重试机制,推送失败直接放弃

容易犯的错:推送失败后,直接打印日志,不做任何处理,导致部分用户漏推,用户投诉。

规避方法:给推送方法加重试机制,最多重试3次,每次间隔1秒;重试失败后,把失败的用户ID和消息存到数据库,后续手动重试或定时重试,确保不遗漏任何一个用户。

漏洞5:不监控队列,积压、延迟发现不及时

容易犯的错:上线后不管不顾,直到用户反馈“没收到消息”,才发现队列积压了几十万条消息,推送延迟了几十分钟。

规避方法:加监控告警(参考前面的定时任务代码),监控队列积压数、推送延迟、失败率;设置合理阈值,比如积压超过10000条、延迟超过5分钟,就及时告警,快速处理。

四、总结

其实100万在线用户推送,核心逻辑并不复杂,记住这3个关键点:

  1. 拆分任务:把100万拆成小批量,匀速推送,不集中压垮服务器;

  2. 缓冲减压:用RabbitMQ做消息队列,解耦业务和推送,避免消息丢失、积压;

  3. 兜底保障:加重试机制和监控告警,解决漏推、积压问题。

上面的代码的都是可直接复制套用的,新手只要跟着步骤来,就能搞定,不用怕复杂。

最后互动一下:你在做消息推送时,还遇到过哪些问题?比如WebSocket推送不稳定、队列积压处理不了?欢迎评论区留言~

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐