LangChain-agent:MCP功能的认识与深入使用
LangChain-agent:MCP功能的深入使用
一.MCP
1.1什么是MCP
https://modelcontextprotocol.io/docs/getting-started/intro模型上下文协议 (Model Context Protocol,简称MCP),它是干什么用的呢。文档中有详细的概念的叙述,我们就不那么详细了,这里就直白些。
像我们之前去编写我们的LangChain代码时,每次新开一个项目时就需要把原来的工具重新写一遍,非常麻烦(可以复制粘贴也很麻烦)。
有没有什么办法,我们直接部署一个服务,然后让本地的LangChain应用通过我们给定的服务器地址调用服务,像调工具那样。当然可以,但是agent应该以什么样的方式去向服务器发起调用呢?
就像我们的网页通常以发起HTTP请求来规范我们访问服务器上部署的后端服务一样,agent想要调用后端部署的服务,也需要遵循事先与服务端约定好的协议。这个协议就是MCP。
下面是关于MCP的几点补充:
1. 它不是“HTTP之于网页”那样的通用协议
实际上,MCP 是基于 JSON-RPC 2.0 构建的,可以理解为“协议之上的协议”。即便你完全不用 MCP,用 HTTP + 自定义 JSON 格式也能实现 Agent 调用远程服务。MCP 的价值在于它定义了“格式”标准——比如工具请求长什么样、返回结果长什么样、错误怎么处理,而不是重新发明传输层。
2. 它不只解决工具复用问题
复用 LangChain 工具”这一场景上,这确实是痛点之一。但 MCP 的野心更大:
-
不只管工具,还管数据和 Prompt:MCP 定义的三大核心原语是 Resources(数据源)、Tools(工具) 和 Prompts(预定义模板)。也就是说,AI 应用不仅可以通过 MCP 调用远程计算,还能读取远程文件、数据库,甚至获取预设的 Prompt 模板。
-
不只解决复用问题:如官方文档所言,MCP 的核心是标准化连接——让 AI 应用“一次接入,到处使用”,像 USB-C 一样。你的复用问题只是这个标准化价值的体现之一。
3. 它不需要“事先约定”,而是“动态发现”
虽然说Agent 需要“遵循事先约定好的协议”来调用服务确实没什么问题。但是更准确地说,MCP 是动态发现的:Agent 启动时连接 MCP Server,Server 会主动告诉 Agent“我提供了什么工具和数据”,Agent 动态发现并使用。
4. 它不是专为 Agent 设计的
虽然 Agent 是 MCP 的主要应用场景,但 MCP 的官方定义是“连接 AI 应用到外部系统的标准”。即便是没有 Agent 能力的普通聊天应用,也可以通过 MCP 获取工具和数据。当然,目前主流应用确实是 Agent。
1.2两种MCP传输方式
MCP 支持两种主要的客户端-服务器通信传输机制。
1.2.1HTTP(也称 streamable-http)
- 通过 HTTP 请求通信。有关详细信息,请参见 MCP HTTP 运输规范。
- 适合远程服务器、云部署。
- 支持传递自定义请求头(如认证 token)和实现
httpx.Auth接口的认证机制。 - 官方示例:自定义身份验证实现
1.2.2stdio
- 客户端将服务器作为子进程启动,通过标准输入/输出通信。
- 适合本地工具、简单配置。
- 有状态特性:子进程在客户端连接期间持续存在,但
MultiServerMCPClient默认仍为每次工具调用创建新会话。
具体例子可以参考下面的快速上手部分。
1.3快速上手
1.3.1定义服务端
因为LangChain仅提供了客户端的MCP工具,所以我们需要借助其他的三方包去实现MCP服务端,这里使用fast-mcp,安装包的方式如下:
pip install fastmcp
我们分别定义两个不同传输方式的mcp服务器并运行起来:
stdio:
from fastmcp import FastMCP
mcp = FastMCP("Weather")
@mcp.tool()
def get_weather_by_location(loc : str):
"""在已知用户位置的情况下查询天气信息"""
return f"{loc}未来一个月的天气均是阳光明媚"
@mcp.tool()
def get_weather_without_location():
"""未知用户未知的情况下查询天气信息"""
return "焦作的未来一个月的天气均是阳光明媚"
if "__name__" == "__main__":
mcp.run(transport="stdio")
streamable-http:
from fastmcp import FastMCP
mcp = FastMCP("Math")
@mcp.tool()
def add(a : int,b : int):
"""求两数之和"""
return a + b
if __name__ == "__main__":
mcp.run(transport="streamable-http",port=8080)
1.3.2定义客户端
接下来来定义客户端:
使用 langchain-mcp-adapters 库让 LangChain Agent 调用 MCP 服务器上定义的工具。
pip install langchain-mcp-adapters
核心用法:
- 创建
MultiServerMCPClient,配置一个或多个 MCP 服务器(支持stdio和http传输)。 - 调用
client.get_tools()获取所有工具。 - 将工具传入
create_agent,构建 Agent。
我们来创建一个客户端链接我们刚才创建好的两个mcp服务:
import asyncio
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
# FastMCP 客户端是异步的,因此我们需要使用 asyncio.run 来运行客户端
async def main():
client = MultiServerMCPClient(
{
"Weather": {
"transport": "stdio", # 本地子进程通信
"command": "python",
# stdio服务源码文件的绝对路径
"args": ["C:/code/LangChain/LangChain-v1-test/studio-mcp-test.py"],
},
"Math": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
}
}
)
#获取工具并实例化agent
tools = await client.get_tools()
print(tools)
model = ChatDeepSeek(model="deepseek-v4-flash", base_url="https://api.siliconflow.cn/v1")
agent = create_agent(
model=model,
tools=tools,
system_prompt="你是一个善于调用工具回答用户问题的助手,回答用户问题前,如果有合适的工具调用,请务必调用工具作为辅助回答用户的问题。"
)
math_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "3 + 5等于多少?"}]}
)
weather_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "上海的天气怎么样?"}]}
)
print(math_response)
print(weather_response)
if __name__ == "__main__":
asyncio.run(main())
将客户端启动便可以看到有哪些工具可供调用以及两个问题的回答信息了。
补充注意事项:
-
FastMCP 客户端是异步的,因此我们需要使用
asyncio.run来运行客户端。 -
默认情况下,
MultiServerMCPClient是无状态的:每次工具调用都会创建一个全新的MCP ClientSession,执行工具后立即清理。
1.3.3补充:有状态会话
如果需要控制 MCP 会话的生命周期,例如处理维护跨工具调用上下文的有状态服务器时,可以使用 client.session() 创建持久的 ClientSession。使用 client.session("服务器名称") 上下文管理器,配合 load_mcp_tools 加载工具。
基于上面的例子我们来进行修改,让它称为一个有状态的会话:
import asyncio
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_mcp_adapters.tools import load_mcp_tools
# FastMCP 客户端是异步的,因此我们需要使用 asyncio.run 来运行客户端
async def main():
client = MultiServerMCPClient(
{
"Weather": {
"transport": "stdio", # 本地子进程通信
"command": "python",
# stdio服务源码文件的绝对路径
"args": ["C:/code/LangChain/LangChain-v1-test/studio-mcp-test.py"],
},
"Math": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
}
}
)
async with client.session("Weather") as session_weather, client.session("Math") as session_math:
tools_weather = await load_mcp_tools(session_weather)
tools_math = await load_mcp_tools(session_math)
tools = tools_weather + tools_math
print(tools)
# 该 agent 的所有工具调用将复用同⼀个会话
model = ChatDeepSeek(model="deepseek-v4-flash", base_url="https://api.siliconflow.cn/v1")
agent = create_agent(
model=model,
tools=tools,
system_prompt="你是一个善于调用工具回答用户问题的助手,回答用户问题前,如果有合适的工具调用,请务必调用工具作为辅助回答用户的问题。"
)
math_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "3 + 5等于多少?"}]}
)
weather_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "上海的天气怎么样?"}]}
)
print(math_response)
print(weather_response)
if __name__ == "__main__":
asyncio.run(main())
这样子去创建会话那么对于这所有的mcp服务器的请求此时都不会再创建新会话了。
1.4三种常见构件的访问使用
FastMACP官方文档
MCP不仅仅支持让我们的ai去访问它的工具,同时也可以让他去访问我们在服务端存储的文档或提示词。它的核心能力也是由这些构件组成的,下面我们来认识下:
1. Tools(工具)
- 定义:AI 模型可主动调用的函数,由模型决定使用时机。支持写操作(如修改文件、调用 API、写数据库)。
- 控制方:模型
- 示例:搜索航班、创建日历事件、发送邮件
2. Resources(资源)
- 定义:只读的被动数据源,为模型提供上下文信息(如文件内容、数据库模式、API 文档)。
- 控制方:应用
- 资源类型:
- 直接资源:固定 URI,如
calendar://events/2024 - 资源模板:动态 URI,支持参数,如
weather://forecast/{city}/{date}
- 直接资源:固定 URI,如
3. Prompts(提示词)
- 定义:预置的指令模板,指导模型配合特定工具和资源完成任务。
- 控制方:用户
- 特点:支持参数化、参数补全、显式调用(如斜杠命令)
1.4.1访问与使用resource
客户端获取 Resources 时,LangChain 将 MCP Resource 统一转换为 Blob 对象,提供一致的接口处理文本和二进制内容。
Blob 是 LangChain 统一的数据容器,具有以下属性:
| 属性/方法 | 说明 |
|---|---|
blob.metadata |
包含 uri(资源标识符)等元数据 |
blob.mimetype |
MIME 类型,如 text/plain、image/png |
blob.as_string() |
将内容解码为字符串(文本文件) |
blob.as_bytes() |
获取原始字节数据(二进制文件) |
需要特别注意的是,我们在使用resource时,Resources 是只读的,用于向 LLM 提供上下文信息,而非执行动作(那是 Tools 的职责)。
此时比如说我在服务端保存了一些qa文档,然后我让agent来通过这些文档为我提供答案。我们以这个场景来写一个例子:
import json
from fastmcp import FastMCP
mcp = FastMCP("Qa_Document")
@mcp.resource(uri="data://qa")
def get_qa_document() -> str:
return json.dumps({
"q" : "如何进行服务端密码重置?",
"a" : "目前仅支持提交工单后,交由工作人员进行密码重置"
})
if __name__ == "__main__":
mcp.run(transport="streamable-http",port=8080)
上面是一个服务端,我们其实有两种方式去访问qa文档,一种是直接全量获取,另一种则是通过指定的uri进行获取。我们来看下面的示例:
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.resources import load_mcp_resources
# FastMCP 客户端是异步的,因此我们需要使用 asyncio.run 来运行客户端
async def main():
client = MultiServerMCPClient(
{
"Qa_Document": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
}
}
)
#无状态会话进行资源获取
resources = await client.get_resources("Qa_Document")
#await client.get_resources("Qa_Document",uris=["data://qa"]) #通过uri对指定资源进行获取
for blob in resources:
print(f"资源uri:{blob.metadata["uri"]}")
print(f"资源的MIME类型:{blob.mimetype}")
print(f"资源的实际内容:{blob.as_string()}")
#有状态的会话进行资源获取
async with client.session("Qa_Document") as session_qa:
resources = await load_mcp_resources(session_qa)
#resources = await load_mcp_resources(session_qa,uris=["data://qa"]) #通过uri对指定资源进行获取
for blob in resources:
print(f"资源uri:{blob.metadata["uri"]}")
print(f"资源的MIME类型:{blob.mimetype}")
print(f"资源的实际内容:{blob.as_string()}")
if __name__ == "__main__":
asyncio.run(main())
结果各位可以自行尝试看下,两次打印结果都是一样的。
这样子其实我们并没有把这个资源提供给agent进行使用,我们可以定义一个获取资源的工具来作为中间层供agent进行资源访问,像下面这个例子一样:
import asyncio
import json
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.resources import load_mcp_resources
mini_model = ChatDeepSeek(model="deepseek-v4-flash",base_url="https://api.siliconflow.cn/v1")
# FastMCP 客户端是异步的,因此我们需要使用 asyncio.run 来运行客户端
async def main():
client = MultiServerMCPClient(
{
"Qa_Document": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
}
}
)
resources = await client.get_resources("Qa_Document")
# 无状态会话进行资源获取
# await client.get_resources("Qa_Document",uris=["data://qa"]) #通过uri对指定资源进行获取
@tool
def get_qa_document() -> list[dict[str,any]]:
"""搜索全量文档工具"""
res = []
for blob in resources:
if blob.mimetype == "text/plain":
res.append(json.loads(blob.as_string()))
print(res)
return res
agent = create_agent(model=mini_model,system_prompt="请尽可能的调用工具回答用户问题",tools=[get_qa_document])
result = await agent.ainvoke({"messages" : [HumanMessage("可以告诉我服务端如何重置密码吗?")]})
print(result.get("messages")[-1])
if __name__ == "__main__":
asyncio.run(main())
1.4.2访问prompt
LangChain 将 MCP 提示转换为可直接在聊天模型中使用的消息列表 (messages)。有两种加载方式:
| 方式 | 描述 | 使用场景 |
|---|---|---|
| 通过客户端直接加载 | 使用 MultiServerMCPClient 实例的 get_prompt() 方法。 |
简单、直接的调用,适用于大多数情况。 |
| 通过会话精确控制 | 先创建持久会话 (client.session()),再使用 load_mcp_prompt() 函数。 |
需要更精细地管理 MCP 会话生命周期时(例如有状态服务器)。 |
使用就不多说了,因为你获取到提示词之后可以通过包装工具的方式去使用,也可以先获取在传给agent作为初始化参数,怎么使用随你自己个人喜好。
我们主要来看怎么访问获取,首先先在1.1.1的mcp服务器基础上多定义一个@mcp.prompt:
@mcp.prompt
def get_prompt(nums : list[float]) -> str:
return f"请对这些数据进行分析: {"".join(str(point) for point in nums)}。"
客户端访问这里就直接持久会话了,无状态其实是一样的写法,就是换个函数而已:
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
import asyncio
import json
async def main():
client = MultiServerMCPClient(
{
"Qa_Document": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
}
}
)
prompts = await client.get_prompt(
"Qa_Document",
"get_prompt",
arguments={
"nums" : json.dumps([1.1,1.5,1.6])
}
)
for prompt in prompts:
print(f"{prompt.type} : {prompt.content}")
if __name__ == "__main__":
asyncio.run(main())
最终结果为:
human : 请对这些数据进行分析: 1.11.51.6。
这里消息类型为人类是因为我们服务端没有设置返回类型,所以服务端默认返回时设置的类型为human,langchain这里解析出来就是human了。
1.4.3访问社区mcp工具
工具访问前文我们已经见过了,这里就不再赘述了,我们主要来看下怎么去访问社区提供的mcp工具。
比如我这里找了一个能够搜索鸣潮信息的mcp工具,我们来试着使用下:
https://modelscope.cn/mcp/servers/@jacksmith3888/wuwa-mcp-server
根据文档提示,首先我们先安装对应包:
pip install wuwa-mcp-server
按照人家给我们的例子来配置下客户端:
"wuwa-mcp": {
"command": "uvx",
"args": ["wuwa-mcp-server"]
}
可以看到这里使用了uvx,所以我们需要先在我们的电脑上安装uvx,windows安装命令如下:
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
完整示例访问代码如下:
import asyncio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
mini_model = ChatDeepSeek(model="deepseek-v4-flash",base_url="https://api.siliconflow.cn/v1")
async def main():
client = MultiServerMCPClient(
{
"wuwa-mcp": {
"transport": "stdio", #注意为本地
"command": "uvx",
"args": ["wuwa-mcp-server"]
}
}
)
tools = await client.get_tools(server_name="wuwa-mcp")
agent = create_agent(
model=mini_model,
tools=tools
)
result = await agent.ainvoke({"messages" : HumanMessage("可以告诉我鸣潮的角色今汐的信息吗")})
print(result.get("messages")[-1])
if __name__ == "__main__":
asyncio.run(main())
此时就能正常返回结果了,需要注意如果你之前没有安装过uvx记得重启下IDE不然程序找不到。
更多三方模型请见魔塔社区。
1.5进阶功能
1.5.1工具拦截器
https://docs.langchain.com/oss/python/langchain/mcp#tool-interceptors
MCP 服务器作为独立进程运行,它们无法直接访问 LangGraph 的运行时信息,例如存储 (store)、上下文 (context) 或 Agent 状态 (state)。
拦截器(Interceptors) 填补了这一空白,它在 MCP 工具执行期间为你提供了访问这些运行时上下文的途径。同时,拦截器也提供了类似中间件的控制能力:可以修改请求、实现重试逻辑、动态添加请求头,甚至完全中断执行。
基本使用
比如我们服务端此时有这样一个工具,它需要我们传入用户id:
from fastmcp import FastMCP
mcp = FastMCP("Weather")
@mcp.tool
def get_weather(user_id : str,city : str) -> str:
'''获取指定城市的天气信息
Args:
city (str):用户想要查询的目标位置
user_id (str):查询用户的uid
Returns:
str:成功返回对应位置天气信息总结,失败返回None
'''
return f"{user_id}进行{city}的天气查询,结果为目标地点近一个月总是阳光明媚。"
if __name__ == "__main__":
mcp.run(transport="streamable-http",port=8080)
那我客户端ai怎么知道你user_id呢,虽然你这个id在上下文中。但是我ai不知道。除了我们提供给ai工具,能够让其进行上下文查询的方式外,我们也可以使用工具拦截器。根据官方文档,我们可以写出来下面这样一个例子去访问上面的mcp服务:
import asyncio
from dataclasses import dataclass
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.interceptors import MCPToolCallRequest
from langchain_mcp_adapters.resources import load_mcp_resources
mini_model = ChatDeepSeek(model="deepseek-v4-flash",base_url="https://api.siliconflow.cn/v1")
@dataclass
class Context:
user_id: str
async def tool_interceptor(
request: MCPToolCallRequest,
handler,
):
runtime = request.runtime
user_id = runtime.context.user_id
modified_request = request.override(
args={**request.args, "user_id": user_id}
)
return await handler(modified_request)
async def main():
client = MultiServerMCPClient(
{
"Weather": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
},
},
tool_interceptors=[tool_interceptor] #加入工具拦截器
)
tools = await client.get_tools(server_name="Weather")
agent = create_agent(model=mini_model,system_prompt="请尽可能的调用工具回答用户问题",context_schema=Context,tools=tools)
result = await agent.ainvoke(
{"messages" : [HumanMessage("可以告诉我上海今天的天气吗?")]},
context={"user_id" : "111"}
)
print(result.get("messages"))
if __name__ == "__main__":
asyncio.run(main())
工具此时返回结果就可以正常拿到我们上下文中的user_id了。
111进行上海的天气查询,结果为目标地点近一个月总是阳光明媚。
状态更新与跳转
在看完上面的例子后,我们其实发现这个工具拦截器其实和我们之前见过的wrap_tool_call中间件使用起来非常相似。实际上工具拦截器也是可以返回Command的然后直接终止流程,我们把上面的拦截器稍微改下,就让他当用户id为111时直接结束:
async def tool_interceptor(
request: MCPToolCallRequest,
handler,
):
runtime = request.runtime
user_id = runtime.context.user_id
if user_id == "111":
return Command(
update={"messages" : runtime.state["messages"] + [ToolMessage("目标用户无调用权限",tool_call_id=runtime.tool_call_id)]},
goto="__end__" #直接让其走到终止节点
)
modified_request = request.override(
args={**request.args, "user_id": user_id}
)
result = await handler(modified_request)
此时我们再去调用就会发现工具调用直接被拦截了,不过原本来说应该直接跳转到end节点的,按照官方文档所写应该也是,但是程序实际上运行并没有跳转到end。应该是一个bug。
更多工具拦截器的实践可以参考这里。
我们来简单总结下工具拦截器:
| 核心功能 | 关键点与原文描述 |
|---|---|
| 访问运行时上下文 | 通过 request.runtime 访问 state、config、store、context。案例展示了注入用户凭证到工具参数中。 |
| 状态更新与流程控制 | 返回 Command 对象。案例展示了更新状态并跳转 (goto="summary_agent") 和提前结束执行 (goto="__end__")。 |
| 编写自定义拦截器 | • 基本结构:async def func(request, handler)• 修改请求:使用 request.override()• 动态头:根据 request.name 修改 headers• 错误处理:包含指数退避重试和特定异常降级的完整代码 • 组合执行:遵循洋葱模型顺序 |
1.5.2MCP进度通知
https://fastmcp.wiki/zh/servers/progress
在此之前我们先来看下fastmcp在定义工具时的工具函数中可以再额外传入的一个参数Context:
Context 是 FastMCP 为工具函数提供的上下文对象,封装了进度报告、日志记录、用户交互等 MCP 协议功能。
获取方式:在工具函数签名中添加类型为 Context 的参数,FastMCP 会在调用时自动注入。
进度报告方法:report_progress,注意因为ctx.report_progress是异步通知,所以我们定义的工具需要是异步的才可以:
ctx.report_progress() 用于向客户端发送进度更新。
| 参数 | 类型 | 说明 |
|---|---|---|
progress |
float |
当前进度值 |
total |
float | None |
总量值,可选 |
message |
str | None |
进度描述信息,可选 |
Context 其他功能一览
| 功能 | 方法/属性 | 说明 |
|---|---|---|
| 日志记录 | ctx.debug()、ctx.info()、ctx.warning()、ctx.error() |
向客户端发送日志消息 |
| 用户交互 | ctx.elicit(message, schema) |
请求用户提供结构化输入 |
| 资源访问 | await ctx.read_resource(uri) |
读取服务器注册的资源 |
| LLM 采样 | await ctx.sample(messages) |
请求客户端 LLM 生成文本 |
| 会话/请求标识 | ctx.session_id、ctx.request_id、ctx.client_id |
获取标识信息 |
那么我们先来定义服务端的一个工具,让其模拟处理一个耗时任务,然后我们去调用report_progress向客户端发送进度消息:
@mcp.tool
async def solve_large_file(file_path : str,ctx : Context) -> str:
'''处理服务器上的指定路径下的大文件'''
await ctx.report_progress(0.0,100.0,"正在启动ing~")
await asyncio.sleep(1)
await ctx.report_progress(45.0,100.0,"加载资源文件完毕~")
await asyncio.sleep(1)
await ctx.report_progress(90.0,100.0,"文件处理完毕,退出处理程序ing~")
await asyncio.sleep(1)
await ctx.report_progress(100.0,100.0,"处理程序退出完毕")
return f"目标文件:{file_path},已经正常处理完毕。"
那么怎么让客户端能够接收到服务端传给我的进度消息呢?实际上客户端是通过回调函数的方式实现的,我们来认识下这个回调函数应该怎么写:
客户端订阅 MCP 服务器的进度更新需通过 Callbacks 注册进度回调函数。
回调函数 on_progress 参数:
| 参数名 | 类型 | 说明 |
|---|---|---|
progress |
float |
当前进度值(具体含义由服务器定义,通常为已处理单元数量或比例) |
total |
float | None |
总量值。若服务器未提供总量,则为 None,此时只能展示绝对进度 |
message |
str | None |
服务器发送的进度描述文本,如 "正在处理第3项..." |
context |
CallbackContext |
包含当前调用上下文的元数据(服务器名称、工具名称等) |
CallbackContext 结构:
class CallbackContext:
server_name: str # 发送该进度通知的 MCP 服务器名称。
tool_name: str # 当前正在执行的工具名称(仅在工具调用期间有效),
# 其他场景下可能为空。
那我们来为客户端编写一下这个回调函数:
async def on_progress(
progress : float,
total : float | None,
message : str | None,
context : CallbackContext
):
"""处理mcp服务器返回的进度消息"""
print(f"[{context.server_name}]:[{context.tool_name}]:{message}({progress}/{total})")
需要注意写完回调函数之后记得在create_agent处注册回调函数:
client = MultiServerMCPClient(
{
"Weather": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
},
},
callbacks=Callbacks(on_progress=on_progress), #需要注意以此种方式注册我们的回调
)
然后开启调用就能看到每隔1s就有进度消息打印在客户端了。
需要注意的是:
- 进度通知是单向推送,客户端无法通过回调干预工具执行。
- CallbackContext 提供了
server_name和tool_name,便于区分多个服务或工具。 - 服务端 Context 除进度报告外,还支持日志、用户交互、资源访问等 MCP 高级功能。
1.5.3MCP日志通知
https://fastmcp.wiki/zh/servers/logging
我们上面说Context的时候其实提到了MCP服务可以传给客户端日志消息,那么该怎么发日志消息给客户端呢,此处要使用到fastmcp的Content中的几个成员函数,当然它也是异步的,需要我们定义工具为异步函数。具体可以参考上面的文档,这里我们就直接写例子了:
@mcp.tool
async def solve_large_file(file_path : str,ctx : Context) -> str:
'''处理服务器上的指定路径下的大文件'''
await ctx.info("正在加载文件")
await asyncio.sleep(1)
await ctx.debug("此为调试信息,如果你看到此消息,请反馈开发者")
await asyncio.sleep(1)
await ctx.warning("文件过大,尝试切换方式处理")
await asyncio.sleep(1)
await ctx.error("切换处理方式失败")
return f"目标文件:{file_path},已经正常处理完毕。"
https://docs.langchain.com/oss/python/langchain/mcp#server-setup
当然客户端自然需要去注册一个回调函数来处理服务端的消息了:
async def on_logging_message(
params: LoggingMessageNotificationParams,
context: CallbackContext,
):
"""处理mcp服务器返回的日志消息"""
print(f"[{context.server_name}]:[{context.tool_name}]:[{params.level}]:({params.data})")
而LoggingMessageNotificationParams的类结构大致如下:
class LoggingMessageNotificationParams(NotificationParams):
"""日志消息通知的参数类"""
level: LoggingLevel
"""日志消息的严重性级别(例如:DEBUG、INFO、WARNING、ERROR等)"""
logger: str | None = None
"""可选的日志记录器名称,用于标识该消息来自哪个日志器"""
data: Any
"""
要记录的日志数据,可以是字符串消息或其他任意对象。
注意:该类型必须是 JSON 可序列化的(如字符串、数字、列表、字典等)
"""
model_config = ConfigDict(extra="allow")
"""允许接收未在类中定义的额外字段,提高灵活性"""
此时启动客户端和服务器就能看到日志输出了。别忘记了注册回调~
client = MultiServerMCPClient(
{
"Weather": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
},
},
callbacks=Callbacks(on_logging_message=on_logging_message), #需要注意以此种方式注册我们的回调
)
1.5.4引导式输入
https://fastmcp.wiki/zh/servers/elicitation
有的时候,我们客户端调用mcp服务的时候需要客户端提供一些信息才能继续进行,此时服务端就需要通过ctx.elicit()向客户端请求消息,它有两个必须填写的参数:ctx.elicit(message, schema) 方法参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
message |
str |
向用户显示的提示消息 |
response_type |
type(默认值:None) |
定义预期响应结构的 Python 类型(数据类、基本类型等)。请注意,征询响应受到 JSON Schema 类型的限制子集的限制。更多详细信息请参阅支持的响应类型。 |
在调用此方法后,客户端给予响应,服务端此时此方法会返回下面这样一个结构的对象:
ElicitationResult 对象结构:
| 属性 | 类型 | 说明 |
|---|---|---|
action |
Literal['accept', 'decline', 'cancel'] |
用户对请求的响应方式 |
data |
response_type | None |
用户的输入数据(仅在 action 为 "accept" 时存在) |
那么我们参考文档给的例子来写一个例子:
@dataclass
class UserInfo:
email : str
user_name : str
@mcp.tool
async def collect_user_info(ctx: Context) -> str:
"""通过交互式提示收集用户信息并进行用户注册。"""
result = await ctx.elicit(
message="请提供您的注册信息",
response_type=UserInfo
)
if result.action == "accept":
return f"[{result.data.email}]:[{result.data.user_name}],此用户信息是合法的,注册成功。"
elif result.action == "cancel":
return "用户取消了注册操作"
else:
return "用户拒绝填入用户信息,注册流程终止"
当然客户端也是需要去注册回调函数啦,我们来认识下这个回调该怎么写:
https://docs.langchain.com/oss/python/langchain/mcp#client-setup
基于官网文档的说明:
- 回调接收
ElicitRequestParams,其中包含message和requestedSchema。 - 必须返回一个
ElicitResult对象,指示用户的操作和提供的数据。
响应动作:
ElicitResult 支持三种动作,对应不同用户意图:
| 动作 | 描述 | 使用示例 |
|---|---|---|
| accept | 用户提供了有效输入,需在 content 中附带数据 |
ElicitResult(action="accept", content={"email": "user@example.com", "age": 25}) |
| decline | 用户拒绝提供所请求的信息 | ElicitResult(action="decline") |
| cancel | 用户完全取消当前操作 | ElicitResult(action="cancel") |
具体这些参数的内容是什么意思大家结合ai和源码看下就ok了,很简单,我这里就直接写例子了:
async def on_elicitation(
mcp_context: RequestContext,
params: ElicitRequestParams,
context: CallbackContext,
) -> ElicitResult:
"""处理mcp服务器的请求消息"""
user_action = input(f"是否发起注册请求,仅可填写accept | decline | cancel,默认为取消\n\n")
if user_action == "accept":
return ElicitResult(
action="accept",
content={"email": "user@example.com", "user_name": "咻"},
)
elif user_action == "decline":
return ElicitResult(
action="decline"
)
else:
return ElicitResult(
action="cancel"
)
记得注册回调,然后就能看到运行结果了:
client = MultiServerMCPClient(
{
"Weather": {
"transport": "streamable-http", # 基于 HTTP 的远程服务器
# Math服务的启动地址
"url": "http://127.0.0.1:8080/mcp",
},
},
callbacks=Callbacks(on_elicitation=on_elicitation), #需要注意以此种方式注册我们的回调
)
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐



所有评论(0)