From 60e8ac0ce9d0adc03a0f5ced00f3c258d6155ef1 Mon Sep 17 00:00:00 2001 From: DLmaster361 Date: Sat, 20 Sep 2025 18:41:25 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=B7=BB=E5=8A=A0ws=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=9F=E5=88=97=E9=81=BF=E5=85=8D=E6=B6=88=E6=81=AF=E6=BC=8F?= =?UTF-8?q?=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/composables/useWebSocket.ts | 149 ++++++++++++++++++++++- 1 file changed, 143 insertions(+), 6 deletions(-) diff --git a/frontend/src/composables/useWebSocket.ts b/frontend/src/composables/useWebSocket.ts index 06ccdee..fc1389d 100644 --- a/frontend/src/composables/useWebSocket.ts +++ b/frontend/src/composables/useWebSocket.ts @@ -55,6 +55,8 @@ interface GlobalWSStorage { wsRef: WebSocket | null status: Ref subscribers: Ref> + // 修改消息队列结构,使用Map存储消息和时间戳 + messageQueue: Ref> heartbeatTimer?: number isConnecting: boolean lastPingTime: number @@ -84,6 +86,8 @@ const initGlobalStorage = (): GlobalWSStorage => { wsRef: null, status: ref('已断开'), subscribers: ref(new Map()), + // 初始化消息队列 + messageQueue: ref(new Map()), heartbeatTimer: undefined, isConnecting: false, lastPingTime: 0, @@ -289,6 +293,18 @@ const startGlobalHeartbeat = (ws: WebSocket) => { }, HEARTBEAT_INTERVAL) } +// 获取消息队列 +const getMessageQueue = (): Map => { + const global = getGlobalStorage() + return global.messageQueue.value +} + +// 设置消息队列 +const setMessageQueue = (queue: Map): void => { + const global = getGlobalStorage() + global.messageQueue.value = queue +} + const handleMessage = (raw: WebSocketBaseMessage) => { const global = getGlobalStorage() const msgType = String(raw.type) @@ -382,13 +398,27 @@ const handleMessage = (raw: WebSocketBaseMessage) => { console.log('[WebSocket Debug] 分发消息给订阅者:', id) dispatch(sub) } else { - console.warn('[WebSocket Debug] 未找到对应的订阅者:', id) + // 将没有订阅者的消息暂存到消息队列中 + console.warn('[WebSocket Debug] 未找到对应的订阅者,将消息暂存到队列中:', id) + const messageQueue = getMessageQueue() + messageQueue.set(id, { + message: raw, + timestamp: Date.now() + }) + setMessageQueue(messageQueue) + + // 清理过期消息(超过1分钟的消息) + const now = Date.now() + messageQueue.forEach((queued, msgId) => { + if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒 + console.log('[WebSocket] 清理过期消息:', msgId) + messageQueue.delete(msgId) + } + }) + setMessageQueue(messageQueue) } - } else { - // 无 id 的消息广播给所有订阅者 - console.log('[WebSocket Debug] 广播消息给所有订阅者,订阅者数量:', global.subscribers.value.size) - global.subscribers.value.forEach((sub: WebSocketSubscriber) => dispatch(sub)) } + // 移除无id消息的处理逻辑,因为后端不会发送这类消息 } // 后端启动后建立连接的公开函数 @@ -455,6 +485,13 @@ const createGlobalWebSocket = (): WebSocket => { } catch { /* ignore */ } + + // 自动订阅ID为"Main"的消息,用于处理ping-pong等系统消息 + _subscribe("Main", { + onNotify: () => { + // Main ID的消息通常不包含需要处理的Notify内容,但保留处理函数以确保订阅完整性 + } + }); } ws.onmessage = ev => { @@ -618,7 +655,8 @@ export function useWebSocket() { const global = getGlobalStorage() const subscribe = (id: string, handlers: Omit) => { - global.subscribers.value.set(id, { id, ...handlers }) + // 使用全局的subscribe函数来确保消息队列机制正常工作 + _subscribe(id, handlers); } const unsubscribe = (id: string) => { @@ -678,3 +716,102 @@ export function useWebSocket() { getBackendStatus, } } + +/** + * 订阅指定ID的消息 + * @param id 消息ID + * @param subscriber 订阅者对象 + */ +export const _subscribe = (id: string, subscriber: Omit) => { + const global = getGlobalStorage() + const fullSubscriber: WebSocketSubscriber = { ...subscriber, id } + + // 添加订阅者 + global.subscribers.value.set(id, fullSubscriber) + console.log('[WebSocket] 添加订阅者:', id, '当前订阅者数量:', global.subscribers.value.size) + + // 检查消息队列中是否有该订阅者的消息 + const messageQueue = getMessageQueue() + console.log('[WebSocket] 检查消息队列,当前队列大小:', messageQueue.size, '队列内容:', Array.from(messageQueue.entries())) + + // 检查特定ID的消息 + const queuedMessage = messageQueue.get(id) + if (queuedMessage) { + console.log('[WebSocket] 发现队列中的消息,立即分发给新订阅者:', id, '消息内容:', queuedMessage) + // 创建临时订阅者对象用于分发消息 + const tempSubscriber: WebSocketSubscriber = { ...subscriber, id } + console.log('[WebSocket] 开始处理遗留消息,订阅者ID:', id, '消息:', queuedMessage.message) + try { + handleMessageDispatch(queuedMessage.message, tempSubscriber) + console.log('[WebSocket] 遗留消息处理完成,订阅者ID:', id) + } catch (error) { + console.error('[WebSocket] 处理遗留消息时出错,订阅者ID:', id, '错误:', error) + } + // 从队列中移除已处理的消息 + messageQueue.delete(id) + setMessageQueue(messageQueue) + console.log('[WebSocket] 已从队列中移除消息,剩余队列大小:', messageQueue.size) + } else { + console.log('[WebSocket] 未在队列中找到ID为', id, '的消息') + } + + // 清理过期消息(超过1分钟的消息) + const now = Date.now() + let cleanedCount = 0 + messageQueue.forEach((queued, msgId) => { + if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒 + console.log('[WebSocket] 清理过期消息:', msgId, '消息内容:', queued.message, '时间戳:', queued.timestamp) + messageQueue.delete(msgId) + cleanedCount++ + } + }) + if (cleanedCount > 0) { + console.log('[WebSocket] 共清理过期消息数量:', cleanedCount) + setMessageQueue(messageQueue) + } +} + +// 新增函数:处理消息分发 +const handleMessageDispatch = (raw: WebSocketBaseMessage, sub: WebSocketSubscriber) => { + const msgType = raw.type + + if (msgType === 'Signal') return + + if (msgType === 'Progress') return sub.onProgress?.(raw.data as ProgressMessage) + if (msgType === 'Result') return sub.onResult?.(raw.data as ResultMessage) + if (msgType === 'Update') { + // 处理Update类型消息,当作Progress处理 + console.log('[WebSocket Debug] 分发Update消息:', raw) + return sub.onProgress?.(raw as any) + } + if (msgType === 'Info') { + // 处理Info类型消息,当作Notify处理 + console.log('[WebSocket Debug] 分发Info消息:', raw) + return sub.onNotify?.(raw as any) + } + if (msgType === 'Message') { + // 处理Message类型消息,当作Notify处理 + console.log('[WebSocket Debug] 分发Message消息:', raw) + return sub.onNotify?.(raw as any) + } + if (msgType === 'Error') { + sub.onError?.(raw.data as ErrorMessage) + if (!sub.onError && raw.data && (raw.data as ErrorMessage).msg) { + message.error((raw.data as ErrorMessage).msg) + } + return + } + if (msgType === 'Notify') { + sub.onNotify?.(raw.data as NotifyMessage) + if (raw.data && (raw.data as NotifyMessage).title) { + notification.info({ + message: (raw.data as NotifyMessage).title, + description: (raw.data as NotifyMessage).content, + }) + } + return + } + + // 其他类型可扩展 + console.log('[WebSocket Debug] 未处理的消息类型:', msgType, raw) +}