logoQukaAI

WebSocket 实时通信

Chat Session、Tool Call 与记忆首页的实时事件链路说明

WebSocket 实时通信总览

Quka 的实时能力采用「HTTP + WebSocket」组合:

  • HTTP 负责创建会话、发送消息、停止流、拉取历史
  • WebSocket(Centrifuge)负责推送增量事件(AI 流式回复、工具调用进度、知识处理状态)

1. 连接与鉴权

前端在登录后调用 buildTower(...) 建立连接:

  • 连接地址:{host}/connect(自动从 http/https 转 ws/wss)
  • Token 放在 Centrifuge token
  • 请求头包含 x-appidx-auth-type(普通 access token 或 authorization token)

后端路由为:

  • GET /api/v1/connect -> handler.Websocket -> centrifugeManager.HandleWebSocket

认证与订阅权限由 pkg/socket/centrifuge/auth.go 处理:

  • 连接阶段校验 token,得到 UserID
  • 订阅阶段校验频道访问权限(用户频道、空间知识频道、会话频道)

2. 频道与消息外层格式

主要频道:

  • 用户频道:/user/{userID}
  • 会话频道:/chat_session/{spaceID}/{sessionID}
  • 记忆首页频道:/knowledge/list/{spaceID}

服务端统一推送格式:

  • subject:事件域(如 on_messageon_message_initstage_changed
  • type:事件类型(后端按字符串下发,前端会转数字)
  • data:业务负载(如 StreamMessage、知识 stage 变更)

3. Chat Session 实时链路

3.1 发送阶段(HTTP)

前端 chat-session.tsx 的发送流程:

  1. POST /:spaceid/chat/:session/message/id 生成 message_id
  2. POST /:spaceid/chat/:session/message 发送用户消息,返回 answer_id
  3. 前端先本地插入 user 消息,并进入 aiTyping=true

后端 ChatLogic.NewUserMessage

  • 写入用户消息到 DB
  • 启动对应 Agent(Auto/Butler/Journal)
  • 返回 answer_id 给前端

3.2 流式阶段(WebSocket)

前端订阅:

  • /chat_session/{spaceID}/{sessionID}
  • 仅消费 subject in ["on_message","on_message_init"]

前端事件处理关键点:

  • 有队列 queue,按 200ms 周期消费,避免并发写 UI
  • 50s message daemon 超时兜底,触发历史重载
  • 断线或状态异常时会 loadData(1) 全量回补

事件映射(前后端一致):

  • 1 EVENT_ASSISTANT_INIT
  • 2 EVENT_ASSISTANT_CONTINUE
  • 3 EVENT_ASSISTANT_DONE
  • 4 EVENT_ASSISTANT_FAILED
  • 5 EVENT_TOOL_INIT
  • 6 EVENT_TOOL_CONTINUE
  • 7 EVENT_TOOL_DONE
  • 8 EVENT_TOOL_FAILED

4. ReActAgent 与 Tool Call 推送逻辑

AutoAssistant 使用 Eino ReActAgent,核心在 auto_assistant.go

  • CreateAutoRagReActAgent(...) 组装工具(RAG/WebSearch/Knowledge/OCR/Vision)
  • CreateReActAgentWithConfig(...) 创建 ReActAgent,启用 StreamToolCallChecker

4.1 Assistant 文本流

EinoResponseHandler + CallbackHandlers 负责:

  • 首个有效 chunk 到来时创建 assistant 消息(RecvMessageInit
  • 增量文本写入 ReceiveFunc,触发 EVENT_ASSISTANT_CONTINUE
  • 流结束触发 DoneFunc,落 EVENT_ASSISTANT_DONE/FAILED

4.2 Tool Call 流

NotifyingTool 包装每个工具:

  1. InvokableRun 前先 RecvMessageInit,创建 tool 消息(role=tool)
  2. 下发 ToolTips 运行中状态 -> EVENT_TOOL_CONTINUE
  3. 工具执行完成或失败 -> EVENT_TOOL_DONE/FAILED
  4. 工具参数/结果写入消息扩展,前端可通过 GetMessageExt 查看

前端收到 msg_type = MESSAGE_TYPE_TOOL_TIPS 时,会更新 toolTips 列表与状态,不与普通 assistant 文本混用。

5. 停止生成(Stop Stream)

前端点击停止:

  • POST /:spaceid/chat/:session/stop

后端:

  • ChatLogic.StopStream -> Centrifuge.NewCloseChatStreamSignal(sessionID)
  • 先尝试本机 streamSignals 执行 closeFunc
  • 同时发布系统频道 system:stop_chat_stream(用于分布式场景传播)

Agent 请求侧在 ChatSessionHandle/ButlerSessionHandle/JournalSessionHandle 中通过 RegisterStreamSignal 注册取消函数,实现中断流式推理。

6. 记忆首页(Knowledge)监听逻辑

前端 pages/dashboard/knowledge.tsx 订阅:

  • /knowledge/list/{spaceID}
  • 仅处理 subject = "stage_changed"

处理策略:

  • Embedding:仅更新列表项阶段状态
  • Done:调用 GetKnowledge(...) 拉取最新内容替换列表项

后端在知识处理流程中通过 publishStageChangedMessage(...) 发布:

  • topic: /knowledge/list/{spaceID}
  • subject: stage_changed

7. 稳定性与兼容性设计

  • 前端 CentrifugeManager 支持指数退避重连(1s 起步,最大 30s,最多 5 次)
  • 后端 type 按字符串下发,前端统一 parseInt 做兼容
  • 前端保留历史重载兜底,防止局部事件丢失造成 UI 与 DB 不一致