WebSocket 实时通信
Chat Session、Tool Call 与记忆首页的实时事件链路说明
WebSocket 实时通信总览
Quka 的实时能力采用「HTTP + WebSocket」组合:
- HTTP 负责创建会话、发送消息、停止流、拉取历史
- WebSocket(Centrifuge)负责推送增量事件(AI 流式回复、工具调用进度、知识处理状态)
1. 连接与鉴权
前端在登录后调用 buildTower(...) 建立连接:
- 连接地址:
{host}/connect(自动从 http/https 转 ws/wss) - Token 放在 Centrifuge
token - 请求头包含
x-appid与x-auth-type(普通 access token 或 authorization token)
后端路由为:
GET /api/v1/connect->handler.Websocket->centrifugeManager.HandleWebSocket
认证与订阅权限由 pkg/socket/centrifuge/auth.go 处理:
- 连接阶段校验 token,得到
UserID - 订阅阶段校验频道访问权限(用户频道、空间知识频道、会话频道)
2. 频道与消息外层格式
主要频道:
- 用户频道:
/user/{userID} - 会话频道:
/chat_session/{spaceID}/{sessionID} - 记忆首页频道:
/knowledge/list/{spaceID}
服务端统一推送格式:
subject:事件域(如on_message、on_message_init、stage_changed)type:事件类型(后端按字符串下发,前端会转数字)data:业务负载(如StreamMessage、知识 stage 变更)
3. Chat Session 实时链路
3.1 发送阶段(HTTP)
前端 chat-session.tsx 的发送流程:
POST /:spaceid/chat/:session/message/id生成message_idPOST /:spaceid/chat/:session/message发送用户消息,返回answer_id- 前端先本地插入 user 消息,并进入
aiTyping=true
后端 ChatLogic.NewUserMessage:
- 写入用户消息到 DB
- 启动对应 Agent(Auto/Butler/Journal)
- 返回
answer_id给前端
3.2 流式阶段(WebSocket)
前端订阅:
/chat_session/{spaceID}/{sessionID}- 仅消费
subject in ["on_message","on_message_init"]
前端事件处理关键点:
- 有队列
queue,按 200ms 周期消费,避免并发写 UI - 50s
message daemon超时兜底,触发历史重载 - 断线或状态异常时会
loadData(1)全量回补
事件映射(前后端一致):
1EVENT_ASSISTANT_INIT2EVENT_ASSISTANT_CONTINUE3EVENT_ASSISTANT_DONE4EVENT_ASSISTANT_FAILED5EVENT_TOOL_INIT6EVENT_TOOL_CONTINUE7EVENT_TOOL_DONE8EVENT_TOOL_FAILED
4. ReActAgent 与 Tool Call 推送逻辑
AutoAssistant 使用 Eino ReActAgent,核心在 auto_assistant.go:
CreateAutoRagReActAgent(...)组装工具(RAG/WebSearch/Knowledge/OCR/Vision)CreateReActAgentWithConfig(...)创建 ReActAgent,启用StreamToolCallChecker
4.1 Assistant 文本流
由 EinoResponseHandler + CallbackHandlers 负责:
- 首个有效 chunk 到来时创建 assistant 消息(
RecvMessageInit) - 增量文本写入
ReceiveFunc,触发EVENT_ASSISTANT_CONTINUE - 流结束触发
DoneFunc,落EVENT_ASSISTANT_DONE/FAILED
4.2 Tool Call 流
由 NotifyingTool 包装每个工具:
InvokableRun前先RecvMessageInit,创建 tool 消息(role=tool)- 下发
ToolTips运行中状态 ->EVENT_TOOL_CONTINUE - 工具执行完成或失败 ->
EVENT_TOOL_DONE/FAILED - 工具参数/结果写入消息扩展,前端可通过
GetMessageExt查看
前端收到 msg_type = MESSAGE_TYPE_TOOL_TIPS 时,会更新 toolTips 列表与状态,不与普通 assistant 文本混用。
5. 停止生成(Stop Stream)
前端点击停止:
POST /:spaceid/chat/:session/stop
后端:
ChatLogic.StopStream->Centrifuge.NewCloseChatStreamSignal(sessionID)- 先尝试本机
streamSignals执行closeFunc - 同时发布系统频道
system:stop_chat_stream(用于分布式场景传播)
Agent 请求侧在 ChatSessionHandle/ButlerSessionHandle/JournalSessionHandle 中通过 RegisterStreamSignal 注册取消函数,实现中断流式推理。
6. 记忆首页(Knowledge)监听逻辑
前端 pages/dashboard/knowledge.tsx 订阅:
/knowledge/list/{spaceID}- 仅处理
subject = "stage_changed"
处理策略:
Embedding:仅更新列表项阶段状态Done:调用GetKnowledge(...)拉取最新内容替换列表项
后端在知识处理流程中通过 publishStageChangedMessage(...) 发布:
- topic:
/knowledge/list/{spaceID} - subject:
stage_changed
7. 稳定性与兼容性设计
- 前端
CentrifugeManager支持指数退避重连(1s 起步,最大 30s,最多 5 次) - 后端
type按字符串下发,前端统一parseInt做兼容 - 前端保留历史重载兜底,防止局部事件丢失造成 UI 与 DB 不一致