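This tutorial walks you through building a Human-in-the-Loop (HITL) chat system on AgentScope Java from scratch. By the end you will know how to create a conversational agent that interacts with users in real time, supports dynamic tool configuration, and can be interrupted or asked to confirm dangerous operations.

📦 Companion code: for all DTOs and the complete front-end code, see https://gitee.com/CodeMao01/hitl-chat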

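I. Project Overview

1.1 What is HITL Chat?

HITL (Human-in-the-Loop) Chat is a sample project that demonstrates human-AI collaboration in a conversational assistant. The AI assistant can call tools on its own to complete tasks, and when it reaches a high-risk operation it pauses and waits for the user to confirm before continuing.

Use cases

  • 🔐 Approval of sensitive operations: deleting files, accessing private data, and similar actions require manual confirmation
  • 🛠️ Dynamic tool integration: add or remove MCP (Model Context Protocol) servers at runtime
  • 💬 Interactive debugging: interrupt the AI at any time, adjust the strategy, then continue
  • 🎯 Controlled automation: the user decides which tools need approval and which may run automatically

1.2 Core features

1. Dynamic MCP tool configuration

MCP servers can be added and removed at runtime, so the AI's capabilities can be extended without restarting the application.

2. Conversation interruption

The user can interrupt a running conversation at any time; the AI stops the current operation immediately and saves its state.

3. Confirmation for dangerous tools

For calls to tools marked as "dangerous", the AI pauses and waits for the user to explicitly confirm or reject the call.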

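4. Built-in utility tools

  • get_current_time: returns the current time
  • get_random_number: generates a random number

5. Real-time SSE streaming

Server-Sent Events are used to push messages from the server to the client in real time.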
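
Feature 3 above (confirmation for dangerous tools) can be pictured as a small policy check that runs before each tool call. The sketch below is a minimal illustration in plain Java and is not AgentScope's API: `ToolApprovalPolicy`, `markDangerous`, `markSafe`, and `requiresConfirmation` are hypothetical names, and `delete_file` in the usage note is an invented tool name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of AgentScope: tracks which tool names need user approval.
final class ToolApprovalPolicy {

    // Thread-safe set, so the policy can be changed while chats are running.
    private final Set<String> dangerousTools = ConcurrentHashMap.newKeySet();

    // Marks a tool as requiring confirmation before it runs.
    void markDangerous(String toolName) {
        dangerousTools.add(toolName);
    }

    // Allows a tool to run automatically again.
    void markSafe(String toolName) {
        dangerousTools.remove(toolName);
    }

    // True when the call should pause and wait for the user to confirm or reject it.
    boolean requiresConfirmation(String toolName) {
        return dangerousTools.contains(toolName);
    }
}
```

For example, after `policy.markDangerous("delete_file")` a call to `delete_file` would be held for confirmation, while `get_current_time` would still run automatically.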


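1.3 Technology stack

Area | Technology | Notes
Backend framework | Spring Boot 4.x | Lightweight web application framework
Web engine | Spring WebFlux | Reactive web framework with asynchronous stream support
AI framework | AgentScope Core 1.0.10 | Open-source multi-agent framework from Alibaba
LLM | Qwen (Tongyi Qianwen) | Via the DashScope API or a local Ollama deployment
Protocol | SSE (Server-Sent Events) | One-way real-time push from server to client
MCP support | Model Context Protocol | Standardized protocol for tool integration
Front end | Vanilla JavaScript | Plain JS, no framework dependencies
Build tool | Maven | Dependency management and build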

II. Project Setup

  1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.1.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>hitl-chat</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hitl-chat</name>
    <description>hitl-chat</description>

    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.agentscope</groupId>
            <artifactId>agentscope-core</artifactId>
            <version>1.0.10</version>
        </dependency>

        <dependency>
            <groupId>io.github.cdimascio</groupId>
            <artifactId>dotenv-java</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <annotationProcessorPaths>
                                <path>
                                    <groupId>org.projectlombok</groupId>
                                    <artifactId>lombok</artifactId>
                                </path>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <annotationProcessorPaths>
                                <path>
                                    <groupId>org.projectlombok</groupId>
                                    <artifactId>lombok</artifactId>
                                </path>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

  2. YAML configuration
spring:
  application:
    name: hitl-chat
  servlet:
    encoding:
      charset: UTF-8
      force: true
      enabled: true

server:
  port: 8082
  3. Startup class (loads the .env configuration)
package com.example.hitlchat;

import io.github.cdimascio.dotenv.Dotenv;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class HitlChatApplication {

    public static void main(String[] args) {
        // Load the .env file (startup continues if the file is missing)
        Dotenv load = Dotenv.configure().ignoreIfMissing().load();
        // Copy every .env entry into the JVM system properties
        load.entries().forEach(entry -> System.setProperty(entry.getKey(), entry.getValue()));
        SpringApplication.run(HitlChatApplication.class, args);
        printStartupInfo();
    }

    private static void printStartupInfo() {
        System.out.println("\n=== HITL Chat Application Started ===");
        System.out.println("Open: http://localhost:8082");
        System.out.println("\nFeatures:");
        System.out.println("  - Dynamic MCP tool configuration");
        System.out.println("  - Agent interruption support");
        System.out.println("  - Dangerous tool confirmation");
        System.out.println("  - Built-in tools: get_current_time, get_random_number");
        System.out.println("\nPress Ctrl+C to stop.");
    }

}

  4. Chat model configuration
package com.example.hitlchat.config;

import com.example.hitlchat.tool.BuiltinTools;
import io.agentscope.core.formatter.dashscope.DashScopeChatFormatter;
import io.agentscope.core.formatter.ollama.OllamaChatFormatter;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.model.ollama.OllamaOptions;
import io.agentscope.core.model.ollama.ThinkOption;
import io.agentscope.core.tool.Toolkit;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AiConfig {

    @Bean
    public OllamaChatModel ollamaChatModel() {
        return OllamaChatModel.builder()
                .modelName("qwen3.5:0.8b")
                .defaultOptions(OllamaOptions.builder()
                        .thinkOption(ThinkOption.ThinkBoolean.DISABLED)
                        .build())
                // Formats the requests sent to Ollama and the responses it returns
                .formatter(new OllamaChatFormatter())
                .build();
    }

    @Bean
    public DashScopeChatModel dashScopeChatModel() {
        return DashScopeChatModel.builder()
                .apiKey(System.getProperty("DASHSCOPE_KEY"))
                .modelName(System.getProperty("DASHSCOPE_MODEL_NAME"))
                .baseUrl(System.getProperty("DASHSCOPE_BASE_URL"))
                .enableThinking(false)
                .stream(true)
                .formatter(new DashScopeChatFormatter())
                .build();
    }
}
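
`System.getProperty` returns `null` for a key that was never set, so a missing entry in `.env` can be hard to trace back to its cause. Below is a minimal sketch of a fail-fast check, assuming the three property names used in the configuration above. `RequiredProperties` and `missing` are hypothetical names and are not part of the project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reports which system properties are unset or blank.
final class RequiredProperties {

    // Returns the names of the given keys whose system property is null or blank,
    // in the order they were passed in.
    static List<String> missing(String... keys) {
        List<String> result = new ArrayList<>();
        for (String key : keys) {
            String value = System.getProperty(key);
            if (value == null || value.isBlank()) {
                result.add(key);
            }
        }
        return result;
    }
}
```

One possible use is to call `RequiredProperties.missing("DASHSCOPE_KEY", "DASHSCOPE_MODEL_NAME", "DASHSCOPE_BASE_URL")` in `main` right after the `.env` entries are copied to system properties, and to stop with a clear message if the returned list is not empty.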

  5. ReActAgent service configuration
package com.example.hitlchat.service;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;

    public ReActAgent createAgent() {
        return ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .build();
    }
}

  6. Test endpoint
package com.example.hitlchat.controller;

import com.example.hitlchat.service.AgentService;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.message.Msg;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@AllArgsConstructor
@RestController
@RequestMapping("/api")
public class ChatController {

    private final AgentService agentService;

    @PostMapping("/chat")
    public String chat(String question) {
        ReActAgent agent = agentService.createAgent();
        Msg block = agent.call(Msg.builder().textContent(question).build()).block();
        return block.getTextContent();
    }
}

III. Built-in Tools

3.1 Basic usage

  1. Create the tools
package com.example.hitlchat.tool;

import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.tool.Tool;
import io.agentscope.core.tool.ToolParam;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class BuiltinTools {

    /**
     * Returns the current time.
     * 1. Get the current time: call `LocalDateTime.now()` to read the system date and time
     * 2. Create a formatter: `DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")` defines the output format
     * 3. Format the time: `format(...)` turns the time object into a readable string
     * 4. Wrap the result: `ToolResultBlock.text()` wraps it in the tool response format
     * 5. Return to the AI: the AI receives the formatted time and shows it to the user
     *
     * @return the formatted time string
     */
    @Tool(name = "get_current_time", description = "Get the current time")
    public ToolResultBlock getCurrentTime() {
        return ToolResultBlock.text("Current time: " + LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }

    /**
     * Returns a random integer in the inclusive range [min, max].
     * 1. Validate the parameters: check whether `min > max`
     * 2. Handle errors: if the parameters are invalid, return `ToolResultBlock.error()`
     * 3. Generate the offset: `(int) (Math.random() * (max - min + 1))` yields an integer in `[0, max - min]`
     * 4. Shift the result: adding `min` places it in `[min, max]`
     * 5. Format the output: build a descriptive text with the range and the result
     * 6. Return the result: wrap it with `ToolResultBlock.text()`
     */
    @Tool(name = "get_random_number", description = "Get a random integer")
    public ToolResultBlock getRandomNumber(@ToolParam(name = "min", description = "Minimum value (inclusive)") int min,
                                           @ToolParam(name = "max", description = "Maximum value (inclusive)") int max) {
        if (min > max) {
            return ToolResultBlock.error("Invalid parameters: min must not be greater than max");
        }
        int randomNum = (int) (Math.random() * (max - min + 1)) + min;
        return ToolResultBlock.text("Random number: " + randomNum + ", range: " + min + "~" + max);
    }

}
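
The range arithmetic behind `get_random_number` can be checked in isolation. Below is a minimal sketch of the same inclusive `[min, max]` contract, widened to `long` so that the size of the range cannot overflow for extreme bounds such as `Integer.MAX_VALUE`. `RandomRange` and `nextInclusive` are hypothetical names, and the use of `ThreadLocalRandom` is a substitution for illustration rather than the tutorial's own code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating an inclusive random range.
final class RandomRange {

    // Returns a uniformly distributed int in [min, max], both ends inclusive.
    static int nextInclusive(int min, int max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not be greater than max");
        }
        // nextLong's upper bound is exclusive, hence max + 1.
        // Widening to long keeps max + 1 from overflowing when max == Integer.MAX_VALUE.
        return (int) ThreadLocalRandom.current().nextLong(min, (long) max + 1);
    }
}
```

Because the value returned by `nextLong` always lies in `[min, max]`, the cast back to `int` cannot lose information.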

  2. Register the tools (only needed once, at application startup)
@Bean
public Toolkit toolkit() {
    Toolkit toolkit = new Toolkit();
    toolkit.registerTool(new BuiltinTools());
    return toolkit;
}
  3. Wire the toolkit into the ReActAgent
package com.example.hitlchat.service;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private final Toolkit toolkit;

    public ReActAgent createAgent() {
        return ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .toolkit(toolkit)
                .build();
    }
}

3.2 Switching to streaming output

  1. Add streaming to the ReActAgent
package com.example.hitlchat.service;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private final Toolkit toolkit;

    public ReActAgent createAgent() {
        return ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .toolkit(toolkit)
                .build();
    }

    public Flux<String> chat(String message) {
        ReActAgent agent = createAgent();
        return agent.stream(Msg.builder().textContent(message).build())
                .flatMap(event -> {
                    Msg msg = event.getMessage();
                    switch (event.getType()) {
                        // Only the text of the reasoning step is returned; other event types, such as tool calls, are skipped
                        case REASONING -> {
                            List<TextBlock> textBlocks = msg.getContentBlocks(TextBlock.class);
                            String content = Optional.ofNullable(textBlocks).stream()
                                    .filter(v -> !v.isEmpty())
                                    .flatMap(List::stream)
                                    .map(TextBlock::getText)
                                    .collect(Collectors.joining());
                            if (StringUtils.hasText(content)) {
                                return Flux.just(content);
                            }
                        }
                    }
                    // A flatMap mapper must never return null; emit an empty stream instead
                    return Flux.<String>empty();
                });
    }
}

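  2. Update the endpoint
/**
 * Why return SSE events rather than a plain Flux<String>?
 * 1. Messages may contain newlines, JSON, code blocks, and other complex content
 * 2. Different event types (reasoning, answer, error, etc.) need to be told apart
 * 3. It keeps the front-end code simple and maintainable
 * 4. Reconnection after a dropped connection needs to be supported
 * 5. It is a common practice in production
 *
 * @param question the user's question
 * @return the reply as a stream of SSE events
 */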
@PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chat(String question) {
    return agentService.chat(question).map(v -> ServerSentEvent.<String>builder().data(v).build());
}
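
Point 1 in the comment above is easiest to see on the wire. In the SSE format each line of a payload is sent as its own `data:` field, and an empty line ends the event, so a payload with newlines survives intact. Below is a minimal sketch of that framing in plain Java. `SseFrames` and `format` are hypothetical names; in the tutorial's code this framing is done by Spring when it writes `ServerSentEvent` objects, so nothing like this needs to be written by hand.

```java
// Hypothetical helper that shows how one SSE event is laid out as text.
final class SseFrames {

    // Formats one SSE event: an optional "event:" field, one "data:" field
    // per payload line, and an empty line that terminates the event.
    static String format(String eventName, String payload) {
        StringBuilder frame = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            frame.append("event: ").append(eventName).append('\n');
        }
        // The -1 limit keeps trailing empty lines instead of silently dropping them.
        for (String line : payload.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        return frame.append('\n').toString();
    }
}
```

A receiver joins the `data:` lines of one event with newlines again, which is why multi-line content such as code blocks can be streamed safely.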

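3.3 Adjusting the request and response types

At this point the front-end page can be pasted into the project.

  1. Request and response classes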
package com.example.hitlchat.dto;

/**
 * Chat request DTO.
 */
public class ChatRequest {

    private String message;
    private String sessionId;

    public ChatRequest() {}

    public ChatRequest(String message, String sessionId) {
        this.message = message;
        this.sessionId = sessionId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }
}

package com.example.hitlchat.dto;

import java.util.List;
import java.util.Map;

/**
 * Chat event sent to frontend via SSE.
 */
public class ChatEvent {

    /** Event type: TEXT, TOOL_USE, TOOL_RESULT, TOOL_CONFIRM, ERROR, COMPLETE, INTERRUPTED. */
    private String type;

    /** Text content for TEXT events. */
    private String content;

    /** Tool name for TOOL_USE/TOOL_RESULT events. */
    private String toolName;

    /** Tool ID for TOOL_USE/TOOL_RESULT events. */
    private String toolId;

    /** Tool input parameters for TOOL_USE events. */
    private Map<String, Object> toolInput;

    /** Tool result for TOOL_RESULT events. */
    private String toolResult;

    /** Pending tool calls for TOOL_CONFIRM events. */
    private List<PendingToolCall> pendingToolCalls;

    /** Error message for ERROR events. */
    private String error;

    /** Indicates if this is incremental content. */
    private boolean incremental;

    public ChatEvent() {}

    public static ChatEvent text(String content, boolean incremental) {
        ChatEvent event = new ChatEvent();
        event.type = "TEXT";
        event.content = content;
        event.incremental = incremental;
        return event;
    }

    public static ChatEvent toolUse(String toolId, String toolName, Map<String, Object> input) {
        ChatEvent event = new ChatEvent();
        event.type = "TOOL_USE";
        event.toolId = toolId;
        event.toolName = toolName;
        event.toolInput = input;
        return event;
    }

    public static ChatEvent toolResult(String toolId, String toolName, String result) {
        ChatEvent event = new ChatEvent();
        event.type = "TOOL_RESULT";
        event.toolId = toolId;
        event.toolName = toolName;
        event.toolResult = result;
        return event;
    }

    public static ChatEvent toolConfirm(List<PendingToolCall> pendingToolCalls) {
        ChatEvent event = new ChatEvent();
        event.type = "TOOL_CONFIRM";
        event.pendingToolCalls = pendingToolCalls;
        return event;
    }

    public static ChatEvent error(String error) {
        ChatEvent event = new ChatEvent();
        event.type = "ERROR";
        event.error = error;
        return event;
    }

    public static ChatEvent complete() {
        ChatEvent event = new ChatEvent();
        event.type = "COMPLETE";
        return event;
    }

    public static ChatEvent interrupted(String message) {
        ChatEvent event = new ChatEvent();
        event.type = "INTERRUPTED";
        event.content = message;
        return event;
    }

    // Getters and setters

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getToolName() {
        return toolName;
    }

    public void setToolName(String toolName) {
        this.toolName = toolName;
    }

    public String getToolId() {
        return toolId;
    }

    public void setToolId(String toolId) {
        this.toolId = toolId;
    }

    public Map<String, Object> getToolInput() {
        return toolInput;
    }

    public void setToolInput(Map<String, Object> toolInput) {
        this.toolInput = toolInput;
    }

    public String getToolResult() {
        return toolResult;
    }

    public void setToolResult(String toolResult) {
        this.toolResult = toolResult;
    }

    public List<PendingToolCall> getPendingToolCalls() {
        return pendingToolCalls;
    }

    public void setPendingToolCalls(List<PendingToolCall> pendingToolCalls) {
        this.pendingToolCalls = pendingToolCalls;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public boolean isIncremental() {
        return incremental;
    }

    public void setIncremental(boolean incremental) {
        this.incremental = incremental;
    }

    /**
     * Pending tool call information for confirmation.
     */
    public static class PendingToolCall {

        private String id;
        private String name;
        private Map<String, Object> input;
        private boolean dangerous;

        public PendingToolCall() {}

        public PendingToolCall(
                String id, String name, Map<String, Object> input, boolean dangerous) {
            this.id = id;
            this.name = name;
            this.input = input;
            this.dangerous = dangerous;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Map<String, Object> getInput() {
            return input;
        }

        public void setInput(Map<String, Object> input) {
            this.input = input;
        }

        public boolean isDangerous() {
            return dangerous;
        }

        public void setDangerous(boolean dangerous) {
            this.dangerous = dangerous;
        }
    }
}

  2. Update the controller
package com.example.hitlchat.controller;

import com.example.hitlchat.dto.ChatEvent;
import com.example.hitlchat.dto.ChatRequest;
import com.example.hitlchat.service.AgentService;
import lombok.AllArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@AllArgsConstructor
@RestController
@RequestMapping("/api")
public class ChatController {

    private final AgentService agentService;

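    /**
     * Why return SSE events rather than a plain Flux<String>?
     * 1. Messages may contain newlines, JSON, code blocks, and other complex content
     * 2. Different event types (reasoning, answer, error, etc.) need to be told apart
     * 3. It keeps the front-end code simple and maintainable
     * 4. Reconnection after a dropped connection needs to be supported
     * 5. It is a common practice in production
     *
     * @param chatRequest the chat request holding the message and the session ID
     * @return a stream of SSE events, each carrying a ChatEvent
     */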
    @PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<ChatEvent>> chat(@RequestBody ChatRequest chatRequest) {
        String sessionId = chatRequest.getSessionId();
        if (!StringUtils.hasText(sessionId)) {
            sessionId = "default";
        }
        return agentService.chat(sessionId, chatRequest.getMessage())
                .map(v -> ServerSentEvent.<ChatEvent>builder().data(v).build());
    }
}

  3. Handle the response in the service
package com.example.hitlchat.service;

import com.example.hitlchat.dto.ChatEvent;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.agentscope.core.agent.EventType.REASONING;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private final Toolkit toolkit;

    public ReActAgent createAgent() {
        return ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .toolkit(toolkit)
                .build();
    }

    public Flux<ChatEvent> chat(String sessionId, String message) {
        ReActAgent agent = createAgent();
        return agent.stream(Msg.builder().textContent(message).build())
                .flatMap(this::convertEventToChatEvent)
                .concatWith(Flux.just(ChatEvent.complete()));
    }

    private Flux<ChatEvent> convertEventToChatEvent(Event event) {
        Msg msg = event.getMessage();
        List<ChatEvent> events = new ArrayList<>();
        switch (event.getType()) {
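            // Only the text of the reasoning step is returned; other event types, such as tool calls, are skipped
            case REASONING -> {
                String content = extraText(msg);
                if (StringUtils.hasText(content)) {
                    // If this is not the last event, the content is still an incremental chunk
                    events.add(ChatEvent.text(content, !event.isLast()));
                }
            }
        }
        // Turn the list into a reactive stream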
        return Flux.fromIterable(events);
    }

    private String extraText(Msg msg) {
        List<TextBlock> textBlocks = msg.getContentBlocks(TextBlock.class);
        return Optional.ofNullable(textBlocks).stream()
                .filter(v -> !v.isEmpty())
                .flatMap(List::stream)
                .map(TextBlock::getText)
                .collect(Collectors.joining());
    }
}

3.4 Showing tool calls and results

  1. Update the AgentService.convertEventToChatEvent method
package com.example.hitlchat.service;

import com.example.hitlchat.dto.ChatEvent;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.message.*;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

import java.util.*;
import java.util.stream.Collectors;

import static io.agentscope.core.agent.EventType.REASONING;
import static io.agentscope.core.agent.EventType.TOOL_RESULT;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private final Toolkit toolkit;

    public ReActAgent createAgent() {
        return ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .toolkit(toolkit)
                .build();
    }

    public Flux<ChatEvent> chat(String sessionId, String message) {
        ReActAgent agent = createAgent();
        return agent.stream(Msg.builder().textContent(message).build())
                .flatMap(this::convertEventToChatEvent)
                .concatWith(Flux.just(ChatEvent.complete()));
    }

    private Flux<ChatEvent> convertEventToChatEvent(Event event) {
        Msg msg = event.getMessage();
        List<ChatEvent> events = new ArrayList<>();
        switch (event.getType()) {
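            // REASONING events carry either streamed text or, on the last event, the tool calls requested by the model
            case REASONING -> {
                if (event.isLast() && msg.hasContentBlocks(ToolUseBlock.class)) {
                    // Emit one TOOL_USE event per requested tool call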
                    List<ToolUseBlock> toolUseBlocks = msg.getContentBlocks(ToolUseBlock.class);
                    for (ToolUseBlock toolUseBlock : toolUseBlocks) {
                        events.add(ChatEvent.toolUse(toolUseBlock.getId(),
                                toolUseBlock.getName(),
                                toolUseBlock.getInput()));
                    }
                } else {
                    String content = extraText(msg);
                    if (StringUtils.hasText(content)) {
                        // If this is not the last event, the content is still an incremental chunk
                        events.add(ChatEvent.text(content, !event.isLast()));
                    }
                }
            }
            case TOOL_RESULT -> {
                List<ToolResultBlock> toolResultBlocks = msg.getContentBlocks(ToolResultBlock.class);
                for (ToolResultBlock toolResultBlock : toolResultBlocks) {
                    events.add(ChatEvent.toolResult(
                            toolResultBlock.getId(),
                            toolResultBlock.getName(),
                            convertOutput(toolResultBlock.getOutput())));
                }
            }
        }
        // Turn the list into a reactive stream
        return Flux.fromIterable(events);
    }

    private String convertOutput(List<ContentBlock> output) {
        if (output == null || output.isEmpty()) {
            return "";
        }

        return output.stream()
                .filter(v -> v instanceof TextBlock)
                .map(v -> ((TextBlock) v).getText())
                .collect(Collectors.joining());
    }


    private String extraText(Msg msg) {
        List<TextBlock> textBlocks = msg.getContentBlocks(TextBlock.class);
        return Optional.ofNullable(textBlocks).stream()
                .filter(v -> !v.isEmpty())
                .flatMap(List::stream)
                .map(TextBlock::getText)
                .collect(Collectors.joining());
    }

    public Set<String> getToolsName() {
        return toolkit.getToolNames();
    }
}

  2. Add an endpoint that returns the tool names
@GetMapping("/tools")
public ResponseEntity<Set<String>> getTools() {
    return ResponseEntity.ok(agentService.getToolsName());
}
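
On the receiving side, a TOOL_USE event and its TOOL_RESULT carry the same `toolId`, so a consumer can pair the two to show which call a result belongs to. Below is a minimal sketch of that pairing in plain Java. `ToolCallTracker` and its methods are hypothetical names, and the IDs in the usage note are invented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical consumer-side helper that pairs tool results with their calls.
final class ToolCallTracker {

    // toolId -> tool name for calls that have started but not yet returned.
    private final Map<String, String> pending = new LinkedHashMap<>();

    // Records a tool call when a TOOL_USE event arrives.
    void onToolUse(String toolId, String toolName) {
        pending.put(toolId, toolName);
    }

    // Resolves a TOOL_RESULT event: returns the name of the matching call and
    // forgets it, or returns empty if the ID was never seen.
    Optional<String> onToolResult(String toolId) {
        return Optional.ofNullable(pending.remove(toolId));
    }

    // Number of calls still waiting for a result.
    int pendingCount() {
        return pending.size();
    }
}
```

For example, after `onToolUse("call-1", "get_current_time")`, a later `onToolResult("call-1")` yields `get_current_time`, while an unknown ID yields an empty `Optional`.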

3.5 Session management

  1. In-memory session (load, save, and common operations)
package com.example.hitlchat.service;

import com.example.hitlchat.dto.ChatEvent;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.message.*;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.session.InMemorySession;
import io.agentscope.core.session.Session;
import io.agentscope.core.state.SessionKey;
import io.agentscope.core.state.SimpleSessionKey;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private final Toolkit toolkit;
    private final Session session = new InMemorySession();

    public ReActAgent createAgent(String sessionId) {
        ReActAgent agent = ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .toolkit(toolkit)
                .build();
        agent.loadIfExists(session, sessionId);
        return agent;
    }

    public Flux<ChatEvent> chat(String sessionId, String message) {
        ReActAgent agent = createAgent(sessionId);
        return agent.stream(Msg.builder().textContent(message).build())
                .flatMap(this::convertEventToChatEvent)
                .concatWith(Flux.just(ChatEvent.complete()))
                .doFinally(signal -> {
                    agent.saveTo(session, sessionId);
                })
                .onErrorResume(error ->
                        Flux.just(ChatEvent.error(error.getMessage()), ChatEvent.complete())
                );
    }

    public void clearSession(String sessionId) {
        session.delete(SimpleSessionKey.of(sessionId));
    }

    public boolean existsSession(String sessionId) {
        return session.exists(SimpleSessionKey.of(sessionId));
    }

    public Set<SessionKey> listSessionKeys() {
        return session.listSessionKeys();
    }

    private Flux<ChatEvent> convertEventToChatEvent(Event event) {
        Msg msg = event.getMessage();
        List<ChatEvent> events = new ArrayList<>();
        switch (event.getType()) {
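            // REASONING events carry either streamed text or, on the last event, the tool calls requested by the model
            case REASONING -> {
                if (event.isLast() && msg.hasContentBlocks(ToolUseBlock.class)) {
                    // Emit one TOOL_USE event per requested tool call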
                    List<ToolUseBlock> toolUseBlocks = msg.getContentBlocks(ToolUseBlock.class);
                    for (ToolUseBlock toolUseBlock : toolUseBlocks) {
                        events.add(ChatEvent.toolUse(toolUseBlock.getId(),
                                toolUseBlock.getName(),
                                toolUseBlock.getInput()));
                    }
                } else {
                    String content = extraText(msg);
                    if (StringUtils.hasText(content)) {
                        // If this is not the last event, the content is still an incremental chunk
                        events.add(ChatEvent.text(content, !event.isLast()));
                    }
                }
            }
            case TOOL_RESULT -> {
                List<ToolResultBlock> toolResultBlocks = msg.getContentBlocks(ToolResultBlock.class);
                for (ToolResultBlock toolResultBlock : toolResultBlocks) {
                    events.add(ChatEvent.toolResult(
                            toolResultBlock.getId(),
                            toolResultBlock.getName(),
                            convertOutput(toolResultBlock.getOutput())));
                }
            }
        }
        // Turn the collected events into a reactive stream
        return Flux.fromIterable(events);
    }

    private String convertOutput(List<ContentBlock> output) {
        if (output == null || output.isEmpty()) {
            return "";
        }

        return output.stream()
                .filter(v -> v instanceof TextBlock)
                .map(v -> ((TextBlock) v).getText())
                .collect(Collectors.joining());
    }


    private String extraText(Msg msg) {
        List<TextBlock> textBlocks = msg.getContentBlocks(TextBlock.class);
        return Optional.ofNullable(textBlocks).stream()
                .filter(v -> !v.isEmpty())
                .flatMap(List::stream)
                .map(TextBlock::getText)
                .collect(Collectors.joining());
    }

    public Set<String> getToolsName() {
        return toolkit.getToolNames();
    }
}

3.6、Interrupting a streaming response

  1. Interrupt method, plus a registry of running agents
package com.example.hitlchat.service;

import com.example.hitlchat.dto.ChatEvent;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.message.*;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.session.InMemorySession;
import io.agentscope.core.session.Session;
import io.agentscope.core.state.SessionKey;
import io.agentscope.core.state.SimpleSessionKey;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private final Toolkit toolkit;
    private final Session session = new InMemorySession();
    // key:sessionId, value:agent
    private final ConcurrentHashMap<String, ReActAgent> runningAgents = new ConcurrentHashMap<>();

    public ReActAgent createAgent(String sessionId) {
        ReActAgent agent = ReActAgent.builder()
                .name("HITL-chat")
                .model(dashScopeChatModel)
                .toolkit(toolkit)
                .build();
        agent.loadIfExists(session, sessionId);
        return agent;
    }

    public Flux<ChatEvent> chat(String sessionId, String message) {
        ReActAgent agent = createAgent(sessionId);
        runningAgents.put(sessionId, agent);
        return agent.stream(Msg.builder().textContent(message).build())
                .flatMap(this::convertEventToChatEvent)
                .concatWith(Flux.just(ChatEvent.complete()))
                .doFinally(signal -> {
                    agent.saveTo(session, sessionId);
                    runningAgents.remove(sessionId);
                })
                .onErrorResume(error ->
                        Flux.just(ChatEvent.error(error.getMessage()), ChatEvent.complete())
                );
    }

    public void clearSession(String sessionId) {
        session.delete(SimpleSessionKey.of(sessionId));
    }

    public boolean existsSession(String sessionId) {
        return session.exists(SimpleSessionKey.of(sessionId));
    }

    public Set<SessionKey> listSessionKeys() {
        return session.listSessionKeys();
    }

    private Flux<ChatEvent> convertEventToChatEvent(Event event) {
        Msg msg = event.getMessage();
        List<ChatEvent> events = new ArrayList<>();
        switch (event.getType()) {
            // Reasoning events: stream the model's text and, on the last chunk, report any tool calls
            case REASONING -> {
                if (event.isLast() && msg.hasContentBlocks(ToolUseBlock.class)) {
                    // Collect the tool calls the model requested
                    List<ToolUseBlock> toolUseBlocks = msg.getContentBlocks(ToolUseBlock.class);
                    for (ToolUseBlock toolUseBlock : toolUseBlocks) {
                        events.add(ChatEvent.toolUse(toolUseBlock.getId(),
                                toolUseBlock.getName(),
                                toolUseBlock.getInput()));
                    }
                } else {
                    String content = extraText(msg);
                    if (StringUtils.hasText(content)) {
                        // While isLast() is false, more chunks follow, so the text is still streaming
                        events.add(ChatEvent.text(content, !event.isLast()));
                    }
                }
            }
            case TOOL_RESULT -> {
                List<ToolResultBlock> toolResultBlocks = msg.getContentBlocks(ToolResultBlock.class);
                for (ToolResultBlock toolResultBlock : toolResultBlocks) {
                    events.add(ChatEvent.toolResult(
                            toolResultBlock.getId(),
                            toolResultBlock.getName(),
                            convertOutput(toolResultBlock.getOutput())));
                }
            }
        }
        // Turn the collected events into a reactive stream
        return Flux.fromIterable(events);
    }

    private String convertOutput(List<ContentBlock> output) {
        if (output == null || output.isEmpty()) {
            return "";
        }

        return output.stream()
                .filter(v -> v instanceof TextBlock)
                .map(v -> ((TextBlock) v).getText())
                .collect(Collectors.joining());
    }


    private String extraText(Msg msg) {
        List<TextBlock> textBlocks = msg.getContentBlocks(TextBlock.class);
        return Optional.ofNullable(textBlocks).stream()
                .filter(v -> !v.isEmpty())
                .flatMap(List::stream)
                .map(TextBlock::getText)
                .collect(Collectors.joining());
    }

    public Set<String> getToolsName() {
        return toolkit.getToolNames();
    }

    public boolean interrupt(String sessionId) {
        ReActAgent agent = runningAgents.get(sessionId);
        if (agent == null) {
            return false;
        }
        agent.interrupt();
        return true;
    }
}
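
The lifecycle in this listing (register the agent when a chat starts, look it up on interrupt, remove it when the stream ends) can be tried out without AgentScope. The following is a minimal sketch using only JDK types; `RunningAgentRegistry` and its nested `FakeAgent` are hypothetical stand-ins, not AgentScope classes. It makes one deliberate change compared with the listing: cleanup uses the two-argument `remove(key, value)`, so a stream that finishes late cannot evict a newer agent registered under the same session ID.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical registry mirroring the runningAgents map in AgentService.
class RunningAgentRegistry {

    // Hypothetical stand-in for an interruptible agent (not an AgentScope class).
    static class FakeAgent {
        private final AtomicBoolean interrupted = new AtomicBoolean(false);

        void interrupt() {
            interrupted.set(true);
        }

        boolean isInterrupted() {
            return interrupted.get();
        }
    }

    private final Map<String, FakeAgent> running = new ConcurrentHashMap<>();

    // Called when a chat stream starts.
    void register(String sessionId, FakeAgent agent) {
        running.put(sessionId, agent);
    }

    // Called from the stream's cleanup step; removes the entry only if it still maps to this agent.
    void unregister(String sessionId, FakeAgent agent) {
        running.remove(sessionId, agent);
    }

    // Returns false when no agent is currently running for the session.
    boolean interrupt(String sessionId) {
        FakeAgent agent = running.get(sessionId);
        if (agent == null) {
            return false;
        }
        agent.interrupt();
        return true;
    }

    public static void main(String[] args) {
        RunningAgentRegistry registry = new RunningAgentRegistry();
        FakeAgent agent = new FakeAgent();
        registry.register("demo", agent);
        System.out.println(registry.interrupt("demo")); // true
        registry.unregister("demo", agent);
        System.out.println(registry.interrupt("demo")); // false
    }
}
```

Whether the extra safety matters depends on whether a client can start two chats for one session at the same time; if the frontend prevents that, the simpler `remove(sessionId)` behaves the same.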

  2. Interrupt endpoint
@PostMapping("/chat/interrupt/{sessionId}")
public ResponseEntity<Map<String, Object>> interrupt(@PathVariable("sessionId") String sessionId) {
    boolean interrupt = agentService.interrupt(sessionId);
    return ResponseEntity.ok(Map.of("success", true, "interrupted", interrupt));
}

3.7、Confirming dangerous tool calls

  1. Tool confirmation hook

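The hook manages the set of dangerous tools (add, remove, list, replace) and listens for agent events: if the model requests a dangerous tool, the agent is paused.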

package com.example.hitlchat.hook;

import io.agentscope.core.hook.Hook;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.ToolUseBlock;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ToolConfirmationHook implements Hook {

    // One hook instance is shared by every agent, so the set must be safe for concurrent access
    private final Set<String> dangerousTools = ConcurrentHashMap.newKeySet();

    public ToolConfirmationHook() {
    }

    public ToolConfirmationHook(Set<String> dangerousTools) {
        // Copy the names so the caller's set is never modified
        this.dangerousTools.addAll(dangerousTools);
    }

    /**
     * Marks a tool as dangerous.
     *
     * @param toolName name of the tool to add
     */
    public void addDangerousTool(String toolName) {
        dangerousTools.add(toolName);
    }

    /**
     * Removes a tool from the dangerous set.
     *
     * @param toolName name of the tool to remove
     */
    public void removeDangerousTool(String toolName) {
        dangerousTools.remove(toolName);
    }

    /**
     * Returns the dangerous tools.
     *
     * @return an immutable copy of the tool names
     */
    public Set<String> getDangerousTools() {
        return Set.copyOf(dangerousTools);
    }

    /**
     * Checks whether a tool is marked as dangerous.
     *
     * @param toolName name of the tool to check
     * @return true if the tool requires confirmation
     */
    public boolean isDangerousTool(String toolName) {
        return dangerousTools.contains(toolName);
    }

    /**
     * Replaces the whole set of dangerous tools.
     *
     * @param dangerousTools the new tool names; null clears the set
     */
    public void setDangerousTools(Set<String> dangerousTools) {
        this.dangerousTools.clear();
        if (dangerousTools == null) {
            return;
        }
        this.dangerousTools.addAll(dangerousTools);
    }


    /**
     * Listens for hook events and pauses the agent when the model requests a dangerous tool.
     *
     * @param event the hook event
     * @return the event, wrapped in a Mono
     */
    @Override
    public <T extends HookEvent> Mono<T> onEvent(T event) {
        if (event instanceof PostReasoningEvent postReasoningEvent) {
            Msg msg = postReasoningEvent.getReasoningMessage();
            if (msg == null) {
                return Mono.just(event);
            }
            // Collect the tool calls the model requested
            List<ToolUseBlock> toolUseBlocks = msg.getContentBlocks(ToolUseBlock.class);
            // Pause the agent if any requested tool is dangerous
            boolean isDangerous = toolUseBlocks.stream()
                    .anyMatch(tool -> dangerousTools.contains(tool.getName()));
            if (isDangerous) {
                postReasoningEvent.stopAgent();
                return Mono.just(event);
            }
        }
        return Mono.just(event);
    }
}
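
The pause decision in `onEvent` reduces to a membership check: if any tool requested in one reasoning step is in the dangerous set, the agent stops before executing the batch. The sketch below isolates that check with plain JDK types. `DangerCheckExample` and `shouldPause` are illustrative names, and the tool names are sample values rather than a fixed list.

```java
import java.util.List;
import java.util.Set;

class DangerCheckExample {

    // Returns true when at least one requested tool is in the dangerous set.
    static boolean shouldPause(List<String> requestedToolNames, Set<String> dangerousTools) {
        return requestedToolNames.stream().anyMatch(dangerousTools::contains);
    }

    public static void main(String[] args) {
        Set<String> dangerous = Set.of("get_random_number");
        // Only safe tools requested: the agent keeps running.
        System.out.println(shouldPause(List.of("get_time"), dangerous)); // false
        // A mixed batch: one dangerous call is enough to pause everything.
        System.out.println(shouldPause(List.of("get_time", "get_random_number"), dangerous)); // true
        // No tool calls at all: nothing to confirm.
        System.out.println(shouldPause(List.of(), dangerous)); // false
    }
}
```

One consequence worth noting: a single dangerous call pauses the whole batch, so safe calls requested in the same step also wait for the user's decision.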

  2. AgentService
  • Register the hook on the agent
  • Confirmation and rejection logic
package com.example.hitlchat.service;

import com.example.hitlchat.dto.ChatEvent;
import com.example.hitlchat.dto.ToolConfirmRequest;
import com.example.hitlchat.hook.ToolConfirmationHook;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.message.*;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.OllamaChatModel;
import io.agentscope.core.session.InMemorySession;
import io.agentscope.core.session.Session;
import io.agentscope.core.state.SessionKey;
import io.agentscope.core.state.SimpleSessionKey;
import io.agentscope.core.tool.Toolkit;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import tools.jackson.core.type.TypeReference;
import tools.jackson.databind.ObjectMapper;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@AllArgsConstructor
@Service
public class AgentService {

    private final OllamaChatModel ollamaChatModel;
    private final DashScopeChatModel dashScopeChatModel;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final Toolkit toolkit;
    private final Session session = new InMemorySession();
    // key:sessionId, value:agent
    private final ConcurrentHashMap<String, ReActAgent> runningAgents = new ConcurrentHashMap<>();

    @Getter
    private final ToolConfirmationHook toolConfirmationHook = new ToolConfirmationHook(Set.of("get_random_number"));


    public ReActAgent createAgent(String sessionId) {
        ReActAgent agent = ReActAgent.builder()
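                // The builder fails if no agent name is set
                .name("HITL-chat")
                // The model instance is thread-safe and can be shared
                .model(dashScopeChatModel)
                // Give each agent its own copy of the toolkit for thread safety
                .toolkit(toolkit.copy())
                // In-memory memory is already the default
//                .memory(new InMemoryMemory())
                .hook(toolConfirmationHook)
                .build();
        // Restore the saved session state, if any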
        agent.loadIfExists(session, sessionId);
        return agent;
    }

    public Flux<ChatEvent> chat(String sessionId, String message) {
        // Create a fresh agent for every request
        ReActAgent agent = createAgent(sessionId);
        // Track the running agent so it can be interrupted
        runningAgents.put(sessionId, agent);
        return handleStreamResult(agent,
                sessionId,
                agent.stream(Msg.builder().textContent(message).build()));

    }

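    /**
     * Turns the agent's event stream into the ChatEvent stream sent to the frontend.
     *
     * @param agent     the agent producing the events
     * @param sessionId session under which the agent state is saved
     * @param eventFlux the agent's raw event stream
     * @return the converted stream, ending with a completion event
     */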
    private Flux<ChatEvent> handleStreamResult(ReActAgent agent,
                                               String sessionId,
                                               Flux<Event> eventFlux) {
        return eventFlux
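                // Convert each agent Event into the ChatEvent the frontend understands
                .flatMap(this::convertEventToChatEvent)
                // Append a completion marker after the stream so the frontend knows it is done
                .concatWith(Flux.just(ChatEvent.complete()))
                // Runs when the stream terminates for any reason: completion, error, or cancellation
                .doFinally(signal -> {
                    // Save the conversation state to the session
                    agent.saveTo(session, sessionId);
                    // The agent is no longer running, so stop tracking it
                    runningAgents.remove(sessionId);
                })
                // On failure, fall back to a replacement stream
                .onErrorResume(error ->
                        // Two events: the error message, then the completion marker
                        Flux.just(ChatEvent.error(error.getMessage()), ChatEvent.complete())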
                );

    }

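    public Flux<ChatEvent> confirmTool(String sessionId,
                                       Boolean confirmed,
                                       String reason,
                                       List<ToolConfirmRequest.ToolCallInfo> toolCalls) {
        ReActAgent agent = createAgent(sessionId);
        // Track the resumed agent as well, so the continuation can still be interrupted
        runningAgents.put(sessionId, agent);
        // Boolean.TRUE.equals treats a null flag as a rejection instead of throwing
        if (Boolean.TRUE.equals(confirmed)) {
            // Confirmed: resume the paused agent, which then runs the pending tool calls
            return handleStreamResult(agent, sessionId, agent.stream(StreamOptions.defaults()));
        } else {
            // Rejected: answer every pending tool call with a cancellation result
            String cancelMessage = reason == null ? "The user cancelled the tool call" : reason;
            // Build one result block per pending call, reusing its id and name
            ToolResultBlock[] resultBlocks = toolCalls.stream().map(tool ->
                    ToolResultBlock.builder()
                            .id(tool.getId())
                            .name(tool.getName())
                            .output(TextBlock.builder().text(cancelMessage).build())
                            .build()
            ).toArray(ToolResultBlock[]::new);
            Msg msg = Msg.builder().role(MsgRole.TOOL).content(resultBlocks).build();
            return handleStreamResult(agent, sessionId, agent.stream(msg));
        }
    }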

    /**
     * Deletes the session with the given ID.
     *
     * @param sessionId ID of the session to delete
     */
    public void clearSession(String sessionId) {
        session.delete(SimpleSessionKey.of(sessionId));
    }

    /**
     * Checks whether a session exists.
     *
     * @param sessionId ID of the session to look up
     * @return true if the session exists
     */
    public boolean existsSession(String sessionId) {
        return session.exists(SimpleSessionKey.of(sessionId));
    }

    /**
     * Lists the keys of all stored sessions.
     *
     * @return the session keys
     */
    public Set<SessionKey> listSessionKeys() {
        return session.listSessionKeys();
    }

    /**
     * Converts an agent Event into zero or more ChatEvents.
     *
     * @param event the agent event
     * @return the resulting ChatEvents as a Flux
     */
    private Flux<ChatEvent> convertEventToChatEvent(Event event) {
        // The message carried by this event
        Msg msg = event.getMessage();
        // One agent event can map to several ChatEvents
        List<ChatEvent> events = new ArrayList<>();
        switch (event.getType()) {
            // Reasoning events: stream the model's text and, on the last chunk, report any tool calls
            case REASONING -> {
                // The last chunk of a reasoning message may carry ToolUseBlocks
                if (event.isLast() && msg.hasContentBlocks(ToolUseBlock.class)) {
                    // Collect the tool calls the model requested
                    List<ToolUseBlock> toolUseBlocks = msg.getContentBlocks(ToolUseBlock.class);
                    boolean isDangerous = toolUseBlocks.stream().anyMatch(toolUseBlock ->
                            toolConfirmationHook.isDangerousTool(toolUseBlock.getName()));

                    if (isDangerous) {
                        List<ChatEvent.PendingToolCall> pendingToolCalls = toolUseBlocks.stream()
                                .map(toolUseBlock ->
                                        new ChatEvent.PendingToolCall(
                                                toolUseBlock.getId(),
                                                toolUseBlock.getName(),
                                                convertInput(toolUseBlock.getInput()),
                                                toolConfirmationHook.isDangerousTool(toolUseBlock.getName()))
                                ).toList();
                        events.add(ChatEvent.toolConfirm(pendingToolCalls));

                    } else {
                        for (ToolUseBlock toolUseBlock : toolUseBlocks) {
                            events.add(ChatEvent.toolUse(toolUseBlock.getId(),
                                    toolUseBlock.getName(),
                                    convertInput(toolUseBlock.getInput())));
                        }
                    }
                } else {
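                    // Extract the text content
                    String content = extraText(msg);
                    // Only emit an event when there is text to show
                    if (StringUtils.hasText(content)) {
                        // While isLast() is false, more chunks follow, so the text is still streaming
                        // incremental = true marks a partial chunk; false marks the final one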
                        events.add(ChatEvent.text(content, !event.isLast()));
                    }
                }
            }
            // Tool result events
            case TOOL_RESULT -> {
                // Collect the results returned by the tools
                List<ToolResultBlock> toolResultBlocks = msg.getContentBlocks(ToolResultBlock.class);
                for (ToolResultBlock toolResultBlock : toolResultBlocks) {
                    events.add(ChatEvent.toolResult(
                            toolResultBlock.getId(),
                            toolResultBlock.getName(),
                            convertOutput(toolResultBlock.getOutput())));
                }
            }
        }
        // Turn the collected events into a reactive stream
        return Flux.fromIterable(events);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> convertInput(Object input) {
        if (input == null) {
            return Map.of();
        }

        if (input instanceof Map<?, ?>) {
            return (Map<String, Object>) input;
        }

        try {
            return OBJECT_MAPPER.convertValue(input, new TypeReference<Map<String, Object>>() {
            });
        } catch (Exception e) {
            return Map.of("value", input.toString());
        }
    }


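    /**
     * Joins the text blocks of a tool result into a single string.
     *
     * @param output the content blocks returned by the tool
     * @return the concatenated text, or an empty string if there is none
     */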
    private String convertOutput(List<ContentBlock> output) {
        if (output == null || output.isEmpty()) {
            return "";
        }

        return output.stream()
                .filter(v -> v instanceof TextBlock)
                .map(v -> ((TextBlock) v).getText())
                .collect(Collectors.joining());
    }


    /**
     * Extracts the text content of a message.
     *
     * @param msg the message to read
     * @return the concatenated text of all TextBlocks
     */
    private String extraText(Msg msg) {
        List<TextBlock> textBlocks = msg.getContentBlocks(TextBlock.class);
        return Optional.ofNullable(textBlocks).stream()
                .filter(v -> !v.isEmpty())
                .flatMap(List::stream)
                .map(TextBlock::getText)
                .collect(Collectors.joining());
    }

    /**
     * Returns the names of the registered tools.
     *
     * @return the tool names
     */
    public Set<String> getToolsName() {
        return toolkit.getToolNames();
    }

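    /**
     * Interrupts the agent running for a session.
     *
     * @param sessionId ID of the session to interrupt
     * @return true if a running agent was found and interrupted
     */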
    public boolean interrupt(String sessionId) {
        // Look up the agent currently running for this session
        ReActAgent agent = runningAgents.get(sessionId);
        // Nothing is running, so there is nothing to interrupt
        if (agent == null) {
            return false;
        }
        // Interrupt the agent
        agent.interrupt();
        return true;
    }
}
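
The rejection branch of `confirmTool` answers every pending tool call instead of silently dropping it. Models with tool calling typically expect each requested call to be matched by a result carrying the same ID, so the cancellation message is attached to all of them. The sketch below shows that mapping with plain records; `RejectionExample`, `ToolCall`, and `ToolResult` are hypothetical stand-ins for the DTO and `ToolResultBlock`, and treating a blank reason like a missing one is an added assumption.

```java
import java.util.List;

class RejectionExample {

    // Hypothetical stand-in for ToolConfirmRequest.ToolCallInfo.
    record ToolCall(String id, String name) {}

    // Hypothetical stand-in for a ToolResultBlock holding a single text output.
    record ToolResult(String id, String name, String output) {}

    // Builds one cancellation result per pending call, reusing the call's id and name.
    static List<ToolResult> rejectAll(List<ToolCall> pending, String reason) {
        String message = (reason == null || reason.isBlank())
                ? "The user cancelled the tool call"
                : reason;
        return pending.stream()
                .map(call -> new ToolResult(call.id(), call.name(), message))
                .toList();
    }

    public static void main(String[] args) {
        List<ToolCall> pending = List.of(
                new ToolCall("call-1", "get_random_number"),
                new ToolCall("call-2", "get_time"));
        for (ToolResult result : rejectAll(pending, null)) {
            System.out.println(result.id() + " -> " + result.output());
        }
    }
}
```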

  3. Confirmation request DTO
package com.example.hitlchat.dto;

import java.util.List;

/**
 * Tool confirmation request DTO.
 */
public class ToolConfirmRequest {

    private String sessionId;
    private boolean confirmed;
    private String reason;
    private List<ToolCallInfo> toolCalls;

    public ToolConfirmRequest() {}

    public ToolConfirmRequest(
            String sessionId, boolean confirmed, String reason, List<ToolCallInfo> toolCalls) {
        this.sessionId = sessionId;
        this.confirmed = confirmed;
        this.reason = reason;
        this.toolCalls = toolCalls;
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public boolean isConfirmed() {
        return confirmed;
    }

    public void setConfirmed(boolean confirmed) {
        this.confirmed = confirmed;
    }

    public String getReason() {
        return reason;
    }

    public void setReason(String reason) {
        this.reason = reason;
    }

    public List<ToolCallInfo> getToolCalls() {
        return toolCalls;
    }

    public void setToolCalls(List<ToolCallInfo> toolCalls) {
        this.toolCalls = toolCalls;
    }

    /** Tool call information for rejection response. */
    public static class ToolCallInfo {
        private String id;
        private String name;

        public ToolCallInfo() {}

        public ToolCallInfo(String id, String name) {
            this.id = id;
            this.name = name;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

  4. Confirmation endpoint
@PostMapping(value = "/chat/confirm", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ChatEvent>> confirmTool(@RequestBody ToolConfirmRequest request) {
    String sessionId = request.getSessionId();
    if (!StringUtils.hasText(sessionId)) {
        sessionId = "default";
    }
    return agentService.confirmTool(sessionId,
            request.isConfirmed(),
            request.getReason(),
            request.getToolCalls())
            .map(v -> ServerSentEvent.<ChatEvent>builder().data(v).build());
}
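
To see what a client has to send, here is a minimal sketch that builds a rejection request for this endpoint with the JDK's `java.net.http` API. The JSON field names come from `ToolConfirmRequest`; the base URL `http://localhost:8080`, the absence of a class-level path prefix on the controller, and the sample IDs are assumptions. The request is only built, not sent, and the string formatting does no JSON escaping, so it is suitable for illustration only.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ConfirmRequestExample {

    // Builds the JSON body for rejecting one pending tool call (values must be JSON-safe).
    static String rejectionBody(String sessionId, String toolCallId, String toolName, String reason) {
        return """
                {
                  "sessionId": "%s",
                  "confirmed": false,
                  "reason": "%s",
                  "toolCalls": [{"id": "%s", "name": "%s"}]
                }
                """.formatted(sessionId, reason, toolCallId, toolName);
    }

    // Builds a POST to /chat/confirm that accepts a server-sent event stream in response.
    static HttpRequest build(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/chat/confirm"))
                .header("Content-Type", "application/json")
                .header("Accept", "text/event-stream")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        String body = rejectionBody("demo", "call-1", "get_random_number", "Not now");
        HttpRequest request = build("http://localhost:8080", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
    }
}
```

For a confirmation, `confirmed` would be `true`; the service ignores `reason` and `toolCalls` in that case and simply resumes the paused agent.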

四、MCP

4.1、Adding an MCP server

  1. Request DTO
/*
 * Copyright 2024-2026 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.example.hitlchat.dto;

import java.util.List;
import java.util.Map;

/**
 * MCP configuration request DTO.
 */
public class McpConfigRequest {

    /** MCP server name (unique identifier). */
    private String name;

    /** Transport type: STDIO, SSE, HTTP. */
    private String transportType;

    /** For STDIO: command to execute. */
    private String command;

    /** For STDIO: command arguments. */
    private List<String> args;

    /** For SSE/HTTP: server URL. */
    private String url;

    /** HTTP headers for SSE/HTTP transport. */
    private Map<String, String> headers;

    /** Query parameters for SSE/HTTP transport. */
    private Map<String, String> queryParams;

    public McpConfigRequest() {}

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTransportType() {
        return transportType;
    }

    public void setTransportType(String transportType) {
        this.transportType = transportType;
    }

    public String getCommand() {
        return command;
    }

    public void setCommand(String command) {
        this.command = command;
    }

    public List<String> getArgs() {
        return args;
    }

    public void setArgs(List<String> args) {
        this.args = args;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public void setHeaders(Map<String, String> headers) {
        this.headers = headers;
    }

    public Map<String, String> getQueryParams() {
        return queryParams;
    }

    public void setQueryParams(Map<String, String> queryParams) {
        this.queryParams = queryParams;
    }
}

  2. MCP service
     1. Build the McpClient and register it with the toolkit
package com.example.hitlchat.service;

import com.example.hitlchat.dto.McpConfigRequest;
import io.agentscope.core.tool.Toolkit;
import io.agentscope.core.tool.mcp.McpClientBuilder;
import io.agentscope.core.tool.mcp.McpClientWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class McpService {
    // Connected MCP clients keyed by server name; a production system might persist this in a database
    private static final ConcurrentHashMap<String, McpClientWrapper> mcpClients = new ConcurrentHashMap<>();

    @Autowired
    private Toolkit toolkit;


    /**
     * Adds an MCP server and exposes its tools through the shared toolkit.
     *
     * Steps:
     * 1. Read the server name, its unique identifier, from the request.
     * 2. Look the name up in mcpClients to detect a duplicate.
     * 3. If it already exists, return an error Mono carrying an IllegalArgumentException.
     * 4. Call the private buildMcpClient(request) to create the MCP client asynchronously.
     * 5. Register the client with the shared toolkit via toolkit.registerMcpClient(client).
     * 6. Once registration succeeds, cache the client in the ConcurrentHashMap.
     * 7. Return an empty Mono<Void> to signal that the operation completed.
     */
    public Mono<Void> addMcpService(McpConfigRequest request) {
        String name = request.getName();
        if (mcpClients.containsKey(name)) {
            return Mono.error(new IllegalArgumentException("MCP server already exists: " + name));
        }

        return buildMcpClient(request)
                .flatMap(client -> toolkit.registerMcpClient(client)
                        // Cache only after registration succeeds, so a failure leaves no stale entry
                        .doOnSuccess(ignored -> mcpClients.put(name, client)));
    }

    private Mono<McpClientWrapper> buildMcpClient(McpConfigRequest request) {
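        // Transport type of the server to add: STDIO, SSE, or HTTP
        String transportType = request.getTransportType();
        if (transportType == null) {
            return Mono.error(new IllegalArgumentException("Transport type is required"));
        }
        String type = transportType.toUpperCase();
        // Start building the client under the server's name
        McpClientBuilder builder = McpClientBuilder.create(request.getName());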
        switch (type) {
            case "STDIO":
                String command = request.getCommand();
                List<String> args = request.getArgs();
                if (args == null || args.isEmpty()) {
                    builder.stdioTransport(command);
                } else {
                    builder.stdioTransport(command, args.toArray(new String[0]));
                }
                break;
            case "SSE":
                builder.sseTransport(request.getUrl());
                addHeaderAndQueryParams(builder, request);
                break;
            case "HTTP":
                builder.streamableHttpTransport(request.getUrl());
                addHeaderAndQueryParams(builder, request);
                break;
            default:
                return Mono.error(new IllegalArgumentException("Invalid transport type: " + type));
        }
        return builder.buildAsync();
    }

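    /**
     * SSE and HTTP transports make network requests, so they may need headers and query parameters.
     *
     * @param builder the client builder to configure
     * @param request the request carrying the optional headers and query parameters
     */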
    private void addHeaderAndQueryParams(McpClientBuilder builder, McpConfigRequest request) {
        Map<String, String> headers = request.getHeaders();
        if (headers != null && !headers.isEmpty()) {
            headers.forEach(builder::header);
        }
        Map<String, String> queryParams = request.getQueryParams();
        if (queryParams != null && !queryParams.isEmpty()) {
            queryParams.forEach(builder::queryParam);
        }
    }
}
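
The add flow in `McpService` (reject a duplicate name, connect asynchronously, cache the client) can be exercised without AgentScope or Reactor. The following is a rough sketch under stated assumptions: `McpRegistrySketch` is a hypothetical class, `CompletableFuture` stands in for `Mono`, and a plain `String` plays the role of the connected client. It also swaps in `putIfAbsent` for the final write, a variation on the listing that closes the window in which two concurrent requests with the same name both pass the initial check.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class McpRegistrySketch {

    // The String value is a stand-in for a connected MCP client.
    private final Map<String, String> clients = new ConcurrentHashMap<>();

    CompletableFuture<Void> add(String name, Supplier<CompletableFuture<String>> connect) {
        // Reject a known duplicate before doing any connection work.
        if (clients.containsKey(name)) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("MCP server already exists: " + name));
        }
        // Connect asynchronously and cache the client only if the connection succeeded.
        return connect.get().thenAccept(client -> {
            // putIfAbsent catches a concurrent add that finished while this one was connecting.
            if (clients.putIfAbsent(name, client) != null) {
                throw new IllegalArgumentException("MCP server already exists: " + name);
            }
        });
    }

    boolean contains(String name) {
        return clients.containsKey(name);
    }

    public static void main(String[] args) {
        McpRegistrySketch registry = new McpRegistrySketch();
        registry.add("files", () -> CompletableFuture.completedFuture("client-1")).join();
        System.out.println(registry.contains("files")); // true
        System.out.println(registry.add("files", () -> CompletableFuture.completedFuture("client-2"))
                .isCompletedExceptionally()); // true
    }
}
```
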
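  3. Endpoint for adding an MCP server
/**
 * Adds an MCP server.
 *
 * @param request the MCP server configuration
 * @return a success flag, or an error message if the server could not be added
 */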
@PostMapping("/mcp/add")
public Mono<ResponseEntity<Map<String, Object>>> addMcpServer(@RequestBody McpConfigRequest request) {
    return mcpService.addMcpService(request)
            .then(Mono.just(ResponseEntity.ok(Map.<String, Object>of("success", true))))
            // Map.of rejects null values, so guard against exceptions without a message
            .onErrorResume(e -> Mono.just(ResponseEntity
                    .badRequest().body(Map.of("success", false, "error", String.valueOf(e.getMessage())))));
}
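
`McpConfigRequest` carries fields for all three transports, but each transport only uses some of them: STDIO needs `command`, while SSE and HTTP need `url`. The service shown above does not check this itself, so a request could be validated up front along the following lines. This is a sketch, not part of the project: `McpRequestValidation` and its error messages are hypothetical, and `npx` in the example is just a sample command.

```java
import java.util.Locale;
import java.util.Optional;

class McpRequestValidation {

    // Returns an error message, or an empty Optional when the request looks usable.
    static Optional<String> validate(String transportType, String command, String url) {
        if (isBlank(transportType)) {
            return Optional.of("transportType is required");
        }
        String type = transportType.toUpperCase(Locale.ROOT);
        return switch (type) {
            case "STDIO" -> require(command, "command is required for STDIO");
            case "SSE", "HTTP" -> require(url, "url is required for " + type);
            default -> Optional.of("Invalid transport type: " + type);
        };
    }

    // Reports the given message when the value is missing.
    private static Optional<String> require(String value, String message) {
        return isBlank(value) ? Optional.of(message) : Optional.empty();
    }

    private static boolean isBlank(String value) {
        return value == null || value.isBlank();
    }

    public static void main(String[] args) {
        System.out.println(validate("stdio", "npx", null).isEmpty()); // true
        System.out.println(validate("sse", null, null).orElse("ok")); // url is required for SSE
    }
}
```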

4.2、Listing and removing MCP servers

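  1. Service methods for listing and removing
/**
 * Lists the registered MCP servers.
 *
 * @return the server names
 */
public Set<String> listMcpServer() {
    // Return a snapshot rather than the live key set of the map
    return Set.copyOf(mcpClients.keySet());
}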

/**
 * Removes an MCP server.
 *
 * @param name name of the server to remove
 * @return a Mono that completes once the toolkit has dropped the client
 */
public Mono<Void> deleteMcpServer(String name) {
    McpClientWrapper mcpClientWrapper = mcpClients.remove(name);
    if (mcpClientWrapper == null) {
        return Mono.error(new IllegalArgumentException("MCP server not found: " + name));
    }
    return toolkit.removeMcpClient(name);
}
  2. Endpoints for listing and removing
/**
 * Lists the registered MCP servers.
 *
 * @return the server names
 */
@GetMapping("/mcp/list")
public ResponseEntity<Set<String>> listMcpServer() {
    return ResponseEntity.ok(mcpService.listMcpServer());
}

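/**
 * Removes an MCP server.
 *
 * @param name name of the server to remove
 * @return a success flag, or an error message if the server could not be removed
 */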
@DeleteMapping("/mcp/{name}")
public Mono<ResponseEntity<Map<String, Object>>> deleteMcpServer(@PathVariable("name") String name) {
    return mcpService.deleteMcpServer(name)
            .then(Mono.just(ResponseEntity.ok(Map.<String, Object>of("success", true))))
            // Map.of rejects null values, so guard against exceptions without a message
            .onErrorResume(e -> Mono.just(ResponseEntity
                    .badRequest().body(Map.of("success", false, "error", String.valueOf(e.getMessage())))));
}

五、Frontend project

The frontend code is not reproduced here; copy it from the project repository linked at the top of this tutorial.
