[Deep Agents:LangChain的Agent Harness-11]AsyncSubAgentMiddleware如何让Agent像微服务一样远程协作?
本文介绍了如何利用AsyncSubAgentMiddleware构建异步Sub-Agent系统。通过将transport_agent和accommodation_agent部署为独立服务,Main-Agent可以异步调用这些Sub-Agent执行机票购买和酒店预订任务,同时保持与用户交互的能力。文章详细展示了Agent服务器的构建过程、配置方法以及Main-Agent如何通过AsyncSubAge
异步Sub-Agent允许Main-Agent启动后台任务并立即返回,这样Main-Agent就可以在Sub-Agent并发工作的同时继续与用户交互。Main-Agent可以随时检查进度、发送后续指令或取消任务。这与同步运行的Sub-Agent不同,同步运行的Sub-Agent会阻塞Main-Agent直到任务完成。当任务耗时较长、可并行化或需要中途调整时,就需要使用异步Sub-Agent。AsyncSubAgentMiddleware让我们能够轻松地创建和管理异步Sub-Agent。
1. 利用AsyncSubAgentMiddleware重写差旅助手
上面我们在“SubAgentMiddleware如何驱动Sub-Agent的协同演化”利用SubAgentMiddleware创建了一个差旅助理Agent,下面我们将它改造成使用AsyncSubAgentMiddleware的版本。我们将transport_agent和accommodation_agent这两个Sub-Agent独立部署在单独的Agent Server,并利用langgraph dev启动它们,部署方式可以参阅“Local development & testing”。两个Project采用如下所示的最简单的目录结构。
project/
├── .env
├── langgraph.json
├── agent.py
└── pyproject.toml
1.1 构建Agent服务器
两个Agent定义在agent.py中,完整定义如下:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
@tool
def buy_air_ticket(origin: str, destination: str, departure_date: str, flight_no: str):
"""机票购买工具,提供出发地、目的地、出发日期和航班号,返回购买信息"""
return f"Flight ticket bought from {origin} to {destination} on {departure_date} with flight number {flight_no}"
graph = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools=[buy_air_ticket],
system_prompt=("你是一个专注于出行安排的个人助理,可以帮助用户调用`buy_air_ticket`工具购买机票。"
"达成购买机票的目标是首要任务,你可以自行决定航司、航班和舱位等级。"
"将购买信息以清晰的格式返回给用户。"))
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
@tool
def book_hotel(city: str, hotel: str, check_in_date: str, check_out_date: str):
"""酒店预订工具,提供城市、酒店、入住日期和退房日期,返回预订信息"""
return f"Hotel booked in {city} at {hotel} from {check_in_date} to {check_out_date}"
graph = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools=[book_hotel],
system_prompt=("你是一个专注于住宿安排的个人助理,可以帮助用户调用`book_hotel`工具预订酒店。"
"达成预订酒店的目标是首要任务,你可以自行决定酒店、房型和价格等级。"
"将预订信息以清晰的格式返回给用户。"))
如下所示两个langgraph.json的定义:
{
"dependencies": ["."],
"graphs": {
"transport_agent": "./agent.py:graph"
},
"env": ".env"
}
{
"dependencies": ["."],
"graphs": {
"accommodation_agent": "./agent.py:graph"
},
"env": ".env"
}
我们指定如下的命令在本地以langgraph dev的方式启动Agent Server,并将端口分别设置为9999和9998。
uv run langgraph dev --port 9999
uv run langgraph dev --port 9998
1.2 利用AsyncSubAgentMiddleware远程调用Sub-Agent
如下所示的是作为Main-Agent的程序定义。创建的AsyncSubAgentMiddleware由两个AsyncSubAgent组成,分别对应前面定义的transport_agent和accommodation_agent。由于Sub-Agent以是异步任务形式启动的,所以Main-Agent第一次调用它们的时候得到的仅仅代表异步任务的Task对象,但是它可以利用此对象跟踪任务的状态。为我们可以通过与Main-Agent的持续对话来查询Sub-Agent的执行进度,并在Sub-Agent完成任务后得到最终结果。
from datetime import datetime, date
from typing import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.runnables import RunnableConfig
from langchain.agents import create_agent
from langchain.tools import tool
from deepagents.middleware import AsyncSubAgentMiddleware
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import time
load_dotenv()
class Event(TypedDict):
"""日程事件"""
title: str
"""事件标题"""
description: str
"""事件描述"""
start_time: datetime
"""事件开始时间"""
end_time: datetime
"""事件结束时间"""
@tool
def check_calendar(date: date) -> list[Event]:
"""日程查询工具,提供日期,返回该日期的日程信息"""
if date == datetime.now().date():
return [
Event(
title="团队会议",
description="与团队成员讨论项目进展",
start_time=datetime.strptime(f"{date} 10:00", "%Y-%m-%d %H:%M"),
end_time=datetime.strptime(f"{date} 11:00", "%Y-%m-%d %H:%M"),
),
Event(
title="客户电话会议",
description="与客户讨论需求和反馈",
start_time=datetime.strptime(f"{date} 14:00", "%Y-%m-%d %H:%M"),
end_time=datetime.strptime(f"{date} 19:00", "%Y-%m-%d %H:%M"),
),
]
return []
middleware = AsyncSubAgentMiddleware(
async_subagents=[
{
"name": "transport_agent",
"url": "http://127.0.0.1:9999",
"graph_id": "transport_agent",
"description": "一个专注于出行安排的个人助理,负责处理与机票购买相关的安排。",
},
{
"name": "accommodation_agent",
"url": "http://127.0.0.1:9998",
"graph_id": "accommodation_agent",
"description": "一个专注于住宿安排的个人助理,负责处理与酒店预订相关的安排。",
},
]
)
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools=[check_calendar],
system_prompt=(
"你是一个差旅助理,可以帮助用户安排差旅计划。"
"你可以调用`check_calendar`工具查询用户的日程安排,以便更好地规划差旅计划。"
"如果没有日程安排冲突,你可以直接调用注册工具购买机票和预订酒店。"
),
middleware=[middleware],
checkpointer=MemorySaver(),
)
config: RunnableConfig = {"configurable": {"thread_id": "thread_123"}}
result = agent.invoke(
input={
"messages": [
{
"role": "user",
"content": "明后两天有一个从上海到北京的差旅计划,请帮购买往返机票(明天出发,后天返回)和预订酒店(入住明天,退房后天)。",
}
]
},
config=config,
)
print(result["messages"][-1].content)
print("-" * 50)
result = agent.invoke(
input={
"messages": [{"role": "user", "content": "帮我查一下机票和酒店预订的进度。"}]
},
config=config,
)
print(result["messages"][-1].content)
time.sleep(3) # 模拟过一段时间后用户再次查询
print("-" * 50)
result = agent.invoke(
input={
"messages": [{"role": "user", "content": "帮我查一下机票和酒店预订的进度。"}]
},
config=config,
)
print(result["messages"][-1].content)
1.3 测试我们的差旅助手
我们对Main-Agent进行了三次调用。第一次调用下达了差旅安排的指令,后面两次调用则是查询Sub-Agent的执行进度。三次调用对应如下所示的三段输出:
好的,我已经帮你把相关安排交给后台助理去处理了,现在它们正在**后台异步执行**中:
### ✈️ 机票购买
- 行程:上海 ⇄ 北京
- 去程:明天(4 月 24 日)
- 回程:后天(4 月 25 日)
- 任务 ID:**385189eb-eb85-4c15-8c45-0aa51678568f**
### 🏨 酒店预订
- 城市:北京
- 入住:明天(4 月 24 日)
- 退房:后天(4 月 25 日)
- 任务 ID:**a6b6ddeb-8523-474a-a291-29065320ed39**
这两个任务都会在后台完成机票购买和酒店预订。
你可以随时对我说例如:
- “帮我看看机票订好了吗?”
- “查询一下酒店预订结果”
我会帮你查询最新状态 ✅
我刚刚帮你查了最新进度,结果如下:
### ✈️ 机票购买
- 任务 ID:385189eb-eb85-4c15-8c45-0aa51678568f
- 当前状态:**进行中(running)**
- 说明:正在筛选合适航班并执行购买流程。
### 🏨 酒店预订
- 任务 ID:a6b6ddeb-8523-474a-a291-29065320ed39
- 当前状态:**进行中(running)**
- 说明:正在查询并锁定合适的酒店房型。
这两个任务都还在后台处理中,一有结果我可以立刻帮你查询。
你可以稍后再跟我说一声,比如:“再帮我查一次进度”即可。
好消息,两项都已经**顺利完成**了 ✅
我帮你汇总一下结果,方便你查看:
---
## ✈️ 往返机票(已购)
**行程:上海 ⇄ 北京**
### 去程
- 出发:上海虹桥机场(SHA)
- 到达:北京首都机场(PEK)
- 日期:**2026 年 4 月 24 日**
- 航班号:**东方航空 MU5101**
### 回程
- 出发:北京首都机场(PEK)
- 到达:上海虹桥机场(SHA)
- 日期:**2026 年 4 月 25 日**
- 航班号:**东方航空 MU5102**
---
## 🏨 酒店预订(已订)
- 酒店:**北京国贸大酒店**
- 城市:北京(CBD 核心商务区)
- 入住:**2026 年 4 月 24 日**
- 退房:**2026 年 4 月 25 日**
- 入住晚数:1 晚
- 适合:商务出行、交通便利
---
如果你需要:
- 调整航班时间 / 舱位
- 更换或升级酒店
- 安排接送机、早餐、会议室
- 导出行程单用于报销
直接告诉我就可以,我继续帮你安排 👍
针对第一调用,输出内容只是已经下达了购买机票和预订酒店的指令,并且Sub-Agent正在后台异步执行中。针对第二次调用(紧接着第一次调用),输出内容则是Sub-Agent的最新执行状态,告诉用户两个任务都还在处理中。针对第三次调用(间隔了三秒),输出内容则是Sub-Agent已经完成了任务,并且给出了具体的购买机票和预订酒店的结果信息。
1.4 调用链分析
如下所示的是针对第一个请求的调用链,可以看出它实际上调用了start_async_task工具来启动Sub-Agent,start_async_task是AsyncSubAgentMiddleware创建的工具集之一。
当我们发出查看进度的请求时,Agent内部会按照如下的调用链来查询Sub-Agent的执行状态,可以看出它调用了check_async_task工具来查询Sub-Agent的执行状态,check_async_task同样是AsyncSubAgentMiddleware创建的工具集之一。
2. AsyncSubAgentMiddleware设计与实现
2.1 描述执行异步Sub-Agent的后台任务
AsyncSubAgentMiddleware以后台任务的形式运行Sub-Agent,这个任务通过如下这个AsyncTask表示。
class AsyncTask(TypedDict):
task_id: str
agent_name: str
thread_id: str
run_id: str
status: str
created_at: str
last_checked_at: str
last_updated_at: str
各字段说明如下:
- task_id:任务ID,唯一标识一个异步任务;
- agent_name:Sub-Agent的名称,表示这个任务是哪个Sub-Agent在执行;
- thread_id:用来标识Session(Thread)的唯一ID;
- run_id:标识在一个Thread中的某一次具体调用的ID;
- status:任务状态,处于兼容性考虑没有选择Literal类型,而是直接使用字符串来表示任务的状态,常用的状态包括
running,success,error和cancelled; - created_at/last_checked_at/last_updated_at:记录了任务的创建、轮询检查、内容更新(收到新消息)三个维度的时间,全部采用ISO-8601标准。这对于处理超时逻辑、清理过期任务或前端显示进度至关重要。
AsyncSubAgentMiddleware会生成相应的工具来管理AsyncTask对象,并将它们保存在AgentState中,以便在不同的请求中跟踪和查询这些异步任务的状态。这一点体现state_schema字段返回的状态类型的AsyncSubAgentState定义上,所有的AsyncTask对象都保存在async_tasks这个字段中,从reducer函数的定义可以看出它采用合并操作实现任务的添加和更新。
class AsyncSubAgentMiddleware(AgentMiddleware[Any, ContextT, ResponseT]):
state_schema = AsyncSubAgentState
class AsyncSubAgentState(AgentState):
async_tasks: Annotated[NotRequired[dict[str, AsyncTask]], _tasks_reducer]
def _tasks_reducer(ttttttttttttttttttt
existing: dict[str, AsyncTask] | None,
update: dict[str, AsyncTask],
) -> dict[str, AsyncTask]:
merged = dict(existing or {})
merged.update(update)
return merged
AsyncSubAgentMiddleware的__init__方法接受一个async_subagents参数,这个参数是一个AsyncSubAgent对象的列表,每个AsyncSubAgent对象都包含了Sub-Agent的基本信息(如名称、描述、部署URL等)。AsyncSubAgentMiddleware会根据这些信息来创建相应的工具,以便Main-Agent能够通过工具调用来启动和管理这些异步Sub-Agent。它还提供了用于指定系统提示词的system_prompt参数,一般情况下我们不会显式地传入这个参数,除非你觉得你提供的提示词比默认指定的(ASYNC_TASK_SYSTEM_PROMPT)更好。
class AsyncSubAgentMiddleware(AgentMiddleware[Any, ContextT, ResponseT]):
def __init__(
self,
*,
async_subagents: list[AsyncSubAgent],
system_prompt: str | None = ASYNC_TASK_SYSTEM_PROMPT,
) -> None
class AsyncSubAgent(TypedDict):
name: str
description: str
graph_id: str
url: NotRequired[str]
headers: NotRequired[dict[str, str]]
AsyncSubAgent各字段说明如下:
- name:Sub-Agent的名称,必须唯一,用于标识和调用Sub-Agent;
- description:Sub-Agent的描述信息,用于指导LLM如何使用这个Sub-Agent,以及在工具描述中展示给用户;
- graph_id:Sub-Agent部署在Agent Server上的Graph ID,用于工具调用时指定要启动哪个Sub-Agent。对于前面演示的例子,此字段对应Agent Server中langgraph.json中graphs节点定义的Key;
- url:Sub-Agent部署的Agent Server的URL地址;
- headers:调用Sub-Agent时需要携带的HTTP请求头信息,通常用于认证等场景;
2.2 执行异步Sub-Agent的若干工具
AsyncSubAgentMiddleware的__init__方法会创建如下这一系列的工具来管理用于异步执行Sub-Agent的AsyncTask对象:
- start_async_task:启动一个新的异步Sub-Agent任务;
- check_async_task:查询一个异步Sub-Agent任务的最新状态;
- update_async_task:更新一个异步Sub-Agent任务的状态;
- cancel_async_task:取消一个正在运行的异步Sub-Agent任务;
- list_async_tasks:列出所有的异步Sub-Agent任务及其状态;
对于前面演示实例使用的AsyncSubAgentMiddleware来说,我们可以利用如下的代码片段来查看它创建的工具:
from deepagents.middleware import AsyncSubAgentMiddleware
middleware = AsyncSubAgentMiddleware(
async_subagents=[
{
"name": "transport_agent",
"url": "http://127.0.0.1:9999",
"graph_id": "transport_agent",
"description": "一个专注于出行安排的个人助理,负责处理与机票购买相关的安排。",
},
{
"name": "accommodation_agent",
"url": "http://127.0.0.1:9998",
"graph_id": "accommodation_agent",
"description": "一个专注于住宿安排的个人助理,负责处理与酒店预订相关的安排。",
},
]
)
for index, tool in enumerate(middleware.tools, start=1):
print(f"\n# {index}. {tool.name}")
print(f"response_format: {tool.response_format}")
print("args:")
for k,v in tool.args.items():
print(f"""\
{k}:
type: {v.get('type')}
title: {v.get('title')}
description: \n{v.get('description')}""")
print("description:", tool.description)
print('-'*100)
输出如下所示:
# 1. start_async_task
response_format: content
args:
description:
type: string
title: Description
description:
A detailed description of the task for the async subagent to perform.
subagent_type:
type: string
title: Subagent Type
description:
The type of async subagent to use. Must be one of the available types listed in the tool description.
description: Start an async subagent on a remote LangGraph server. The subagent runs in the background and returns a task ID immediately.
Available async agent types:
- transport_agent: 一个专注于出行安排的个人助理,负责处理与机票购买相关的安排。
- accommodation_agent: 一个专注于住宿安排的个人助理,负责处理与酒店预订相关的安排。
## Usage notes:
1. This tool launches a background task and returns immediately with a task ID. Report the task ID to the user and stop — do NOT immediately check status.
2. Use `check_async_task` only when the user asks for a status update or result.
3. Use `update_async_task` to send new instructions to a running task.
4. Multiple async subagents can run concurrently — launch several and let them run in the background.
5. The subagent runs on a remote LangGraph server, so it has its own tools and capabilities.
----------------------------------------------------------------------------------------------------
# 2. check_async_task
response_format: content
args:
task_id:
type: string
title: Task Id
description:
The exact task_id string returned by start_async_task. Pass it verbatim.
description: Check the status of an async subagent task. Returns the current status and, if complete, the result.
----------------------------------------------------------------------------------------------------
# 3. update_async_task
response_format: content
args:
task_id:
type: string
title: Task Id
description:
The exact task_id string returned by start_async_task. Pass it verbatim.
message:
type: string
title: Message
description:
Follow-up instructions or context to send to the subagent.
description: Send updated instructions to an async subagent. Interrupts the current run and starts a new one on the same thread, so the subagent sees the full conversation history plus your new message. The task_id remains the same.
----------------------------------------------------------------------------------------------------
# 4. cancel_async_task
response_format: content
args:
task_id:
type: string
title: Task Id
description:
The exact task_id string returned by start_async_task. Pass it verbatim.
description: Cancel a running async subagent task. Use this to stop a task that is no longer needed.
----------------------------------------------------------------------------------------------------
# 5. list_async_tasks
response_format: content
args:
status_filter:
type: None
title: Status Filter
description:
Filter tasks by status. One of: 'running', 'success', 'error', 'cancelled', 'all'. Defaults to 'all'.
description: List tracked async subagent tasks with their current live statuses. By default shows all tasks. Use `status_filter` to narrow by status (e.g. 'running', 'success', 'error', 'cancelled'). Use `check_async_task` to get the full result of a specific completed task.
AsyncSubAgentMiddleware利用重写的wrap_model_call和awrap_model_call方法来应用设置的系统提示词。
class AsyncSubAgentMiddleware(AgentMiddleware[Any, ContextT, ResponseT]):
def wrap_model_call(
self,
request: ModelRequest[ContextT],
handler: Callable[[ModelRequest[ContextT]], ModelResponse[ResponseT]],
) -> ModelResponse[ResponseT]
async def awrap_model_call(
self,
request: ModelRequest[ContextT],
handler: Callable[[ModelRequest[ContextT]], Awaitable[ModelResponse[ResponseT]]],
) -> ModelResponse[ResponseT]
2.3 AsyncSubAgentMiddleware提供的系统提示词
对于我们前面演示的例子来说,当AsyncSubAgentMiddleware附加了它指定的系统提示词后,完整的提示词会编程如下的样子:
你是一个差旅助理,可以帮助用户安排差旅计划。你可以调用`check_calendar`工具查询用户的日程安排,以便更好地规划差旅计划。如果没有日程安排冲突,你可以直接调用注册工具购买机票和预订酒店。
## Async subagents (remote LangGraph servers)
You have access to async subagent tools that launch background tasks on remote LangGraph servers.
### Tools:
- `start_async_task`: Start a new background task. Returns a task ID immediately.
- `check_async_task`: Get current status and result of a task. Returns status + result (if complete).
- `update_async_task`: Send new instructions to a running task. Returns confirmation + updated status.
- `cancel_async_task`: Stop a running task. Returns confirmation.
- `list_async_tasks`: List all tracked tasks with live statuses. Returns summary of all tasks.
### Workflow:
1. **Start** — Use `start_async_task` to start a task. Report the task ID to the user and stop.
Do NOT immediately check the status — the task runs in the background while you and the user continue other work.
2. **Check (on request)** — Only use `check_async_task` when the user explicitly asks for a status update or
result. If the status is "running", report that and stop — do not poll in a loop.
3. **Update** (optional) — Use `update_async_task` to send new instructions to a running task. This interrupts
the current run and starts a fresh one on the same thread. The task_id stays the same.
4. **Cancel** (optional) — Use `cancel_async_task` to stop a task that is no longer needed.
5. **Collect** — When `check_async_task` returns status "success", the result is included in the response.
6. **List** — Use `list_async_tasks` to see live statuses for all tasks at once, or to recall task IDs after context compaction.
### Critical rules:
- After launching, ALWAYS return control to the user immediately. Never auto-check after launching.
- Never poll `check_async_task` in a loop. Check once per user request, then stop.
- If a check returns "running", tell the user and wait for them to ask again.
- Task statuses in conversation history are ALWAYS stale — a task that was "running" may now be done.
NEVER report a status from a previous tool result. ALWAYS call a tool to get the current status:
use `list_async_tasks` when the user asks about multiple tasks or "all tasks",
use `check_async_task` when the user asks about a specific task.
- Always show the full task_id — never truncate or abbreviate it.
### When to use async subagents:
- Long-running tasks that would block the main agent
- Tasks that benefit from running on specialized remote deployments
- When you want to run multiple tasks concurrently and collect results later
Available async subagent types:
- transport_agent: 一个专注于出行安排的个人助理,负责处理与机票购买相关的安排。
- accommodation_agent: 一个专注于住宿安排的个人助理,负责处理与酒店预订相关的安排。
System prompt being used: 你是一个差旅助理,可以帮助用户安排差旅计划。你可以调用`check_calendar`工具查询用户的日程安排,以便更好地规划差旅计划。如果没有日程安排冲突,你可以直接调用注册工具购买机票和预订酒店。
## Async subagents (remote LangGraph servers)
You have access to async subagent tools that launch background tasks on remote LangGraph servers.
### Tools:
- `start_async_task`: Start a new background task. Returns a task ID immediately.
- `check_async_task`: Get current status and result of a task. Returns status + result (if complete).
- `update_async_task`: Send new instructions to a running task. Returns confirmation + updated status.
- `cancel_async_task`: Stop a running task. Returns confirmation.
- `list_async_tasks`: List all tracked tasks with live statuses. Returns summary of all tasks.
### Workflow:
1. **Start** — Use `start_async_task` to start a task. Report the task ID to the user and stop.
Do NOT immediately check the status — the task runs in the background while you and the user continue other work.
2. **Check (on request)** — Only use `check_async_task` when the user explicitly asks for a status update or
result. If the status is "running", report that and stop — do not poll in a loop.
3. **Update** (optional) — Use `update_async_task` to send new instructions to a running task. This interrupts
the current run and starts a fresh one on the same thread. The task_id stays the same.
4. **Cancel** (optional) — Use `cancel_async_task` to stop a task that is no longer needed.
5. **Collect** — When `check_async_task` returns status "success", the result is included in the response.
6. **List** — Use `list_async_tasks` to see live statuses for all tasks at once, or to recall task IDs after context compaction.
### Critical rules:
- After launching, ALWAYS return control to the user immediately. Never auto-check after launching.
- Never poll `check_async_task` in a loop. Check once per user request, then stop.
- If a check returns "running", tell the user and wait for them to ask again.
- Task statuses in conversation history are ALWAYS stale — a task that was "running" may now be done.
NEVER report a status from a previous tool result. ALWAYS call a tool to get the current status:
use `list_async_tasks` when the user asks about multiple tasks or "all tasks",
use `check_async_task` when the user asks about a specific task.
- Always show the full task_id — never truncate or abbreviate it.
### When to use async subagents:
- Long-running tasks that would block the main agent
- Tasks that benefit from running on specialized remote deployments
- When you want to run multiple tasks concurrently and collect results later
Available async subagent types:
- transport_agent: 一个专注于出行安排的个人助理,负责处理与机票购买相关的安排。
- accommodation_agent: 一个专注于住宿安排的个人助理,负责处理与酒店预订相关的安排。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)