refactor: 重构WebSocket消息处理逻辑,使用全局处理函数替代订阅方式

This commit is contained in:
2025-09-24 19:15:19 +08:00
parent cf4c8cfe17
commit bd58a512c9
2 changed files with 82 additions and 60 deletions

View File

@@ -93,7 +93,7 @@ const initGlobalStorage = (): GlobalWSStorage => {
// 获取全局存储 // 获取全局存储
const getGlobalStorage = (): GlobalWSStorage => { const getGlobalStorage = (): GlobalWSStorage => {
if (!(window as any)[WS_STORAGE_KEY]) { if (!(window as any)[WS_STORAGE_KEY]) {
;(window as any)[WS_STORAGE_KEY] = initGlobalStorage() ; (window as any)[WS_STORAGE_KEY] = initGlobalStorage()
} }
return (window as any)[WS_STORAGE_KEY] as GlobalWSStorage return (window as any)[WS_STORAGE_KEY] as GlobalWSStorage
@@ -173,7 +173,7 @@ const handleBackendFailure = async () => {
okText: '重启应用', okText: '重启应用',
onOk: () => { onOk: () => {
if ((window.electronAPI as any)?.windowClose) { if ((window.electronAPI as any)?.windowClose) {
;(window.electronAPI as any).windowClose() ; (window.electronAPI as any).windowClose()
} else { } else {
window.location.reload() window.location.reload()
} }
@@ -416,36 +416,8 @@ const createGlobalWebSocket = (): WebSocket => {
/* ignore */ /* ignore */
} }
// 自动订阅ID为"Main"的消息用于处理ping-pong等系统消息 // 初始化全局订阅TaskManager和Main
_subscribe('Main', { initializeGlobalSubscriptions()
onMessage: (message: WebSocketBaseMessage) => {
// 处理系统级消息如ping-pong
if (message && message.type === 'Signal' && message.data) {
// 处理心跳响应
if (message.data.Pong) {
global.lastPingTime = 0 // 重置ping时间表示收到了响应
return
}
// 处理后端发送的Ping回复Pong
if (message.data.Ping) {
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(
JSON.stringify({
type: 'Signal',
data: { Pong: message.data.Ping, connectionId: global.connectionId },
})
)
} catch (e) {
// Pong发送失败静默处理
}
}
return
}
}
},
})
} }
ws.onmessage = ev => { ws.onmessage = ev => {
@@ -604,6 +576,71 @@ window.addEventListener('beforeunload', () => {
// 保持连接 // 保持连接
}) })
// 全局订阅处理函数 - 供调度台逻辑调用
let globalTaskManagerHandler: ((message: any) => void) | null = null
let globalMainMessageHandler: ((message: any) => void) | null = null
// 设置TaskManager消息处理函数
export const setTaskManagerHandler = (handler: (message: any) => void) => {
globalTaskManagerHandler = handler
}
// 设置Main消息处理函数
export const setMainMessageHandler = (handler: (message: any) => void) => {
globalMainMessageHandler = handler
}
// 初始化全局订阅
const initializeGlobalSubscriptions = () => {
// 订阅TaskManager消息
_subscribe('TaskManager', {
onMessage: (message) => {
if (globalTaskManagerHandler) {
globalTaskManagerHandler(message)
}
}
})
// 订阅Main消息
_subscribe('Main', {
onMessage: (message) => {
// 处理系统级消息如ping-pong
if (message && message.type === 'Signal' && message.data) {
// 处理心跳响应
if (message.data.Pong) {
const global = getGlobalStorage()
global.lastPingTime = 0 // 重置ping时间表示收到了响应
return
}
// 处理后端发送的Ping回复Pong
if (message.data.Ping) {
const global = getGlobalStorage()
const ws = global.wsRef
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(
JSON.stringify({
type: 'Signal',
data: { Pong: message.data.Ping, connectionId: global.connectionId },
})
)
} catch (e) {
// Pong发送失败静默处理
}
}
return
}
}
// 调用外部设置的Main消息处理函数
if (globalMainMessageHandler) {
globalMainMessageHandler(message)
}
}
})
}
// 主要 Hook 函数 // 主要 Hook 函数
export function useWebSocket() { export function useWebSocket() {
const global = getGlobalStorage() const global = getGlobalStorage()
@@ -699,7 +736,7 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
console.log('[WebSocket] 发现队列中的消息,立即按顺序分发给新订阅者:', id, '消息数量:', queuedMessages.length) console.log('[WebSocket] 发现队列中的消息,立即按顺序分发给新订阅者:', id, '消息数量:', queuedMessages.length)
// 创建临时订阅者对象用于分发消息 // 创建临时订阅者对象用于分发消息
const tempSubscriber: WebSocketSubscriber = { ...subscriber, id } const tempSubscriber: WebSocketSubscriber = { ...subscriber, id }
// 按顺序处理所有遗留消息 // 按顺序处理所有遗留消息
queuedMessages.forEach((queuedMessage, index) => { queuedMessages.forEach((queuedMessage, index) => {
console.log('[WebSocket] 开始处理遗留消息订阅者ID:', id, '消息索引:', index, '消息:', queuedMessage.message) console.log('[WebSocket] 开始处理遗留消息订阅者ID:', id, '消息索引:', index, '消息:', queuedMessage.message)
@@ -710,7 +747,7 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
console.error('[WebSocket] 处理遗留消息时出错订阅者ID:', id, '消息索引:', index, '错误:', error) console.error('[WebSocket] 处理遗留消息时出错订阅者ID:', id, '消息索引:', index, '错误:', error)
} }
}) })
// 从队列中移除已处理的消息 // 从队列中移除已处理的消息
messageQueue.delete(id) messageQueue.delete(id)
setMessageQueue(messageQueue) setMessageQueue(messageQueue)
@@ -733,7 +770,7 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
messageQueue.set(msgId, filteredMessages) messageQueue.set(msgId, filteredMessages)
} }
}) })
if (cleanedCount > 0) { if (cleanedCount > 0) {
console.log('[WebSocket] 共清理过期消息数量:', cleanedCount) console.log('[WebSocket] 共清理过期消息数量:', cleanedCount)
setMessageQueue(messageQueue) setMessageQueue(messageQueue)

View File

@@ -3,7 +3,7 @@ import { message, Modal, notification } from 'ant-design-vue'
import { Service } from '@/api/services/Service' import { Service } from '@/api/services/Service'
import { TaskCreateIn } from '@/api/models/TaskCreateIn' import { TaskCreateIn } from '@/api/models/TaskCreateIn'
import { PowerIn } from '@/api/models/PowerIn' import { PowerIn } from '@/api/models/PowerIn'
import { useWebSocket } from '@/composables/useWebSocket' import { useWebSocket, setTaskManagerHandler, setMainMessageHandler } from '@/composables/useWebSocket'
import type { ComboBoxItem } from '@/api/models/ComboBoxItem' import type { ComboBoxItem } from '@/api/models/ComboBoxItem'
import type { QueueItem, Script } from './schedulerConstants' import type { QueueItem, Script } from './schedulerConstants'
import { import {
@@ -110,13 +110,7 @@ export function useSchedulerLogic() {
// WebSocket 实例 // WebSocket 实例
const ws = useWebSocket() const ws = useWebSocket()
// 订阅TaskManager消息处理自动创建的任务 // TaskManager消息处理函数供全局WebSocket调用
const subscribeToTaskManager = () => {
ws.subscribe('TaskManager', {
onMessage: (message) => handleTaskManagerMessage(message)
})
}
const handleTaskManagerMessage = (wsMessage: any) => { const handleTaskManagerMessage = (wsMessage: any) => {
if (!wsMessage || typeof wsMessage !== 'object') return if (!wsMessage || typeof wsMessage !== 'object') return
@@ -772,13 +766,10 @@ export function useSchedulerLogic() {
// 初始化函数 // 初始化函数
const initialize = () => { const initialize = () => {
// 订阅TaskManager消息 // 设置全局WebSocket的消息处理函数
subscribeToTaskManager() setTaskManagerHandler(handleTaskManagerMessage)
console.log('[Scheduler] 已订阅TaskManager消息') setMainMessageHandler(handleMainMessage)
console.log('[Scheduler] 已设置全局WebSocket消息处理函数')
// 订阅Main消息用于接收全局消息如电源操作倒计时
subscribeToMainMessages()
console.log('[Scheduler] 已订阅Main消息')
// 新增:为已有的“运行中”标签恢复 WebSocket 订阅,防止路由切换返回后不再更新 // 新增:为已有的“运行中”标签恢复 WebSocket 订阅,防止路由切换返回后不再更新
try { try {
@@ -793,13 +784,7 @@ export function useSchedulerLogic() {
} }
} }
// 订阅Main消息处理全局消息 // Main消息处理函数供全局WebSocket调用
const subscribeToMainMessages = () => {
ws.subscribe('Main', {
onMessage: (message) => handleMainMessage(message)
})
}
const handleMainMessage = (wsMessage: any) => { const handleMainMessage = (wsMessage: any) => {
if (!wsMessage || typeof wsMessage !== 'object') return if (!wsMessage || typeof wsMessage !== 'object') return
@@ -825,9 +810,9 @@ export function useSchedulerLogic() {
powerCountdownTimer = null powerCountdownTimer = null
} }
// 取消订阅TaskManager和Main // 清理全局WebSocket的消息处理函数
ws.unsubscribe('TaskManager') setTaskManagerHandler(() => {})
ws.unsubscribe('Main') setMainMessageHandler(() => {})
schedulerTabs.value.forEach(tab => { schedulerTabs.value.forEach(tab => {
if (tab.websocketId) { if (tab.websocketId) {