[数智金融][10]为接入平台开发一个平台适配器
Bearer Token 鉴权可以理解为:客户端请求接口时,必须带上一串“通行证字符串”,服务器验证这串 token 是否正确,正确才允许访问。例如意思是:“我是带着 abc123456 这个 token 来访问接口的,请验证我。的请求才能通过。如果没有带,或者 token 写错了,服务器应该拒绝请求如果是浏览器前端直接请求 AstrBot HTTP 插件,例如前端运行在,而插件接口运行在由于浏览
前言
要想为我们的项目引入astrbot,首先要有一个平台适配器。对于一般的IM,用的是napcat,我的任务就是开发和napcat功能类似的适配器插件。
基本思路和开发过程
最简单的方式,就是通过适配器插件为我们的金融app提供相关的http接口,我们的app可以使用标准 HTTP API 或 SSE 流式接口与 AstrBot 进行交互。所以这里插件开发的大致方法是和一般项目后端开发的方法是类似的。
以数据流动的顺序开发适配器
请求传入适配器后
由于astrbot能够处理的数据类型和我们传入的数据类型不一样。当我们的请求传入适配器后,我们首先要做的就是转换数据类型。
对于相关请求元数据(不包含主要内容),我们自定义
@dataclass
class HTTPRequestData:#保存一次http请求的元信息
"""HTTP 请求数据"""
method: str#请求方法,如GET、POST、PUT、DELETE等
url: str#请求URL
headers: dict[str, str]#请求头
remote_addr: str | None = None#请求的远程地址
user_agent: str | None = None#请求的用户代理
content_type: str | None = None#请求的内容类型,如application/json、application/x-www-form-urlencoded等
accept: str | None = None#请求的接受类型,如application/json、application/x-www-form-urlencoded等
timestamp: float = field(default_factory=time.time)#请求的时间,单位为秒
对于具体的请求体,我们首先将其分解并规范化为astrbot中的消息组件,然后将其组装为astrbot支持的内部消息。
#构造astrbot内部消息
def _build_astrbot_message(
self,
*,
data: Any,
messages: list[BaseMessageComponent],#消息组件,是零件
message_str: str,
user_id: str,
nickname: str,
session_id: str,
message_id: str | None = None,
) -> AstrBotMessage:
abm = AstrBotMessage()#构造astrbot内部消息
abm.self_id = str(self._metadata.id)#平台ID,机器人的id
abm.sender = MessageMember(user_id=user_id, nickname=nickname)#构造消息发送者
#下面这行是关键,说明http请求被包装成群消息
abm.type = MessageType.GROUP_MESSAGE#消息类型
#会话ID和消息ID是关键,会话ID是该平台该用户唯一会话的ID,消息ID是该消息的唯一ID,能保证记忆
abm.session_id = session_id
abm.message_id = message_id or str(uuid.uuid4().hex)
abm.message = messages#把消息组件包成包裹
abm.message_str = message_str
abm.raw_message = data
abm.timestamp = int(time.time())
return abm
适配器将请求传入eventbus
之后,如下图所示,消息平台适配器需要将请求传入event bus。也就是说,要想让astrbot处嗯传入的消息,我们需要把消息包装成事件。
然后astrbot处理这个事件,返回结果给适配器。适配器再将这个结果返回给我们的金融app。
由于整个过程是一个异步的过程,而且最终的适配器实例也只有一个。所以我们需要特别解决当大模型处理完事件后,将结果返回到什么地方的问题。
这里我采用的方法是对于一次http请求。创建唯一的事件id并定义一个future类型的变量(这个类型的变量可以理解为一个未来才会有结果的盒子,当这个future被设置结果后,response就会被返回给金融app)。并将future类型的变量加入我们自定义的pendingResponses中
self.pending_responses: dict[str, PendingResponse] = {}#正在等待的HTTP响应,普通 /message 请求不是立即返回,而是会创建一个 Future 等 AstrBot 回复。这个等待对象就放在 pending_responses
@dataclass#保存“正在等待的htttp响应”,http请求已经进来了,但是astrbot还没有响应,那么这个请求就是“正在等待的htttp响应”
class PendingResponse:
"""待处理响应"""
future: asyncio.Future[Any]#异步任务的future,用于等待响应
created_at: float = field(default_factory=time.time)#创建时间,单位为秒
timeout: int = 30#超时时间,单位为秒
session_id: str | None = None#会话ID
这个responses容器的键就是event_id,值是一个response对象,其中包含待返回的future。
通过将adapter实例和event_id传入事件,当大模型处理完成后,通过事件关联的adapter对象和event_id找到待返回的future。至此,整个数据流就形成了闭环。
event = StandardHTTPMessageEvent(
message_str=message_str,
message_obj=abm,
platform_meta=self._metadata,
session_id=session_id,
adapter=self,
event_id=event_id,
request_data=request_data,
)
event.set_extra("data", data)#把请求体数据存储到事件中,方便后续使用
event.is_wake = True#是否唤醒
event.is_at_or_wake_command = True#是否是@或唤醒命令
#提交事件,事件会进入事件队列,等待处理
#这里其实就是把内容交给astrbot
self.commit_event(event)#提交事件
self.total_requests_processed += 1
astrbot处理和返回数据
至于astrbot对具体事件是如何处理的,这里是由astrbot自身的逻辑决定的。我们需要做的就是继承AstrMessageEvent来构造我们的httpevent,其中,具体需要做的就是重写send逻辑:在大模型处理完后,astrbot会将返回信息包装成message chain,我做的就是提取message chain,将其转化为可以用于返回的字典格式,然后找到并填充待返回的future。
for message in message_chain.chain:
#把消息组件转换成字典,方便后续处理,用于返回
response_json, text_type = BMC2Dict(message)
full_response.append({
"content": response_json,
"type": text_type
})
if self._cached_response is None:
self._cached_response = []
# 获取待处理响应
pending = None
if self.event_id in self._adapter.pending_responses:
pending = self._adapter.pending_responses.pop(self.event_id)
# 设置响应结果
if pending and not pending.future.done():
result_json = self._cached_response
try:
current_loop = asyncio.get_running_loop()
except RuntimeError:
current_loop = None
try:
future_loop = pending.future.get_loop()
except Exception:
future_loop = None
def _do_set_result() -> None:
if pending.future.done():
return
try:
pending.future.set_result(result_json)
except asyncio.InvalidStateError:
# Future was completed concurrently; ignore.
pass
if current_loop is not None and future_loop is not None and current_loop is future_loop:
_do_set_result()
elif future_loop is not None:
try:
future_loop.call_soon_threadsafe(_do_set_result)
except RuntimeError:
_do_set_result()
else:
_do_set_result()
logger.debug(f"[StandardHTTPMessageEvent] 已发送响应 (event_id: {self.event_id}, 消息数: {len(self._cached_response)})")
else:
logger.warning(f"[StandardHTTPMessageEvent] 没有找到待处理响应: event_id={self.event_id}")
response = await asyncio.wait_for(future, timeout=timeout)


整体流程
外部 App / 后端
↓ HTTP 请求
HTTPAdapter
↓ 转换成 AstrBot 消息事件,到这里,是adapter的作用。
下面的功能是由重写的httpmessageevent实现的
↓
AstrBot 插件系统 / LLM
↓ 生成回复
httpmessageevent
↓ 规范格式,填充future
HTTPAdapter
↓ HTTP JSON 或 SSE 流式返回
外部 App / 后端
“”"
适配器文档(方便其他人使用)
我开发的插件的功能是为astrbot提供“http/https接入能力”,通过该插件,我的金融app可以使用标准http api或sse流式接口与astrbot进行交互。
功能特性
核心能力
- 支持标准 HTTP 请求响应
- 支持 SSE 流式响应
- 支持普通模式与流式模式两种调用方式
- 支持多种消息入参格式
- 支持 Bearer Token 鉴权
- 支持 CORS 跨域配置
- 支持请求超时控制
- 支持健康检查接口
安装方法
将zip文件放入 AstrBot 插件目录后,重启 AstrBot 即可。
配置说明
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
http_host |
字符串 | 0.0.0.0 |
HTTP 服务监听地址 |
http_port |
整数 | 8080 |
HTTP 服务监听端口 |
api_prefix |
字符串 | /api/v1 |
API 前缀 |
enable_http_api |
布尔 | true |
是否启用 HTTP API |
auth_token |
字符串 | "" |
Bearer Token,为空表示不启用鉴权 |
cors_origins |
字符串 | * |
允许的跨域来源,多个来源可用逗号分隔 |

这里可以由webui显式进行配置。
API 文档
基础信息
- Base URL:
http://<http_host>:<http_port><api_prefix>
默认情况下:
http://127.0.0.1:8080/api/v1
- 认证方式:
Authorization: Bearer <auth_token>
- 请求体类型:
application/json
1. 健康检查
GET /health
用于检查 HTTP 适配器是否正常运行。
示例返回:
{
"status": "ok",
"service": "astrbot_http_adapter",
"timestamp": 1710000000.0,
"pending_responses": 0,
"version": "1.0.0"
}
2. 发送消息(标准 HTTP)
POST /message
发送普通 HTTP 请求,并等待完整响应返回。
请求体示例
{
"message": "你好,AstrBot",
"platform": "http_test",
"user_id": "123456",
"nickname": "测试用户"
}
可选字段
| 字段 | 类型 | 说明 |
|---|---|---|
session_id |
字符串 | 自定义会话 ID,不传时默认使用 <platform>_<user_id> |
message_id |
字符串 | 自定义消息 ID |
timeout |
整数 | 请求超时时间,单位秒 |
username |
字符串 | 用户名,未传时会回退到 nickname |
标准返回示例
{
"success": true,
"response": [
{
"content": {
"type": "text",
"data": {
"text": "你好,很高兴见到你。"
}
},
"type": "ComponentType.Plain"
}
],
"event_id": "f1d6d516-f95b-45b4-9f7f-4f80f2fef0c0",
"session_id": "http_test_123456",
"timestamp": 1710000000.0
}
response返回的是 AstrBot 组件数组,而不是单纯字符串。
3. 流式发送消息(SSE)
POST /message/stream
发送请求后,以 SSE 方式持续接收分片响应。
返回类型:
Content-Type: text/event-stream
SSE 请求体示例
{
"message": "请写一段自我介绍",
"platform": "http_test",
"user_id": "123456",
"nickname": "测试用户",
"heartbeat_interval": 5
}
SSE 返回示例
event: connected
data: {"event_id":"xxx","session_id":"http_test_123456"}
event: message
data: {"type":"message","data":{"content":{"type":"text","data":{"text":"你好"}}},"text_type":"ComponentType.Plain"}
event: message
data: {"type":"message","data":{"content":{"type":"text","data":{"text":",欢迎使用 AstrBot"}}},"text_type":"ComponentType.Plain"}
event: end
data: {"type":"end","data":{}}
事件说明
| 事件类型 | 说明 |
|---|---|
connected |
SSE 连接建立成功 |
message |
收到一段消息分片 |
end |
正常结束 |
timeout |
流式请求超时结束 |
error |
流式处理过程中发生错误 |
4. 消息输入格式
当前版本的 message 字段支持多种格式,便于不同客户端按自己的数据结构接入。
4.1 纯文本
{
"message": "你好,AstrBot"
}
4.2 单个组件对象
{
"message": {
"type": "image",
"data": {
"url": "https://example.com/demo.png"
}
}
}
4.3 混合组件数组
{
"message": [
{
"type": "text",
"data": {
"text": "请描述这张图"
}
},
{
"type": "image",
"data": {
"url": "https://example.com/demo.png"
}
}
]
}
4.4 简写对象格式
文本
{
"message": {
"text": "hello"
}
}
图片
{
"message": {
"image_url": "https://example.com/demo.png"
}
}
音频
{
"message": {
"audio_url": "https://example.com/demo.mp3"
}
}
视频
{
"message": {
"video_url": "https://example.com/demo.mp4"
}
}
文件
{
"message": {
"file_url": "https://example.com/demo.pdf",
"name": "demo.pdf"
}
}
4.5 OpenAI 风格内容片段
单条消息对象
{
"message": {
"role": "user",
"content": [
{
"type": "input_text",
"text": "请分析这张图片"
},
{
"type": "input_image",
"image_url": "https://example.com/demo.png"
}
]
}
}
内容片段数组
{
"message": [
{
"type": "input_text",
"text": "帮我总结文件内容"
},
{
"type": "input_file",
"file_url": "https://example.com/demo.pdf",
"filename": "demo.pdf"
}
]
}
5. 常见组件类型
当前常用输入组件类型包括:
textimagerecordvideofileatreplypokefacesharelocationmusicjsonnodenodes
6. 请求头额外数据说明
除了 JSON 请求体中的 message、platform、user_id 等字段之外,当前版本也会保留原始 HTTP 请求头信息,方便在插件逻辑中读取调用方(这里也就是我们的金融app)传入的附加数据。
处理逻辑
请求进入 HTTP 适配器后,会先读取原始请求头,并将其保存到事件对象中,主要会放到下面两个位置:
event.headersevent.get_extra("request_headers")
同时,适配器还会把一些常用请求信息拆分后放入 extra 字段,例如:
event.get_extra("request_method")event.get_extra("request_url")event.get_extra("remote_addr")event.get_extra("user_agent")event.get_extra("content_type")event.get_extra("accept")event.get_extra("request_timestamp")event.get_extra("data")(原始 JSON 请求体)
也就是说,请求头不会参与消息组件解析本身,但会作为事件上下文的一部分完整保留下来,供插件自行读取。
如何添加额外数据
我们可以在请求时直接添加自定义 Header,例如:
curl -X POST "http://127.0.0.1:8080/api/v1/message" \
-H "Content-Type: application/json" \
-H "X-User-Role: admin" \
-H "X-Trace-Id: trace-demo-001" \
-d "{\"message\":\"hello\",\"platform\":\"http_test\",\"user_id\":\"123456\",\"nickname\":\"tester\"}"
Python 示例:
import requests
resp = requests.post(
"http://127.0.0.1:8080/api/v1/message",
headers={
"Content-Type": "application/json",
"X-User-Role": "admin",
"X-Trace-Id": "trace-demo-001",
},
json={
"message": "hello",
"platform": "http_test",
"user_id": "123456",
"nickname": "tester",
},
timeout=30,
)
print(resp.json())
如何在插件中获取额外数据
在 AstrBot 插件事件处理中,可以直接从事件对象里读取:
request_headers = event.get_extra("request_headers") or {}
trace_id = request_headers.get("X-Trace-Id")
user_role = request_headers.get("X-User-Role")
raw_data = event.get_extra("data") or {}
如果希望直接访问原始请求头映射,也可以使用:
request_headers = getattr(event, "headers", {}) or {}
trace_id = request_headers.get("X-Trace-Id")
一个更完整的示例:
@filter.on_message_event()
async def handle_http_extra_data(event: AstrMessageEvent):
if not event.get_extra("http_request"):
return
request_headers = event.get_extra("request_headers") or {}
trace_id = request_headers.get("X-Trace-Id", "")
user_role = request_headers.get("X-User-Role", "")
remote_addr = event.get_extra("remote_addr", "")
payload = event.get_extra("data") or {}
print("trace_id:", trace_id)
print("user_role:", user_role)
print("remote_addr:", remote_addr)
print("payload:", payload)
使用建议
- 业务标识类信息,建议放在自定义 Header 中,例如
X-Trace-Id、X-Tenant-Id、X-User-Role - 需要参与业务消息本身的内容,建议仍然放在 JSON 请求体中
- 如果是浏览器跨域调用,自定义 Header 是否能成功发送,还会受到 CORS 预检限制
当前插件默认允许的常见请求头包括:
Content-TypeAuthorizationAcceptX-Request-IDOrigin
如果从浏览器前端跨域发送额外自定义 Header,而该 Header 不在允许列表中,就可能被浏览器拦截预检请求;脚本调用或服务端对服务端调用通常不会受这个限制。
使用示例
cURL
curl -X POST "http://127.0.0.1:8080/api/v1/message" \
-H "Content-Type: application/json" \
-d "{\"message\":\"hello\",\"platform\":\"http_test\",\"user_id\":\"123456\",\"nickname\":\"tester\"}"
Python
import requests
BASE_URL = "http://127.0.0.1:8080/api/v1"
payload = {
"message": [
{"type": "text", "data": {"text": "请解释这张图"}},
{"type": "image", "data": {"url": "https://example.com/demo.png"}}
],
"platform": "http_test",
"user_id": "123456",
"nickname": "测试用户"
}
resp = requests.post(f"{BASE_URL}/message", json=payload, timeout=60)
print(resp.json())
JavaScript
const payload = {
message: {
role: "user",
content: [
{ type: "input_text", text: "请总结这张图片" },
{ type: "input_image", image_url: "https://example.com/demo.png" }
]
},
platform: "http_test",
user_id: "123456",
nickname: "测试用户"
};
const resp = await fetch("http://127.0.0.1:8080/api/v1/message", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(payload)
});
console.log(await resp.json());
安全建议
- 生产环境建议配置
auth_token - 对公网开放时建议使用 HTTPS
- 不建议暴露未鉴权的接口到公网
- 如需浏览器访问,请按需配置
cors_origins
常见问题
Q:/message 和 /message/stream 的区别是什么?
/message:等待完整结果后一次性返回/message/stream:以 SSE 分片方式持续返回
Q:为什么 response 是数组?
因为 AstrBot 返回的是消息组件结构,而不是单一文本字段。
Q:媒体消息一定要用 URL 吗?
不一定。当前接口支持 url、file、部分简写字段,以及 OpenAI 风格内容片段格式。
在跨机器调用场景下,更推荐使用可访问的 URL 或可被服务端识别的安全格式。
Q:什么是Bearer Token 鉴权?
Bearer Token 鉴权可以理解为:客户端请求接口时,必须带上一串“通行证字符串”,服务器验证这串 token 是否正确,正确才允许访问。
例如
curl -X POST "http://127.0.0.1:8080/api/v1/message" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer abc123456" \
-d "{\"message\":\"hello\",\"platform\":\"http_test\",\"user_id\":\"123456\"}"
这里的核心是:Bearer abc123456意思是:“我是带着 abc123456 这个 token 来访问接口的,请验证我。”
如果我们的服务端配置了:auth_token = abc123456,那么,只有请求头里带:Authorization: Bearer abc123456的请求才能通过。
如果没有带,或者 token 写错了,服务器应该拒绝请求
Q:什么是 CORS 跨域配置?
如果是浏览器前端直接请求 AstrBot HTTP 插件,例如前端运行在http://localhost:3000,而插件接口运行在 http://127.0.0.1:8080,
由于浏览器存在同源策略限制,需要在插件中配置 cors_origins。
例如:
cors_origins = http://localhost:3000
表示允许 http://localhost:3000 这个前端网页跨域访问插件接口。
但如果是金融 App 后端请求 AstrBot HTTP 插件,则通常不需要配置
cors_origins,因为 CORS 主要是浏览器安全策略,不限制服务端到服务端的请求。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)