引言

在人工智能对话应用飞速发展的今天,用户对实时交互体验的期待已经达到前所未有的高度。想象一下,当你向AI助手提出一个问题时,传统的等待方式是一次性获取完整回复,这种体验在信息量较大的场景下往往让用户感到焦虑不安。而现代AI应用普遍采用的"打字机效果"——即回复内容逐字逐句地流式呈现——不仅大幅提升了用户体验,更让交互过程变得自然流畅、充满期待感。

本文将深入探讨实现这种流式交互效果的核心技术——Server-Sent Events(SSE)。我们将从前端到后端,从原理到实践,全面剖析SSE技术的每一个细节,帮助你掌握构建生产级流式对话应用的核心能力。无论是基于Spring Boot的后端实现,还是Vue3前端框架的巧妙运用,抑或是生产环境中Nginx配置的关键注意事项,本文都将提供详尽的技术解读和最佳实践建议。

────────────────────────────────────────────────────────────

为什么选择SSE而不是WebSocket:适用场景深度分析

图4:SSE与WebSocket/长轮询对比图

技术选型的重要性

在构建实时交互应用时,技术选型往往决定了项目的成功与否。许多开发者在初次接触流式交互需求时,会本能地想到WebSocket这个"全能选手"。诚然,WebSocket以其双向全双工通信的能力,几乎可以应对所有实时通信场景,但这种"万能钥匙"思维有时候反而会增加不必要的复杂度。让我们通过一个具体的对比分析,理解为何在AI对话场景下SSE是更明智的选择。

SSE的核心优势

单向通信的极致简洁是SSE最大的魅力所在。在AI对话场景中,数据的流动模式其实非常清晰:客户端发送一次请求,服务器端持续推送数据流。这种一对多的数据流动模式与SSE的设计哲学完美契合。使用SSE,你只需要维护一条HTTP长连接,服务器端通过这条连接持续不断地向客户端发送事件,而客户端只需要专注于接收和解析这些事件即可。

HTTP协议的天然亲和力是SSE的另一个显著优势。由于SSE本质上还是基于HTTP协议,它天然地继承了HTTP在互联网基础设施中的所有优势——不需要特殊的端口配置,不会被企业防火墙阻断,可以无缝穿越各种代理服务器。在当今复杂的网络环境中,这一点的重要性怎么强调都不为过。我曾经见过许多项目因为WebSocket在某些企业网络环境中无法正常工作而不得不返工重造,而基于SSE的应用从未遇到这类问题。

内置的自动重连机制让SSE的使用更加安心。当连接意外中断时,EventSource API会自动尝试重新建立连接,并且会带上上一次收到的最后一个事件ID(Last-Event-ID),确保不会丢失任何数据。这对于长时间运行的流式对话来说至关重要。

WebSocket的适用场景

当然,这并不是说WebSocket一无是处。恰恰相反,在以下场景中WebSocket仍然是首选:在线游戏需要实时双向交互,每个毫秒都至关重要;协作编辑工具如Google Docs需要多用户同时操作,双向通信不可或缺;金融交易平台需要毫秒级的行情推送和下单确认。只有在这些真正需要客户端和服务器实时双向沟通的场景中,WebSocket的优势才能充分发挥。

AI对话场景的最终判断

在AI大语言模型对话场景中,我们追求的是服务器向客户端推送流式文本数据,客户端需要做的是优雅地接收、解析并呈现这些数据。整个过程中,客户端不需要也几乎不会向服务器发送新的请求(除了建立连接时的初始请求)。这种单向数据流场景,正是SSE的拿手好戏。更何况,SSE的实现复杂度远低于WebSocket,调试和维护也更加方便,配套的浏览器开发者工具支持也更加完善。

对比维度

SSE

WebSocket

长轮询

通信方向

单向(服务器→客户端)

双向全双工

伪实时(轮询)

协议基础

HTTP/1.1

ws:// wss://

HTTP/1.1

实现复杂度

中等

中等

自动重连

内置支持

需手动实现

内置机制

二进制数据

不支持

支持

支持(Base64)

企业防火墙

友好

可能有障碍

完全兼容

代理服务器

需配置

需配置

完全兼容

AI对话场景

**最佳选择**

可用但复杂

不推荐

────────────────────────────────────────────────────────────

SSE技术原理深度解析

图1:SSE技术架构图

HTTP协议与SSE的关系

要深入理解SSE的工作原理,我们必须首先理解它与HTTP协议之间的密切关系。SSE并非全新的协议,而是一种基于HTTP协议的服务器推送技术实现。它巧妙地利用了HTTP/1.1的持久连接(Keep-Alive)特性,通过一种特殊的MIME类型text/event-stream来标识这是一个SSE连接。

当浏览器发起一个SSE请求时,它实际上发起的是一个带有特殊Accept头的HTTP请求:

GET /api/stream HTTP/1.1
Host: example.com
Accept: text/event-stream

服务器收到这个请求后,不会像普通HTTP响应那样立即返回完整数据然后关闭连接,而是会保持这个连接处于打开状态,并通过这个连接持续发送数据。当服务器发送完一组数据后,连接仍然保持,服务器可以在任何时候继续向这个连接写入新数据。

SSE数据格式详解

SSE的数据格式设计得极为简洁却功能强大。每一条消息由多个字段组成,字段之间以换行符分隔。以下是SSE消息的标准格式:

event: message
id: 1
data: {"content": "Hello", "type": "text"}

data: {"content": " World", "type": "text"}

event字段用于指定事件的类型,客户端可以根据这个字段为不同类型的事件设置不同的处理器。如果不指定event字段,默认为message类型。

id字段是实现可靠重连的关键。每次发送消息时,服务器会递增这个ID。当连接断开后自动重连时,浏览器会自动在请求头中带上Last-Event-ID,服务器可以根据这个ID从断点继续发送数据,避免数据重复或丢失。

data字段是实际的消息内容,可以是多行的。SSE会自动将多行data字段的内容用换行符连接起来,形成完整的消息体。这种设计使得SSE可以传输任意长度的文本数据。

retry字段用于指定连接断开后的重连间隔时间(毫秒)。如果不指定,浏览器会使用默认值(通常为3秒),但你可以通过发送retry: 10000来将重连间隔调整为10秒。

与短轮询的对比

短轮询是最简单也最原始的实时通信实现方式。其原理非常简单:客户端每隔一段时间就向服务器发送一个HTTP请求,询问"有没有新数据"。服务器立即返回当前状态,不管有没有新数据,客户端收到响应后稍作停顿,再次发送请求,如此循环往复。

短轮询的最大问题是效率低下和延迟不确定。即使服务器数据没有任何变化,客户端仍然需要不断地发送请求,这不仅浪费带宽,还给服务器带来无谓的计算压力。而且,如果数据的更新频率低于轮询频率,用户就会看到明显的数据更新延迟;反之,如果轮询过于频繁,即使数据没有更新也会造成资源浪费。短轮询的实时性完全取决于轮询间隔的设置,无法做到真正的实时。

与长轮询的对比

长轮询是对短轮询的一种改进,试图在实时性和资源消耗之间取得平衡。长轮询的工作流程是:客户端发送请求后,服务器不立即返回响应,而是保持这个连接打开,直到有数据更新时才返回数据。当客户端收到响应后,会立即再次发送一个新的请求,建立下一个长轮询连接。

长轮询相比短轮询确实大幅减少了无效请求的数量,实时性也有所提升。但它仍然存在根本性的缺陷:每次数据更新都需要建立一个新的HTTP连接,这个过程包括TCP握手、TLS协商(如果是HTTPS)、HTTP请求和响应的完整交互。这种开销在高频率更新的场景下会变得非常显著。

更重要的是,长轮询的延迟是不确定的。在最理想的情况下(数据刚好在有客户端等待时到达),延迟可以接近零;但在最糟糕的情况下(数据刚更新完,下一个轮询请求还没发出),延迟可能接近整个轮询周期。

SSE的革命性优势

SSE彻底解决了这些问题。由于使用的是持久化的HTTP长连接,服务器可以在数据产生的第一时间就将数据推送到客户端,真正的零延迟。而且,由于复用了同一个HTTP连接,不需要为每次数据推送都重新建立连接,资源消耗极低。

SSE还支持单个TCP连接上的多路复用(在HTTP/2环境下),这意味着即使是多个SSE连接,也可以共享同一个TCP连接,进一步减少了网络资源的使用。这种高效性是轮询系列技术无法企及的。

────────────────────────────────────────────────────────────

Spring Boot SSE后端实现

SseEmitter核心API

Spring Boot为SSE提供了原生支持,主要通过SseEmitter类来实现。SseEmitter是Spring Web模块中用于处理Server-Sent Events的核心类,它封装了异步HTTP响应的所有复杂性,让开发者可以专注于业务逻辑的实现。

SseEmitter的基本用法非常直观。首先创建一个SseEmitter实例,然后将这个emitter作为异步处理的载体,在适当的时机发送数据,最后在连接完成或出错时通知emitter。以下是一个最简单的SSE端点实现:

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream() {
    SseEmitter emitter = new SseEmitter(3600000L); // 超时时间1小时

    CompletableFuture.runAsync(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                emitter.send("data: 第" + i + "条消息\n\n");
                Thread.sleep(1000);
            }
            emitter.complete();
        } catch (Exception e) {
            emitter.completeWithError(e);
        }
    });

    return emitter;
}

这个例子展示了SSE最基本的工作流程:创建emitter,异步发送数据,完成或出错时通知emitter。但在生产环境中,我们需要考虑更多的细节和最佳实践。

异步线程池配置

在上面的简单例子中,我们使用了CompletableFuture.runAsync(),它默认使用ForkJoinPool.common()作为执行器。在生产环境中,这种默认配置往往不是最佳选择,因为SSE连接可能会长时间保持,占用公共线程池的资源会影响其他业务操作。

更好的做法是为SSE业务配置专门的线程池。以下是一个推荐的线程池配置:

@Configuration
public class AsyncConfig implements AsyncConfigurer {

    private static final int CORE_POOL_SIZE = 10;
    private static final int MAX_POOL_SIZE = 50;
    private static final int QUEUE_CAPACITY = 100;
    private static final String THREAD_NAME_PREFIX = "sse-executor-";

    @Bean(name = "sseExecutor")
    public Executor sseExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return sseExecutor();
    }
}

有了这个专门的线程池,我们就可以在SSE控制器中注入并使用它:

@Autowired
@Qualifier("sseExecutor")
private Executor sseExecutor;

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream() {
    SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 不设置超时

    sseExecutor.execute(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                emitter.send(SseEmitter.event()
                    .name("message")
                    .data("第" + i + "条消息")
                    .id(String.valueOf(i));
                Thread.sleep(1000);
            }
            emitter.complete();
        } catch (Exception e) {
            emitter.completeWithError(e);
        }
    });

    return emitter;
}

完整的企业级SSE服务实现

现在让我们来看一个更加完善的企业级SSE实现。这个实现包含了超时管理、错误处理、资源清理、连接状态追踪等生产环境必需的要素:

@Service
public class SSEStreamService {

    private final Map<String, SseEmitter> activeEmitters = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    @Autowired
    private LLMService llmService;

    public SseEmitter createStream(String sessionId, String userMessage) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        emitter.onCompletion(() -> {
            activeEmitters.remove(sessionId);
            log.info("SSE连接完成: sessionId={}", sessionId);
        });

        emitter.onTimeout(() -> {
            activeEmitters.remove(sessionId);
            log.warn("SSE连接超时: sessionId={}", sessionId);
        });

        emitter.onError(e -> {
            activeEmitters.remove(sessionId);
            log.error("SSE连接错误: sessionId={}", sessionId, e);
        });

        activeEmitters.put(sessionId, emitter);

        // 启动超时检查任务
        scheduleTimeoutCheck(sessionId, emitter);

        // 启动异步数据处理
        processStream(sessionId, userMessage, emitter);

        return emitter;
    }

    private void scheduleTimeoutCheck(String sessionId, SseEmitter emitter) {
        scheduler.schedule(() -> {
            if (activeEmitters.containsKey(sessionId)) {
                try {
                    emitter.send(SseEmitter.event()
                        .name("timeout")
                        .data("{\"error\": \"连接超时,请重试\"}"));
                    emitter.complete();
                } catch (Exception e) {
                    log.error("发送超时消息失败: sessionId={}", sessionId, e);
                }
            }
        }, 30, TimeUnit.MINUTES);
    }

    private void processStream(String sessionId, String userMessage, SseEmitter emitter) {
        sseExecutor.execute(() -> {
            try {
                // 调用LLM服务获取流式响应
                llmService.streamGenerate(userMessage, new StreamCallback() {
                    private int eventId = 0;

                    @Override
                    public void onChunk(String chunk) {
                        try {
                            emitter.send(SseEmitter.event()
                                .name("message")
                                .id(String.valueOf(++eventId))
                                .data("{\"content\": \"" + escapeJson(chunk) + "\"}"));
                        } catch (IOException e) {
                            log.error("发送SSE数据失败: sessionId={}", sessionId, e);
                        }
                    }

                    @Override
                    public void onComplete() {
                        try {
                            emitter.send(SseEmitter.event()
                                .name("done")
                                .data("{\"done\": true}"));
                            emitter.complete();
                        } catch (IOException e) {
                            log.error("发送完成信号失败: sessionId={}", sessionId, e);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        try {
                            emitter.send(SseEmitter.event()
                                .name("error")
                                .data("{\"error\": \"" + t.getMessage() + "\"}"));
                            emitter.completeWithError(t);
                        } catch (IOException e) {
                            log.error("发送错误信号失败: sessionId={}", sessionId, e);
                        }
                    }
                });
            } catch (Exception e) {
                log.error("处理SSE流异常: sessionId={}", sessionId, e);
                try {
                    emitter.send(SseEmitter.event()
                        .name("error")
                        .data("{\"error\": \"" + escapeJson(e.getMessage()) + "\"}"));
                    emitter.completeWithError(e);
                } catch (IOException ioe) {
                    log.error("发送错误信号失败: sessionId={}", sessionId, ioe);
                }
            }
        });
    }

    public void closeSession(String sessionId) {
        SseEmitter emitter = activeEmitters.remove(sessionId);
        if (emitter != null) {
            emitter.complete();
        }
    }

    public int getActiveConnectionCount() {
        return activeEmitters.size();
    }

    private String escapeJson(String text) {
        if (text == null) return "";
        return text.replace("\\", "\\\\")
                   .replace("\"", "\\\"")
                   .replace("\n", "\\n")
                   .replace("\r", "\\r")
                   .replace("\t", "\\t");
    }
}

这个实现展示了企业级SSE服务应有的样子:完整的生命周期管理、错误处理、资源清理、连接追踪,以及与LLM服务的集成。值得注意的是JSON转义的处理——在SSE的data字段中传输JSON数据时,必须对特殊字符进行转义,否则会导致前端解析失败。

────────────────────────────────────────────────────────────

前端EventSource API详解

EventSource基础用法

浏览器原生的EventSource API是使用SSE最简单的方式。它提供了一个基于事件的接口,让我们可以轻松地订阅服务器发送的消息。以下是EventSource的基本用法:

// 创建EventSource连接
const eventSource = new EventSource('/api/stream');

// 监听默认的message事件
eventSource.onmessage = (event) => {
    console.log('收到消息:', event.data);
    appendToDisplay(event.data);
};

// 监听自定义类型的消息
eventSource.addEventListener('message', (event) => {
    console.log('message事件:', event.data);
});

eventSource.addEventListener('done', (event) => {
    console.log('数据流结束');
    hideTypingIndicator();
});

eventSource.addEventListener('error', (event) => {
    console.error('SSE错误:', event);
    if (event.target.readyState === EventSource.CLOSED) {
        console.log('连接已关闭');
    }
});

// 连接状态
console.log('连接状态:', eventSource.readyState);
// EventSource.CONNECTING = 0
// EventSource.OPEN = 1
// EventSource.CLOSED = 2

EventSource的自动重连机制

EventSource最方便的特性之一就是自动重连。当连接意外断开时(网络波动、服务器重启等),EventSource会自动尝试重新建立连接,而且会带上上一次成功接收到的最后一条消息的ID(Last-Event-ID),让服务器可以从断点继续发送。

这个机制是浏览器自动实现的,我们不需要编写任何重连逻辑。但有时候,我们可能需要在重连时做一些额外的处理,比如更新UI状态、记录日志等。可以通过监听error事件来实现这一点:

eventSource.onerror = (event) => {
    if (event.target.readyState === EventSource.CONNECTING) {
        console.log('正在尝试重新连接...');
        updateConnectionStatus('reconnecting');
    } else if (event.target.readyState === EventSource.CLOSED) {
        console.log('连接已永久关闭');
        updateConnectionStatus('disconnected');
        // 可能需要手动提示用户刷新页面
    }
};

EventSource的局限性

尽管EventSource API简单易用,但它有一些根本性的限制,这些限制在某些场景下会成为致命问题:

无法发送自定义请求头是EventSource最大的局限。由于EventSource本质上是通过<script>标签的动态创建来发起请求的(这是浏览器的内部实现),你无法在请求中添加Authorization头或其他自定义头部。这意味着,如果你需要在SSE请求中进行身份验证,只能通过URL参数传递令牌,或者依赖session cookie。这种做法在安全性上有一定的妥协。

无法指定HTTP方法。EventSource总是使用GET方法,无法发送POST请求。这在需要向服务器提交数据的场景下是完全不可用的。

不支持二进制数据。EventSource只能处理文本数据,无法接收二进制流。如果你需要传输二进制数据(如图片、文件等),必须先将其Base64编码为文本,这会大幅增加数据传输量。

IE浏览器不支持。这是历史遗留问题,虽然现代浏览器都已经支持EventSource,但如果你需要兼容老版本IE(现在已经非常罕见),EventSource是不可用的。

这些局限性促使我们在很多场景下选择Fetch API + ReadableStream的方案。

────────────────────────────────────────────────────────────

Fetch API + ReadableStream实现细粒度控制

为什么选择Fetch API

Fetch API是XMLHttpRequest的现代替代品,它提供了更强大、更灵活的网络请求能力。结合ReadableStream,Fetch API可以让我们完全控制HTTP响应的处理过程,包括流式数据的读取、分块解析、错误处理等。

相比EventSource,Fetch API + ReadableStream的核心优势包括:

完全的请求控制。我们可以指定任意的HTTP方法(GET、POST等)、请求头、请求体,满足各种复杂的业务需求。

细粒度的数据处理。ReadableStream提供了分块读取的能力,让我们可以一块一块地处理接收到的数据,而不是等待完整响应。

更好的错误处理。Fetch API的错误处理机制更加完善,我们可以区分网络错误、HTTP错误(如4xx、5xx)等不同类型的错误。

流式响应支持。通过设置response.body为ReadableStream,我们可以实时处理服务器推送的数据。

Fetch + ReadableStream基础实现

以下是一个使用Fetch API + ReadableStream实现SSE的完整例子:

class SSEReader {
    constructor(url, options = {}) {
        this.url = url;
        this.options = options;
        this.abortController = null;
        this.reader = null;
        this.decoder = new TextDecoder();
        this.buffer = '';
    }

    async connect() {
        this.abortController = new AbortController();

        const response = await fetch(this.url, {
            method: this.options.method || 'GET',
            headers: this.options.headers || {},
            signal: this.abortController.signal,
        });

        if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
        }

        if (!response.body) {
            throw new Error('ReadableStream is not supported');
        }

        this.reader = response.body.getReader();
        return this.processStream();
    }

    async processStream() {
        try {
            while (true) {
                const { done, value } = await this.reader.read();

                if (done) {
                    // 处理缓冲区中剩余的数据
                    if (this.buffer.trim()) {
                        this.processSSEData(this.buffer);
                    }
                    this.onComplete();
                    break;
                }

                // 将接收到的数据块追加到缓冲区
                this.buffer += this.decoder.decode(value, { stream: true });

                // 处理缓冲区中的完整SSE消息
                this.processBuffer();
            }
        } catch (error) {
            this.onError(error);
        }
    }

    processBuffer() {
        // 按换行符分割
        const lines = this.buffer.split('\n');
        // 保留最后不完整的行在缓冲区中
        this.buffer = lines.pop();

        for (const line of lines) {
            this.processSSEData(line);
        }
    }

    processSSEData(line) {
        line = line.trim();
        if (!line) return;

        // 解析SSE格式: field:value
        const colonIndex = line.indexOf(':');
        if (colonIndex === -1) return;

        const field = line.substring(0, colonIndex).trim();
        const value = line.substring(colonIndex + 1).trim();

        switch (field) {
            case 'event':
                this.currentEventType = value;
                break;
            case 'id':
                this.lastEventId = value;
                break;
            case 'data':
                this.handleData(value);
                break;
            case 'retry':
                // 服务器可以指定重连间隔
                console.log(`服务器指定重连间隔: ${value}ms`);
                break;
        }
    }

    handleData(value) {
        const eventType = this.currentEventType || 'message';
        this.currentEventType = null;

        switch (eventType) {
            case 'message':
                this.onMessage(value);
                break;
            case 'done':
                this.onDone();
                break;
            case 'error':
                this.onErrorEvent(value);
                break;
            default:
                this.onCustomEvent(eventType, value);
        }
    }

    onMessage(data) {
        // 回调: 收到普通消息
        if (this.options.onMessage) {
            this.options.onMessage(data);
        }
    }

    onDone() {
        // 回调: 流结束
        if (this.options.onDone) {
            this.options.onDone();
        }
    }

    onError(error) {
        // 回调: 错误
        if (this.options.onError) {
            this.options.onError(error);
        }
    }

    onComplete() {
        // 回调: 完成
        if (this.options.onComplete) {
            this.options.onComplete();
        }
    }

    close() {
        if (this.abortController) {
            this.abortController.abort();
        }
        if (this.reader) {
            this.reader.cancel();
        }
    }
}

使用示例

const sseReader = new SSEReader('/api/stream', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
    },
    body: JSON.stringify({ message: '你好,请介绍一下自己' }),
    onMessage: (data) => {
        try {
            const parsed = JSON.parse(data);
            appendToDisplay(parsed.content);
        } catch (e) {
            console.error('解析消息失败:', e);
            appendToDisplay(data);
        }
    },
    onDone: () => {
        hideTypingIndicator();
        console.log('数据流已完成');
    },
    onError: (error) => {
        showError('连接错误: ' + error.message);
        console.error('SSE错误:', error);
    },
    onComplete: () => {
        console.log('连接已关闭');
    }
});

// 在组件卸载时关闭连接
onUnmounted(() => {
    sseReader.close();
});

────────────────────────────────────────────────────────────

打字机效果的前端实现

打字机效果的核心原理

打字机效果的实现原理其实非常直观:服务器发送的文本被分割成多个小块,前端每收到一个数据块,就将它追加到当前显示文本的末尾,同时更新页面渲染。这个过程不断重复,直到所有数据发送完毕。

在视觉效果上,由于文本是一块一块地添加的,我们看到的就像是一个看不见的人在逐字逐句地打字一样。如果配合光标动画和适当的滚动逻辑,整个体验就和一个真实的打字员在为你工作别无二致。

Vue3中的实现

在Vue3中实现打字机效果,我们需要考虑响应式更新、性能优化和用户体验等多个维度。以下是一个完整的Vue3组件实现:

<template>
  <div class="chat-container">
    <div class="message-list" ref="messageListRef">
      <div
        v-for="(message, index) in messages"
        :key="index"
        :class="['message', message.role]"
      >
        <div class="message-content">
          <span v-html="formatMessage(message.content)"></span>
          <span
            v-if="message.role === 'assistant' && message.streaming"
            class="typing-cursor"
          ></span>
        </div>
      </div>
    </div>

    <div class="input-area">
      <textarea
        v-model="inputText"
        @keydown.enter.exact.prevent="sendMessage"
        placeholder="输入你的问题..."
        rows="3"
      ></textarea>
      <button @click="sendMessage" :disabled="isStreaming">
        {{ isStreaming ? '生成中...' : '发送' }}
      </button>
    </div>
  </div>
</template>

<script setup>
import { ref, reactive, nextTick, watch } from 'vue';

const messages = reactive([]);
const inputText = ref('');
const isStreaming = ref(false);
const messageListRef = ref(null);

// 发送消息
const sendMessage = async () => {
    if (isStreaming.value || !inputText.value.trim()) return;

    const userMessage = inputText.value.trim();
    inputText.value = '';
    isStreaming.value = true;

    // 添加用户消息
    messages.push({
        role: 'user',
        content: userMessage
    });

    // 添加AI消息占位
    const aiMessageIndex = messages.length;
    messages.push({
        role: 'assistant',
        content: '',
        streaming: true
    });

    // 创建SSE连接
    const sseReader = new SSEReader('/api/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message: userMessage }),
        onMessage: (data) => {
            try {
                const parsed = JSON.parse(data);
                messages[aiMessageIndex].content += parsed.content;
                scrollToBottom();
            } catch (e) {
                messages[aiMessageIndex].content += data;
                scrollToBottom();
            }
        },
        onDone: () => {
            messages[aiMessageIndex].streaming = false;
            isStreaming.value = false;
            scrollToBottom();
        },
        onError: (error) => {
            messages[aiMessageIndex].content += `\n[错误: ${error.message}]`;
            messages[aiMessageIndex].streaming = false;
            isStreaming.value = false;
        }
    });
};

// 自动滚动到底部
const scrollToBottom = async () => {
    await nextTick();
    if (messageListRef.value) {
        messageListRef.value.scrollTop = messageListRef.value.scrollHeight;
    }
};

// 格式化消息(支持简单的Markdown)
const formatMessage = (content) => {
    if (!content) return '';

    let formatted = content
        // 转义HTML
        .replace(/&/g, '&')
        .replace(/</g, '<')
        .replace(/>/g, '>')
        // 代码块
        .replace(/```(\w*)\n([\s\S]*?)```/g, '<pre><code class="$1">$2</code></pre>')
        // 行内代码
        .replace(/`([^`]+)`/g, '<code>$1</code>')
        // 粗体
        .replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>')
        // 斜体
        .replace(/\*([^*]+)\*/g, '<em>$1</em>')
        // 换行
        .replace(/\n/g, '<br>');

    return formatted;
};

// 清理
onUnmounted(() => {
    // 清理逻辑
});
</script>

<style scoped>
.chat-container {
    display: flex;
    flex-direction: column;
    height: 100vh;
    max-width: 800px;
    margin: 0 auto;
    background: #1a1a2e;
}

.message-list {
    flex: 1;
    overflow-y: auto;
    padding: 20px;
}

.message {
    margin-bottom: 16px;
    max-width: 80%;
}

.message.user {
    margin-left: auto;
    text-align: right;
}

.message.user .message-content {
    background: #3b82f6;
    color: white;
    padding: 12px 16px;
    border-radius: 16px 16px 4px 16px;
}

.message.assistant .message-content {
    background: #2d3748;
    color: #e2e8f0;
    padding: 12px 16px;
    border-radius: 16px 16px 16px 4px;
}

.typing-cursor {
    display: inline-block;
    width: 2px;
    height: 1em;
    background: #10b981;
    margin-left: 2px;
    animation: blink 1s infinite;
    vertical-align: text-bottom;
}

@keyframes blink {
    0%, 100% { opacity: 1; }
    50% { opacity: 0; }
}

.input-area {
    display: flex;
    padding: 16px;
    background: #2d3748;
    gap: 12px;
}

.input-area textarea {
    flex: 1;
    padding: 12px;
    border: 1px solid #4a5568;
    border-radius: 8px;
    background: #1a202c;
    color: #e2e8f0;
    font-size: 14px;
    resize: none;
}

.input-area button {
    padding: 12px 24px;
    background: #10b981;
    color: white;
    border: none;
    border-radius: 8px;
    cursor: pointer;
    font-weight: bold;
}

.input-area button:disabled {
    background: #4a5568;
    cursor: not-allowed;
}
</style>

性能优化技巧

在实现打字机效果时,性能是一个必须认真考虑的问题。如果不加控制,每收到一个数据块就立即更新DOM,在高频数据流(如AI流式输出)的情况下,可能会导致严重的性能问题,用户会感受到明显的卡顿。

使用requestAnimationFrame节流是最简单有效的优化手段。我们不直接更新DOM,而是将要更新的内容放入一个变量,然后通过requestAnimationFrame来批量更新:

let pendingUpdate = false;
let displayText = '';

const updateDisplay = () => {
    element.textContent = displayText;
    pendingUpdate = false;
};

const onDataReceived = (chunk) => {
    displayText += chunk;

    if (!pendingUpdate) {
        pendingUpdate = true;
        requestAnimationFrame(updateDisplay);
    }
};

文本片段合并是另一个重要的优化策略。AI模型生成的token可能非常小(有时甚至只有一个字符),如果每个token都触发一次DOM更新,效率会非常低下。我们可以设置一个缓冲时间,将多个小片段合并后一次性更新:

let textBuffer = '';
let flushTimeout = null;
const FLUSH_INTERVAL = 30; // 毫秒

const onDataReceived = (chunk) => {
    textBuffer += chunk;

    if (!flushTimeout) {
        flushTimeout = setTimeout(() => {
            displayText += textBuffer;
            element.textContent = displayText;
            textBuffer = '';
            flushTimeout = null;
        }, FLUSH_INTERVAL);
    }
};

虚拟滚动对于超长对话是必须的。当消息列表非常长时,渲染所有的DOM元素会成为性能瓶颈。虚拟滚动的原理是只渲染当前可见区域的元素,当用户滚动时动态更新渲染的内容。Vue3生态中有许多成熟的虚拟滚动库,如vue-virtual-scroller。

────────────────────────────────────────────────────────────

SSE流式中的错误处理与重连机制

连接错误的类型分析

在SSE通信中,错误可能发生在多个层面,我们需要对它们进行细致的区分和处理:

网络层错误是最常见的错误类型。当网络连接中断时(WiFi断开、网线拔掉、网络切换等),EventSource会触发error事件,然后自动进入重连流程。这类错误通常是暂时性的,网络恢复后连接会自动建立。

HTTP层错误发生在请求到达服务器但处理出现问题的场景。比如返回404(端点不存在)、401(未认证)、500(服务器内部错误)等。这类错误不会触发EventSource的自动重连(因为HTTP响应已经收到了),需要我们手动处理。

超时错误是一个容易被忽视的问题。如果服务器在很长时间内没有发送任何数据,HTTP客户端可能会认为连接已死,从而主动断开。我们需要在服务器端配置合理的超时设置,并在客户端实现超时检测机制。

服务器主动关闭是服务器在某些情况下主动断开连接的行为。比如用户会话过期、服务器需要重启、负载均衡将请求切换到其他服务器等。这类情况下,服务器通常会先发送一条特殊的消息或使用特定的HTTP状态码来通知客户端。

前端错误处理实现

class SSEReaderWithErrorHandling {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            maxRetries: 5,
            retryDelay: 1000,
            connectionTimeout: 30000,
            ...options
        };
        this.retryCount = 0;
        this.retryTimeout = null;
        this.connectionTimeoutId = null;
    }

    connect() {
        this.resetTimeouts();

        return new Promise((resolve, reject) => {
            this.abortController = new AbortController();

            // 设置连接超时
            this.connectionTimeoutId = setTimeout(() => {
                this.abortController.abort();
                reject(new Error('连接超时'));
            }, this.options.connectionTimeout);

            fetch(this.url, {
                method: this.options.method || 'GET',
                headers: this.options.headers || {},
                signal: this.abortController.signal,
            })
            .then(response => {
                clearTimeout(this.connectionTimeoutId);

                if (!response.ok) {
                    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
                }

                this.retryCount = 0; // 重置重试计数
                this.setupStreamProcessing(response);
                resolve();
            })
            .catch(error => {
                clearTimeout(this.connectionTimeoutId);

                if (error.name === 'AbortError') {
                    reject(new Error('请求被中止'));
                } else {
                    this.handleError(error);
                    reject(error);
                }
            });
        });
    }

    handleError(error) {
        console.error('SSE错误:', error);

        if (this.retryCount < this.options.maxRetries) {
            const delay = this.options.retryDelay * Math.pow(2, this.retryCount);
            console.log(`${delay}ms后进行第${this.retryCount + 1}次重试...`);

            this.retryTimeout = setTimeout(() => {
                this.retryCount++;
                this.connect().catch(() => {});
            }, delay);
        } else {
            console.error('已达到最大重试次数');
            if (this.options.onMaxRetriesExceeded) {
                this.options.onMaxRetriesExceeded(error);
            }
        }
    }

    resetTimeouts() {
        if (this.retryTimeout) {
            clearTimeout(this.retryTimeout);
            this.retryTimeout = null;
        }
        if (this.connectionTimeoutId) {
            clearTimeout(this.connectionTimeoutId);
            this.connectionTimeoutId = null;
        }
    }

    close() {
        this.resetTimeouts();
        if (this.abortController) {
            this.abortController.abort();
        }
    }
}

后端错误处理策略

后端Spring Boot应用的错误处理同样重要。我们需要考虑几个关键场景:

LLM服务不可用时的降级处理。当AI模型服务暂时不可用时,我们可以实现一个优雅降级策略:返回一条友好的错误消息,提示用户稍后再试,同时记录详细的错误日志便于后续排查。

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream(@RequestParam String message) {
    SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

    sseExecutor.execute(() -> {
        try {
            llmService.streamGenerate(message, new StreamCallback() {
                @Override
                public void onChunk(String chunk) {
                    emitter.send(SseEmitter.event()
                        .name("message")
                        .data("{\"content\": \"" + escapeJson(chunk) + "\"}"));
                }

                @Override
                public void onComplete() {
                    emitter.send(SseEmitter.event()
                        .name("done")
                        .data("{\"done\": true}"));
                    emitter.complete();
                }

                @Override
                public void onError(Throwable t) {
                    log.error("LLM调用失败", t);
                    try {
                        emitter.send(SseEmitter.event()
                            .name("error")
                            .data("{\"error\": \"服务暂时不可用,请稍后重试\"}"));
                    } catch (IOException e) {
                        log.error("发送错误消息失败", e);
                    }
                    emitter.completeWithError(t);
                }
            });
        } catch (Exception e) {
            log.error("创建流失败", e);
            try {
                emitter.send(SseEmitter.event()
                    .name("error")
                    .data("{\"error\": \"请求处理失败: " + escapeJson(e.getMessage()) + "\"}"));
            } catch (IOException ioe) {
                log.error("发送错误消息失败", ioe);
            }
            emitter.completeWithError(e);
        }
    });

    // 设置超时回调
    emitter.onTimeout(() -> {
        log.warn("SSE连接超时: {}", message);
    });

    emitter.onCompletion(() -> {
        log.info("SSE连接完成");
    });

    return emitter;
}

熔断器模式也是生产环境中常用的保护机制。当错误率超过阈值时,熔断器会"跳闸",快速拒绝新的请求,避免持续的失败尝试拖垮整个系统。Spring Cloud Netflix的Resilience4j是一个成熟的解决方案:

@CircuitBreaker(name = "llmService", fallbackMethod = "fallback")
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream(@RequestParam String message) {
    // 正常逻辑
}

private SseEmitter fallback(String message, Throwable t) {
    SseEmitter emitter = new SseEmitter();
    sseExecutor.execute(() -> {
        try {
            emitter.send(SseEmitter.event()
                .name("error")
                .data("{\"error\": \"服务繁忙,请稍后重试\"}"));
            emitter.complete();
        } catch (IOException e) {
            log.error("发送降级消息失败", e);
        }
    });
    return emitter;
}

────────────────────────────────────────────────────────────

Nginx配置对SSE的影响

proxy_buffering问题

在将Spring Boot应用部署到生产环境时,Nginx几乎是必然要用到的反向代理服务器。然而,Nginx的默认配置可能会给SSE带来意想不到的问题,其中最关键的就是proxy_buffering。

proxy_buffering是Nginx的一个优化特性,默认为开启状态。当启用时,Nginx会缓冲代理服务器的响应,然后再发送给客户端。这个设计本意是减少对后端服务器的连接压力,提高整体吞吐量。但对于SSE这种需要实时推送的场景,proxy_buffering会导致严重的延迟问题——服务器发送的数据会被Nginx"截留",只有当缓冲区满了或者响应完成后,客户端才能收到数据。

解决方案:关闭proxy_buffering

针对SSE相关的location配置,我们必须关闭proxy_buffering:

location /api/stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

    # 关键配置:关闭代理缓冲
    proxy_buffering off;

    # 关闭缓存
    proxy_cache off;

    # 设置chunked传输编码
    chunked_transfer_encoding on;

    # 超时设置
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;

    # 关闭升级(如果使用HTTP/1.0)
    proxy_request_buffering off;
}

完整的生产环境Nginx配置

upstream sse_backend {
    least_conn;
    server 192.168.1.101:8080 weight=5;
    server 192.168.1.102:8080 weight=5;
    keepalive 32;
}

server {
    listen 80;
    server_name your-domain.com;

    # SSE端点配置
    location /api/stream {
        proxy_pass http://sse_backend;
        proxy_http_version 1.1;

        # 基础代理头
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Connection '';

        # SSE关键配置
        proxy_buffering off;
        proxy_cache off;
        chunked_transfer_encoding on;
        proxy_request_buffering off;

        # 连接和传输超时(长连接需要较长超时)
        proxy_connect_timeout 60s;
        proxy_send_timeout 86400s;  # 24小时,支持长时间流
        proxy_read_timeout 86400s; # 24小时

        # 错误处理
        proxy_intercept_errors off;
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
    }

    # 其他API端点(正常缓冲)
    location /api/ {
        proxy_pass http://sse_backend;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # 普通API可以使用缓冲优化
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;

        proxy_connect_timeout 30s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # 静态资源
    location /static/ {
        expires 30d;
        add_header Cache-Control "public, immutable";
    }
}

HTTP/1.0与HTTP/1.1的差异

在配置Nginx时,还需要注意HTTP版本的影响。SSE需要HTTP/1.1的chunked传输编码支持。如果不小心配置成了HTTP/1.0,SSE可能无法正常工作。

proxy_http_version 1.1;这行配置确保了使用HTTP/1.1协议。同时,proxy_set_header Connection '';(注意是空字符串)用于清除Connection头,避免出现"Connection: keep-alive"导致的问题。这是Nginx代理SSE时的一个常见陷阱。

Gzip压缩与SSE

另一个需要注意的配置是Gzip压缩。Gzip会在传输层对数据进行压缩,对于普通API响应可以显著节省带宽。但对于SSE流,Gzip压缩可能会造成问题——因为压缩算法需要等待足够多的数据才能产生输出块,这会导致数据不能立即发送。

# 在SSE location中禁用Gzip
location /api/stream {
    # ... 其他配置 ...

    # 如果全局开启了Gzip,在这个location中关闭
    gzip off;
}

────────────────────────────────────────────────────────────

完整实战案例:Spring Boot + Vue3实现打字机对话

项目概述

现在,让我们通过一个完整的实战项目来整合所有学到的知识。这个项目包含两个主要部分:基于Spring Boot的后端SSE流式响应服务,以及基于Vue3的前端对话界面。项目将实现一个类似ChatGPT的AI对话效果,支持流式输出、打字机效果、自动滚动等功能。

后端实现

项目结构

backend/
├── src/main/java/com/example/ssedemo/
│   ├── SseDemoApplication.java
│   ├── config/
│   │   ├── WebConfig.java
│   │   └── AsyncConfig.java
│   ├── controller/
│   │   └── ChatController.java
│   ├── service/
│   │   ├── ChatService.java
│   │   └── LLMService.java
│   ├── model/
│   │   └── ChatRequest.java
│   └── util/
│       └── JsonUtil.java
├── pom.xml
└── application.yml

Maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.springframework.org/schema/beans
         http://maven.springframework.org/schema/beans/spring-beans.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>sse-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

异步配置

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "sseExecutor")
    public Executor sseExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("sse-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

ChatController

@RestController
@RequestMapping("/api")
@CrossOrigin(origins = "*")
public class ChatController {

    private static final Logger log = LoggerFactory.getLogger(ChatController.class);

    @Autowired
    private ChatService chatService;

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream(@RequestParam String message,
                            @RequestParam(required = false, defaultValue = "user-123") String sessionId) {
        log.info("收到SSE请求: sessionId={}, message={}", sessionId, message);

        SseEmitter emitter = chatService.createStream(sessionId, message);

        emitter.onCompletion(() -> log.info("SSE连接完成: sessionId={}", sessionId));
        emitter.onTimeout(() -> log.warn("SSE连接超时: sessionId={}", sessionId));
        emitter.onError(e -> log.error("SSE连接错误: sessionId={}", sessionId, e));

        return emitter;
    }

    @PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter chat(@RequestBody ChatRequest request) {
        log.info("收到聊天请求: sessionId={}, message={}", request.getSessionId(), request.getMessage());

        SseEmitter emitter = chatService.createStream(request.getSessionId(), request.getMessage());

        emitter.onCompletion(() -> log.info("聊天会话完成: sessionId={}", request.getSessionId()));
        emitter.onTimeout(() -> log.warn("聊天会话超时: sessionId={}", request.getSessionId()));
        emitter.onError(e -> log.error("聊天会话错误: sessionId={}", request.getSessionId(), e));

        return emitter;
    }
}

ChatService

@Service
public class ChatService {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Executor sseExecutor;

    @Autowired
    private LLMService llmService;

    public ChatService(@Qualifier("sseExecutor") Executor sseExecutor) {
        this.sseExecutor = sseExecutor;
    }

    public SseEmitter createStream(String sessionId, String message) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        // 保存emitter
        SseEmitter existing = emitters.put(sessionId, emitter);
        if (existing != null) {
            existing.complete();
        }

        // 设置回调
        emitter.onCompletion(() -> emitters.remove(sessionId));
        emitter.onTimeout(() -> emitters.remove(sessionId));
        emitter.onError(e -> {
            emitters.remove(sessionId);
            log.error("SSE错误: sessionId={}", sessionId, e);
        });

        // 启动处理
        processMessage(sessionId, message, emitter);

        return emitter;
    }

    private void processMessage(String sessionId, String message, SseEmitter emitter) {
        sseExecutor.execute(() -> {
            try {
                // 模拟LLM流式输出(实际项目中调用真实的LLM服务)
                simulateLLMStream(emitter, message);
            } catch (Exception e) {
                log.error("处理消息失败: sessionId={}", sessionId, e);
                try {
                    emitter.send(SseEmitter.event()
                        .name("error")
                        .data("{\"error\": \"" + escapeJson(e.getMessage()) + "\"}"));
                } catch (IOException ioe) {
                    log.error("发送错误消息失败", ioe);
                }
                emitter.completeWithError(e);
            }
        });
    }

    // 模拟LLM流式输出(实际项目中替换为真实LLM调用)
    private void simulateLLMStream(SseEmitter emitter, String userMessage) throws IOException {
        String response = generateResponse(userMessage);

        int eventId = 0;
        for (char c : response.toCharArray()) {
            // 模拟token生成延迟
            try {
                Thread.sleep(30 + (long)(Math.random() * 50));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }

            final int currentId = ++eventId;
            final String chunk = String.valueOf(c);

            try {
                emitter.send(SseEmitter.event()
                    .name("message")
                    .id(String.valueOf(currentId))
                    .data("{\"content\": \"" + escapeJson(chunk) + "\"}"));
            } catch (IOException e) {
                log.warn("发送SSE数据失败,客户端可能已断开: sessionId", sessionId);
                return;
            }
        }

        // 发送完成信号
        emitter.send(SseEmitter.event()
            .name("done")
            .data("{\"done\": true}"));
        emitter.complete();
    }

    private String generateResponse(String userMessage) {
        // 这里使用模拟的回复,实际项目中调用真实的LLM服务
        if (userMessage.contains("你好") || userMessage.contains("hello")) {
            return "你好!很高兴见到你。我是洛水AI助手,一个基于大语言模型的智能对话系统。我可以帮助你回答问题、提供建议、进行创意写作等。有什么我可以帮助你的吗?";
        } else if (userMessage.contains("你是谁") || userMessage.contains("介绍")) {
            return "我是一个先进的人工智能助手,由洛水团队开发。我基于最新的大语言模型技术构建,具备强大的自然语言理解和生成能力。我可以帮助你完成各种任务,包括:\n\n1. 回答问题 - 无论是常识问题还是专业知识,我都可以尽力为你解答。\n\n2. 写作辅助 - 我可以帮你撰写文章、邮件、报告,或者帮你润色和修改文本。\n\n3. 代码开发 - 我熟悉多种编程语言,可以帮你写代码、调试bug、解释代码逻辑。\n\n4. 学习辅导 - 我可以帮助你学习新知识,解释复杂的概念。\n\n5. 创意头脑风暴 - 如果你需要新点子,我可以帮你进行创意发散。\n\n请告诉我你需要什么样的帮助吧!";
        } else if (userMessage.contains("谢谢")) {
            return "不客气!很高兴能帮到你。如果你还有其他问题,随时可以问我。保持好奇,继续探索!";
        } else {
            return "感谢你的提问!你的问题是:" + userMessage + "\n\n让我思考一下...\n\n关于这个话题,我认为这是一个非常有意义的话题,涉及到多个层面的知识和技术。让我详细解释一下:\n\n首先,我们需要理解这个问题的背景和核心要点。在这个快速发展的时代,各种新技术和新概念层出不穷,我们需要保持学习的态度,不断更新自己的知识体系。\n\n其次,实践是非常重要的。理论知识和实际应用之间往往存在差距,只有通过不断的实践,我们才能真正掌握一项技能或理解一个概念。\n\n最后,我希望这些信息对你有所帮助。如果你还有其他问题,欢迎继续提问!";
        }
    }

    private String escapeJson(String text) {
        if (text == null) return "";
        return text.replace("\\", "\\\\")
                   .replace("\"", "\\\"")
                   .replace("\n", "\\n")
                   .replace("\r", "\\r")
                   .replace("\t", "\\t");
    }
}

ChatRequest

@Data
public class ChatRequest {
    private String sessionId;
    private String message;
}

前端实现

项目结构

frontend/
├── public/
│   └── index.html
├── src/
│   ├── main.js
│   ├── App.vue
│   ├── components/
│   │   └── ChatInterface.vue
│   ├── services/
│   │   └── sseService.js
│   └── styles/
│       └── global.css
├── package.json
└── vite.config.js

Vue3 + Vite项目创建

npm create vite@latest frontend -- --template vue
cd frontend
npm install
npm run dev

sseService.js

// SSE服务封装
class SSEService {
    constructor() {
        this.currentReader = null;
        this.abortController = null;
    }

    async chat(message, callbacks = {}) {
        this.close();

        this.abortController = new AbortController();
        const { signal } = this.abortController;

        try {
            const response = await fetch('/api/stream', {
                method: 'GET',
                headers: {
                    'Accept': 'text/event-stream',
                },
                signal,
                params: { message }
            });

            if (!response.ok) {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
            }

            await this.processStream(response, callbacks);
        } catch (error) {
            if (error.name === 'AbortError') {
                console.log('SSE请求被中止');
            } else {
                console.error('SSE错误:', error);
                if (callbacks.onError) {
                    callbacks.onError(error);
                }
            }
        }
    }

    async processStream(response, callbacks) {
        const reader = response.body.getReader();
        this.currentReader = reader;
        const decoder = new TextDecoder();
        let buffer = '';
        let currentEventType = 'message';

        try {
            while (true) {
                const { done, value } = await reader.read();

                if (done) {
                    break;
                }

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop();

                for (const line of lines) {
                    const trimmed = line.trim();
                    if (!trimmed) continue;

                    const colonIndex = trimmed.indexOf(':');
                    if (colonIndex === -1) continue;

                    const field = trimmed.substring(0, colonIndex).trim();
                    const value = trimmed.substring(colonIndex + 1).trim();

                    switch (field) {
                        case 'event':
                            currentEventType = value;
                            break;
                        case 'data':
                            this.handleData(value, currentEventType, callbacks);
                            currentEventType = 'message';
                            break;
                    }
                }
            }
        } finally {
            this.currentReader = null;
            if (callbacks.onComplete) {
                callbacks.onComplete();
            }
        }
    }

    handleData(data, eventType, callbacks) {
        switch (eventType) {
            case 'message':
                try {
                    const parsed = JSON.parse(data);
                    if (callbacks.onMessage) {
                        callbacks.onMessage(parsed.content || parsed);
                    }
                } catch {
                    if (callbacks.onMessage) {
                        callbacks.onMessage(data);
                    }
                }
                break;
            case 'done':
                if (callbacks.onDone) {
                    callbacks.onDone();
                }
                break;
            case 'error':
                try {
                    const parsed = JSON.parse(data);
                    if (callbacks.onError) {
                        callbacks.onError(new Error(parsed.error));
                    }
                } catch {
                    if (callbacks.onError) {
                        callbacks.onError(new Error(data));
                    }
                }
                break;
        }
    }

    close() {
        if (this.abortController) {
            this.abortController.abort();
            this.abortController = null;
        }
        if (this.currentReader) {
            this.currentReader.cancel();
            this.currentReader = null;
        }
    }
}

export const sseService = new SSEService();

ChatInterface.vue

<template>
  <div class="chat-container">
    <div class="chat-header">
      <h1>洛水AI助手</h1>
      <span class="status" :class="{ connected: isConnected }">
        {{ isConnected ? '连接中' : '空闲' }}
      </span>
    </div>

    <div class="message-list" ref="messageListRef">
      <div
        v-for="(message, index) in messages"
        :key="index"
        class="message"
        :class="[message.role, { streaming: message.streaming }]"
      >
        <div class="avatar">
          {{ message.role === 'user' ? 'U' : 'AI' }}
        </div>
        <div class="content-wrapper">
          <div class="message-content" v-html="formatContent(message.content)"></div>
          <span v-if="message.streaming" class="cursor"></span>
        </div>
      </div>
    </div>

    <div class="input-container">
      <textarea
        v-model="inputMessage"
        @keydown.enter.exact.prevent="sendMessage"
        placeholder="输入你的问题,按Enter发送..."
        rows="3"
      ></textarea>
      <button @click="sendMessage" :disabled="isStreaming">
        {{ isStreaming ? '生成中...' : '发送' }}
      </button>
    </div>
  </div>
</template>

<script setup>
import { ref, reactive, nextTick, onUnmounted } from 'vue';
import { sseService } from '../services/sseService';

const messages = reactive([]);
const inputMessage = ref('');
const isStreaming = ref(false);
const isConnected = ref(false);
const messageListRef = ref(null);

const sendMessage = () => {
    if (isStreaming.value || !inputMessage.value.trim()) return;

    const userMessage = inputMessage.value.trim();
    inputMessage.value = '';
    isStreaming.value = true;

    // 添加用户消息
    messages.push({
        role: 'user',
        content: userMessage
    });

    // 添加AI消息占位
    const aiMessageIndex = messages.length;
    messages.push({
        role: 'assistant',
        content: '',
        streaming: true
    });

    scrollToBottom();

    // 调用SSE服务
    sseService.chat(userMessage, {
        onMessage: (chunk) => {
            messages[aiMessageIndex].content += chunk;
            scrollToBottom();
        },
        onDone: () => {
            messages[aiMessageIndex].streaming = false;
            isStreaming.value = false;
            isConnected.value = false;
            scrollToBottom();
        },
        onError: (error) => {
            messages[aiMessageIndex].content += `\n[错误: ${error.message}]`;
            messages[aiMessageIndex].streaming = false;
            isStreaming.value = false;
            isConnected.value = false;
            scrollToBottom();
        }
    });

    isConnected.value = true;
};

const scrollToBottom = async () => {
    await nextTick();
    if (messageListRef.value) {
        messageListRef.value.scrollTop = messageListRef.value.scrollHeight;
    }
};

const formatContent = (content) => {
    if (!content) return '';

    let formatted = content
        .replace(/&/g, '&')
        .replace(/</g, '<')
        .replace(/>/g, '>')
        .replace(/\n/g, '<br>')
        .replace(/```(\w*)\n([\s\S]*?)```/g, '<pre><code class="$1">$2</code></pre>')
        .replace(/`([^`]+)`/g, '<code>$1</code>')
        .replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>')
        .replace(/\*([^*]+)\*/g, '<em>$1</em>');

    return formatted;
};

onUnmounted(() => {
    sseService.close();
});
</script>

<style scoped>
.chat-container {
    display: flex;
    flex-direction: column;
    height: 100vh;
    max-width: 900px;
    margin: 0 auto;
    background: #0f0f1a;
    color: #e2e8f0;
}

.chat-header {
    display: flex;
    justify-content: space-between;
    align-items: center;
    padding: 20px 24px;
    background: #1a1a2e;
    border-bottom: 1px solid #2d2d44;
}

.chat-header h1 {
    margin: 0;
    font-size: 20px;
    color: #10b981;
}

.status {
    font-size: 12px;
    padding: 4px 12px;
    border-radius: 12px;
    background: #374151;
    color: #9ca3af;
}

.status.connected {
    background: #064e3b;
    color: #10b981;
}

.message-list {
    flex: 1;
    overflow-y: auto;
    padding: 24px;
}

.message {
    display: flex;
    gap: 16px;
    margin-bottom: 24px;
    max-width: 85%;
}

.message.user {
    flex-direction: row-reverse;
    margin-left: auto;
}

.message.assistant {
    flex-direction: row;
}

.avatar {
    width: 40px;
    height: 40px;
    border-radius: 50%;
    display: flex;
    align-items: center;
    justify-content: center;
    font-weight: bold;
    flex-shrink: 0;
}

.message.user .avatar {
    background: #3b82f6;
    color: white;
}

.message.assistant .avatar {
    background: #10b981;
    color: white;
}

.content-wrapper {
    position: relative;
    padding: 12px 16px;
    border-radius: 16px;
    line-height: 1.6;
}

.message.user .content-wrapper {
    background: #3b82f6;
    color: white;
    border-bottom-right-radius: 4px;
}

.message.assistant .content-wrapper {
    background: #1e293b;
    color: #e2e8f0;
    border-bottom-left-radius: 4px;
}

.cursor {
    display: inline-block;
    width: 2px;
    height: 1.2em;
    background: #10b981;
    margin-left: 2px;
    animation: blink 0.8s infinite;
    vertical-align: text-bottom;
}

@keyframes blink {
    0%, 100% { opacity: 1; }
    50% { opacity: 0; }
}

.input-container {
    display: flex;
    gap: 12px;
    padding: 20px 24px;
    background: #1a1a2e;
    border-top: 1px solid #2d2d44;
}

.input-container textarea {
    flex: 1;
    padding: 12px 16px;
    border: 1px solid #374151;
    border-radius: 12px;
    background: #0f0f1a;
    color: #e2e8f0;
    font-size: 14px;
    font-family: inherit;
    resize: none;
    transition: border-color 0.2s;
}

.input-container textarea:focus {
    outline: none;
    border-color: #10b981;
}

.input-container textarea::placeholder {
    color: #6b7280;
}

.input-container button {
    padding: 12px 24px;
    background: #10b981;
    color: white;
    border: none;
    border-radius: 12px;
    font-weight: bold;
    cursor: pointer;
    transition: background 0.2s;
}

.input-container button:hover:not(:disabled) {
    background: #059669;
}

.input-container button:disabled {
    background: #374151;
    cursor: not-allowed;
}
</style>

────────────────────────────────────────────────────────────

SSE在AI对话中的最佳实践

连接管理与资源优化

在AI对话应用中,SSE连接的时长可能非常长——一次对话可能持续数分钟甚至更长时间。在这段时间内,后端服务器需要维护连接状态、线程资源、可能的缓存等。如果管理不当,很容易出现资源泄漏或耗尽的问题。

限制单个用户的并发连接数是首要原则。一个用户不应该同时发起多个SSE连接。可以在后端使用用户的sessionId作为key,维护一个Map,当同一个sessionId再次发起连接时,先关闭旧的连接:

private final Map<String, SseEmitter> sessionEmitters = new ConcurrentHashMap<>();

public SseEmitter createStream(String sessionId, String message) {
    // 先关闭旧连接
    SseEmitter oldEmitter = sessionEmitters.get(sessionId);
    if (oldEmitter != null) {
        oldEmitter.complete();
        sessionEmitters.remove(sessionId);
    }

    SseEmitter newEmitter = new SseEmitter(Long.MAX_VALUE);
    sessionEmitters.put(sessionId, newEmitter);

    // ... 处理逻辑 ...

    newEmitter.onCompletion(() -> sessionEmitters.remove(sessionId));
    newEmitter.onTimeout(() -> sessionEmitters.remove(sessionId));
    newEmitter.onError(e -> sessionEmitters.remove(sessionId));

    return newEmitter;
}

定期心跳机制可以防止空闲连接被中间设备(如负载均衡器、防火墙)关闭。虽然HTTP长连接本身是持久的,但许多网络设备有空闲超时机制,一段时间没有数据传输的连接可能会被悄悄断开。定期发送ping消息可以保持连接的活跃:

// 定期发送ping
ScheduledExecutorService heartbeatScheduler = Executors.newScheduledThreadPool(1);
heartbeatScheduler.scheduleAtFixedRate(() -> {
    try {
        emitter.send(SseEmitter.event()
            .name("ping")
            .data(""));
    } catch (IOException e) {
        // 连接可能已关闭
        heartbeatScheduler.shutdown();
    }
}, 30, 30, TimeUnit.SECONDS);

数据格式设计

SSE的数据格式设计直接影响前端处理的复杂度和用户体验。一个好的数据格式应该语义清晰、易于解析、扩展性强:

// 普通文本消息
{"content": "这是回复的文本内容", "type": "text"}

// 带Markdown格式的文本
{"content": "**粗体** 和 *斜体*", "type": "markdown"}

// 代码块
{"content": "console.log('Hello')", "type": "code", "language": "javascript"}

// 思考过程(可选显示)
{"content": "让我想想这个问题...", "type": "thinking"}

// 流式结束信号
{"done": true}

// 错误信号
{"error": "服务暂时不可用"}

前端流式渲染优化

在前端实现AI对话时,除了基本的打字机效果,还有几个可以显著提升体验的优化点:

流式Markdown渲染。如果AI返回的是Markdown格式的文本,逐字渲染Markdown会让文本看起来非常奇怪。更好的做法是先对Markdown进行语法分析,只在完整的代码块或列表结束后再渲染,而不是逐字渲染:

class MarkdownRenderer {
    constructor() {
        this.buffer = '';
        this.pendingTags = [];
    }

    addChunk(char) {
        this.buffer += char;

        // 检测是否是完整的语法单元
        if (this.isCompleteUnit()) {
            return this.flush();
        }
        return '';
    }

    isCompleteUnit() {
        // 简化的检测逻辑
        // 实际上需要更复杂的Markdown解析
        return this.buffer.length > 50 ||
               (this.buffer.endsWith('\n\n')) ||
               (this.buffer.endsWith('```'));
    }

    flush() {
        const result = this.renderMarkdown(this.buffer);
        this.buffer = '';
        return result;
    }
}

减少重渲染。Vue和React的响应式系统在每次状态更新时都会进行Virtual DOM diff。在高频更新的场景下(如逐字渲染),这可能造成性能问题。可以使用节流(throttle)来减少更新频率:

const throttledUpdate = _.throttle((content) => {
    messages[aiMessageIndex].content = content;
}, 50);

安全性考虑

输入验证。虽然SSE是服务器向客户端推送数据,但用户输入仍然是需要验证的。恶意用户可能通过特殊的输入触发服务器端的异常:

@PostMapping("/chat")
public SseEmitter chat(@RequestBody @Valid ChatRequest request) {
    // 验证消息长度
    if (request.getMessage().length() > 2000) {
        throw new IllegalArgumentException("消息长度不能超过2000字符");
    }

    // 过滤特殊字符
    String sanitizedMessage = sanitizers.html().strip(request.getMessage());
    // ... 处理逻辑
}

速率限制。防止用户恶意频繁发起请求,耗尽服务器资源:

@Service
public class RateLimitService {
    private final Map<String, AtomicInteger> requestCounts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleanup = Executors.newSingleThreadScheduledExecutor();

    public RateLimitService() {
        // 每分钟清理一次
        cleanup.scheduleAtFixedRate(() -> requestCounts.clear(), 1, 1, TimeUnit.MINUTES);
    }

    public boolean isAllowed(String sessionId) {
        AtomicInteger count = requestCounts.computeIfAbsent(sessionId, k -> new AtomicInteger());
        return count.incrementAndGet() <= 60; // 每分钟最多60次请求
    }
}

────────────────────────────────────────────────────────────

总结

SSE(Server-Sent Events)作为HTML5引入的服务器推送技术,在AI对话、实时通知、监控系统等场景中展现出了独特的优势。它以HTTP协议为基础,实现简单、兼容性良好、天然支持自动重连,特别适合服务器向客户端单向推送数据的场景。

本文从技术选型出发,深入分析了SSE与WebSocket、长短轮询的适用场景差异,帮助读者在项目初期做出正确的技术决策。随后,我们详细讲解了SSE的技术原理、数据格式,以及与HTTP协议的关系,为读者打下坚实的理论基础。

在后端实现部分,我们通过Spring Boot的SseEmitter API,展示了从简单到企业级的多种实现方案,包括异步线程池配置、错误处理、熔断器集成等生产环境必备要素。前端部分则同时介绍了EventSource API和Fetch + ReadableStream两种方案,让读者可以根据实际需求灵活选择。

打字机效果是AI对话应用的核心用户体验。我们不仅实现了基础的逐字显示,还深入探讨了性能优化、虚拟滚动、Markdown渲染等进阶话题。错误处理与重连机制、Nginx配置细节等生产部署必须考虑的问题,也在文中得到了详尽的阐述。

最后,通过一个完整的Spring Boot + Vue3实战项目,我们将所有知识点串联起来,展示了从设计到实现的完整流程。希望本文能够帮助读者全面掌握SSE流式响应开发的核心技能,在实际项目中游刃有余。

AI交互体验正在快速演进,SSE作为一种成熟稳定的技术方案,将继续在实时通信领域发挥重要作用。掌握这项技术,你将能够构建出更加流畅、自然的AI对话应用,为用户带来前所未有的交互体验。

────────────────────────────────────────────────────────────

附:配套技术图解

![SSE技术架构图](sse_architecture.png)

图1:SSE技术架构图

![SSE流式响应完整流程图](sse_workflow.png)

图2:SSE流式响应完整流程图

![前端EventSource打字机效果实现图](sse_frontend.png)

图3:前端EventSource打字机效果实现图

![SSE与WebSocket/长轮询对比图](sse_comparison.png)

图4:SSE与WebSocket/长轮询对比分析图

附:配套技术图解

SSE技术架构图

图1:SSE技术架构图(展示Server-Sent Events的工作原理)

SSE流式响应完整流程图

图2:SSE流式响应完整流程图(从请求到响应的全链路解析)

前端EventSource打字机效果实现

图3:前端EventSource API与Fetch + ReadableStream实现对比

SSE与WebSocket/长轮询对比

图4:三种实时通信技术全面对比分析

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐