fix: ws消息允许接收多条遗留消息

This commit is contained in:
DLmaster361
2025-09-24 01:01:48 +08:00
parent 0f1cbfc82d
commit 97ec518e43

View File

@@ -35,8 +35,8 @@ interface GlobalWSStorage {
wsRef: WebSocket | null wsRef: WebSocket | null
status: Ref<WebSocketStatus> status: Ref<WebSocketStatus>
subscribers: Ref<Map<string, WebSocketSubscriber>> subscribers: Ref<Map<string, WebSocketSubscriber>>
// 修改消息队列结构使用Map存储消息和时间戳 // 修改消息队列结构使用Map存储消息数组和时间戳
messageQueue: Ref<Map<string, { message: WebSocketBaseMessage; timestamp: number }>> messageQueue: Ref<Map<string, Array<{ message: WebSocketBaseMessage; timestamp: number }>>>
heartbeatTimer?: number heartbeatTimer?: number
isConnecting: boolean isConnecting: boolean
lastPingTime: number lastPingTime: number
@@ -66,8 +66,8 @@ const initGlobalStorage = (): GlobalWSStorage => {
wsRef: null, wsRef: null,
status: ref<WebSocketStatus>('已断开'), status: ref<WebSocketStatus>('已断开'),
subscribers: ref(new Map<string, WebSocketSubscriber>()), subscribers: ref(new Map<string, WebSocketSubscriber>()),
// 初始化消息队列 // 初始化消息队列使用数组存储每个ID的多条消息
messageQueue: ref(new Map<string, { message: WebSocketBaseMessage; timestamp: number }>()), messageQueue: ref(new Map<string, Array<{ message: WebSocketBaseMessage; timestamp: number }>>()),
heartbeatTimer: undefined, heartbeatTimer: undefined,
isConnecting: false, isConnecting: false,
lastPingTime: 0, lastPingTime: 0,
@@ -274,14 +274,14 @@ const startGlobalHeartbeat = (ws: WebSocket) => {
} }
// 获取消息队列 // 获取消息队列
const getMessageQueue = (): Map<string, { message: WebSocketBaseMessage; timestamp: number }> => { const getMessageQueue = (): Map<string, Array<{ message: WebSocketBaseMessage; timestamp: number }>> => {
const global = getGlobalStorage() const global = getGlobalStorage()
return global.messageQueue.value return global.messageQueue.value
} }
// 设置消息队列 // 设置消息队列
const setMessageQueue = ( const setMessageQueue = (
queue: Map<string, { message: WebSocketBaseMessage; timestamp: number }> queue: Map<string, Array<{ message: WebSocketBaseMessage; timestamp: number }>>
): void => { ): void => {
const global = getGlobalStorage() const global = getGlobalStorage()
global.messageQueue.value = queue global.messageQueue.value = queue
@@ -318,22 +318,31 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
const currentMessageQueue = getMessageQueue() const currentMessageQueue = getMessageQueue()
// 对于Map类型的消息队列直接存储或更新 // 获取该ID现有的消息数组如果不存在则创建新的空数组
currentMessageQueue.set(id, { message: raw, timestamp: Date.now() }) const existingMessages = currentMessageQueue.get(id) || []
console.log(`[WebSocket Debug] 添加新消息到队列ID: ${id}, Type: ${msgType}`) // 将新消息添加到数组末尾
existingMessages.push({ message: raw, timestamp: Date.now() })
// 更新该ID的消息数组
currentMessageQueue.set(id, existingMessages)
console.log(`[WebSocket Debug] 添加新消息到队列ID: ${id}, Type: ${msgType}, 当前消息数量: ${existingMessages.length}`)
// 清理过期消息超过1分钟的消息 // 清理过期消息超过1分钟的消息
const now = Date.now() const now = Date.now()
let deletedCount = 0 let deletedCount = 0
currentMessageQueue.forEach((value, key) => { currentMessageQueue.forEach((messages, key) => {
if (now - (value.timestamp || 0) > 60000) { // 过滤掉过期的消息
currentMessageQueue.delete(key) const filteredMessages = messages.filter(msg => now - (msg.timestamp || 0) <= 60000)
deletedCount++ const removedCount = messages.length - filteredMessages.length
if (removedCount > 0) {
deletedCount += removedCount
console.log(`[WebSocket Debug] 清理了${removedCount}条ID为${key}的过期消息`)
} }
// 更新过滤后的消息数组
currentMessageQueue.set(key, filteredMessages)
}) })
if (deletedCount > 0) { if (deletedCount > 0) {
console.log(`[WebSocket Debug] 清理了${deletedCount}条过期消息`) console.log(`[WebSocket Debug] 清理了${deletedCount}条过期消息`)
} }
// 更新消息队列 // 更新消息队列
@@ -685,18 +694,23 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
) )
// 检查特定ID的消息 // 检查特定ID的消息
const queuedMessage = messageQueue.get(id) const queuedMessages = messageQueue.get(id)
if (queuedMessage) { if (queuedMessages && queuedMessages.length > 0) {
console.log('[WebSocket] 发现队列中的消息,立即分发给新订阅者:', id, '消息内容:', queuedMessage) console.log('[WebSocket] 发现队列中的消息,立即按顺序分发给新订阅者:', id, '消息数量:', queuedMessages.length)
// 创建临时订阅者对象用于分发消息 // 创建临时订阅者对象用于分发消息
const tempSubscriber: WebSocketSubscriber = { ...subscriber, id } const tempSubscriber: WebSocketSubscriber = { ...subscriber, id }
console.log('[WebSocket] 开始处理遗留消息订阅者ID:', id, '消息:', queuedMessage.message)
try { // 按顺序处理所有遗留消息
handleMessageDispatch(queuedMessage.message, tempSubscriber) queuedMessages.forEach((queuedMessage, index) => {
console.log('[WebSocket] 遗留消息处理完成订阅者ID:', id) console.log('[WebSocket] 开始处理遗留消息订阅者ID:', id, '消息索引:', index, '消息:', queuedMessage.message)
} catch (error) { try {
console.error('[WebSocket] 处理遗留消息时出错订阅者ID:', id, '错误:', error) handleMessageDispatch(queuedMessage.message, tempSubscriber)
} console.log('[WebSocket] 遗留消息处理完成订阅者ID:', id, '消息索引:', index)
} catch (error) {
console.error('[WebSocket] 处理遗留消息时出错订阅者ID:', id, '消息索引:', index, '错误:', error)
}
})
// 从队列中移除已处理的消息 // 从队列中移除已处理的消息
messageQueue.delete(id) messageQueue.delete(id)
setMessageQueue(messageQueue) setMessageQueue(messageQueue)
@@ -708,21 +722,18 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
// 清理过期消息超过1分钟的消息 // 清理过期消息超过1分钟的消息
const now = Date.now() const now = Date.now()
let cleanedCount = 0 let cleanedCount = 0
messageQueue.forEach((queued, msgId) => { messageQueue.forEach((queuedMessages, msgId) => {
if (now - queued.timestamp > 60000) { // 过滤掉过期的消息
// 1分钟 = 60000毫秒 const filteredMessages = queuedMessages.filter(msg => now - msg.timestamp <= 60000)
console.log( const removedCount = queuedMessages.length - filteredMessages.length
'[WebSocket] 清理过期消息:', if (removedCount > 0) {
msgId, cleanedCount += removedCount
'消息内容:', console.log('[WebSocket] 清理过期消息:', msgId, '清理数量:', removedCount)
queued.message, // 更新过滤后的消息数组
'时间戳:', messageQueue.set(msgId, filteredMessages)
queued.timestamp
)
messageQueue.delete(msgId)
cleanedCount++
} }
}) })
if (cleanedCount > 0) { if (cleanedCount > 0) {
console.log('[WebSocket] 共清理过期消息数量:', cleanedCount) console.log('[WebSocket] 共清理过期消息数量:', cleanedCount)
setMessageQueue(messageQueue) setMessageQueue(messageQueue)