fix: 添加ws消息队列避免消息漏收

This commit is contained in:
DLmaster361
2025-09-20 18:41:25 +08:00
parent a5e09bc489
commit 60e8ac0ce9

View File

@@ -55,6 +55,8 @@ interface GlobalWSStorage {
wsRef: WebSocket | null
status: Ref<WebSocketStatus>
subscribers: Ref<Map<string, WebSocketSubscriber>>
// 修改消息队列结构使用Map存储消息和时间戳
messageQueue: Ref<Map<string, { message: WebSocketBaseMessage; timestamp: number }>>
heartbeatTimer?: number
isConnecting: boolean
lastPingTime: number
@@ -84,6 +86,8 @@ const initGlobalStorage = (): GlobalWSStorage => {
wsRef: null,
status: ref<WebSocketStatus>('已断开'),
subscribers: ref(new Map<string, WebSocketSubscriber>()),
// 初始化消息队列
messageQueue: ref(new Map<string, { message: WebSocketBaseMessage; timestamp: number }>()),
heartbeatTimer: undefined,
isConnecting: false,
lastPingTime: 0,
@@ -289,6 +293,18 @@ const startGlobalHeartbeat = (ws: WebSocket) => {
}, HEARTBEAT_INTERVAL)
}
// 获取消息队列
const getMessageQueue = (): Map<string, { message: WebSocketBaseMessage; timestamp: number }> => {
const global = getGlobalStorage()
return global.messageQueue.value
}
// 设置消息队列
const setMessageQueue = (queue: Map<string, { message: WebSocketBaseMessage; timestamp: number }>): void => {
const global = getGlobalStorage()
global.messageQueue.value = queue
}
const handleMessage = (raw: WebSocketBaseMessage) => {
const global = getGlobalStorage()
const msgType = String(raw.type)
@@ -382,13 +398,27 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
console.log('[WebSocket Debug] 分发消息给订阅者:', id)
dispatch(sub)
} else {
console.warn('[WebSocket Debug] 未找到对应的订阅者:', id)
// 将没有订阅者的消息暂存到消息队列中
console.warn('[WebSocket Debug] 未找到对应的订阅者,将消息暂存到队列中:', id)
const messageQueue = getMessageQueue()
messageQueue.set(id, {
message: raw,
timestamp: Date.now()
})
setMessageQueue(messageQueue)
// 清理过期消息超过1分钟的消息
const now = Date.now()
messageQueue.forEach((queued, msgId) => {
if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒
console.log('[WebSocket] 清理过期消息:', msgId)
messageQueue.delete(msgId)
}
} else {
// 无 id 的消息广播给所有订阅者
console.log('[WebSocket Debug] 广播消息给所有订阅者,订阅者数量:', global.subscribers.value.size)
global.subscribers.value.forEach((sub: WebSocketSubscriber) => dispatch(sub))
})
setMessageQueue(messageQueue)
}
}
// 移除无id消息的处理逻辑因为后端不会发送这类消息
}
// 后端启动后建立连接的公开函数
@@ -455,6 +485,13 @@ const createGlobalWebSocket = (): WebSocket => {
} catch {
/* ignore */
}
// 自动订阅ID为"Main"的消息用于处理ping-pong等系统消息
_subscribe("Main", {
onNotify: () => {
// Main ID的消息通常不包含需要处理的Notify内容但保留处理函数以确保订阅完整性
}
});
}
ws.onmessage = ev => {
@@ -618,7 +655,8 @@ export function useWebSocket() {
const global = getGlobalStorage()
const subscribe = (id: string, handlers: Omit<WebSocketSubscriber, 'id'>) => {
global.subscribers.value.set(id, { id, ...handlers })
// 使用全局的subscribe函数来确保消息队列机制正常工作
_subscribe(id, handlers);
}
const unsubscribe = (id: string) => {
@@ -678,3 +716,102 @@ export function useWebSocket() {
getBackendStatus,
}
}
/**
* 订阅指定ID的消息
* @param id 消息ID
* @param subscriber 订阅者对象
*/
export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id'>) => {
const global = getGlobalStorage()
const fullSubscriber: WebSocketSubscriber = { ...subscriber, id }
// 添加订阅者
global.subscribers.value.set(id, fullSubscriber)
console.log('[WebSocket] 添加订阅者:', id, '当前订阅者数量:', global.subscribers.value.size)
// 检查消息队列中是否有该订阅者的消息
const messageQueue = getMessageQueue()
console.log('[WebSocket] 检查消息队列,当前队列大小:', messageQueue.size, '队列内容:', Array.from(messageQueue.entries()))
// 检查特定ID的消息
const queuedMessage = messageQueue.get(id)
if (queuedMessage) {
console.log('[WebSocket] 发现队列中的消息,立即分发给新订阅者:', id, '消息内容:', queuedMessage)
// 创建临时订阅者对象用于分发消息
const tempSubscriber: WebSocketSubscriber = { ...subscriber, id }
console.log('[WebSocket] 开始处理遗留消息订阅者ID:', id, '消息:', queuedMessage.message)
try {
handleMessageDispatch(queuedMessage.message, tempSubscriber)
console.log('[WebSocket] 遗留消息处理完成订阅者ID:', id)
} catch (error) {
console.error('[WebSocket] 处理遗留消息时出错订阅者ID:', id, '错误:', error)
}
// 从队列中移除已处理的消息
messageQueue.delete(id)
setMessageQueue(messageQueue)
console.log('[WebSocket] 已从队列中移除消息,剩余队列大小:', messageQueue.size)
} else {
console.log('[WebSocket] 未在队列中找到ID为', id, '的消息')
}
// 清理过期消息超过1分钟的消息
const now = Date.now()
let cleanedCount = 0
messageQueue.forEach((queued, msgId) => {
if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒
console.log('[WebSocket] 清理过期消息:', msgId, '消息内容:', queued.message, '时间戳:', queued.timestamp)
messageQueue.delete(msgId)
cleanedCount++
}
})
if (cleanedCount > 0) {
console.log('[WebSocket] 共清理过期消息数量:', cleanedCount)
setMessageQueue(messageQueue)
}
}
// 新增函数:处理消息分发
const handleMessageDispatch = (raw: WebSocketBaseMessage, sub: WebSocketSubscriber) => {
const msgType = raw.type
if (msgType === 'Signal') return
if (msgType === 'Progress') return sub.onProgress?.(raw.data as ProgressMessage)
if (msgType === 'Result') return sub.onResult?.(raw.data as ResultMessage)
if (msgType === 'Update') {
// 处理Update类型消息当作Progress处理
console.log('[WebSocket Debug] 分发Update消息:', raw)
return sub.onProgress?.(raw as any)
}
if (msgType === 'Info') {
// 处理Info类型消息当作Notify处理
console.log('[WebSocket Debug] 分发Info消息:', raw)
return sub.onNotify?.(raw as any)
}
if (msgType === 'Message') {
// 处理Message类型消息当作Notify处理
console.log('[WebSocket Debug] 分发Message消息:', raw)
return sub.onNotify?.(raw as any)
}
if (msgType === 'Error') {
sub.onError?.(raw.data as ErrorMessage)
if (!sub.onError && raw.data && (raw.data as ErrorMessage).msg) {
message.error((raw.data as ErrorMessage).msg)
}
return
}
if (msgType === 'Notify') {
sub.onNotify?.(raw.data as NotifyMessage)
if (raw.data && (raw.data as NotifyMessage).title) {
notification.info({
message: (raw.data as NotifyMessage).title,
description: (raw.data as NotifyMessage).content,
})
}
return
}
// 其他类型可扩展
console.log('[WebSocket Debug] 未处理的消息类型:', msgType, raw)
}