diff --git a/frontend/src/composables/useWebSocket.ts b/frontend/src/composables/useWebSocket.ts index fc1389d..b5cf498 100644 --- a/frontend/src/composables/useWebSocket.ts +++ b/frontend/src/composables/useWebSocket.ts @@ -18,33 +18,13 @@ export interface WebSocketBaseMessage { data?: any } -export interface ProgressMessage { - percent?: number - status?: string - msg?: string -} - -export interface ResultMessage { - success?: boolean - result?: any -} - -export interface ErrorMessage { - msg?: string - code?: number -} - -export interface NotifyMessage { - title?: string - content?: string -} +// 删除了冗余的类型定义: +// ProgressMessage、ResultMessage、ErrorMessage、NotifyMessage +// 因为现在WebSocket消息处理统一使用onMessage回调函数,不再需要这些特定的类型 export interface WebSocketSubscriber { id: string - onProgress?: (data: ProgressMessage) => void - onResult?: (data: ResultMessage) => void - onError?: (err: ErrorMessage) => void - onNotify?: (n: NotifyMessage) => void + onMessage?: (message: WebSocketBaseMessage) => void } // 后端状态类型 @@ -318,74 +298,6 @@ const handleMessage = (raw: WebSocketBaseMessage) => { fullMessage: raw }) - // 优先处理Signal类型的ping-pong消息,不受id限制 - if (msgType === 'Signal') { - // 处理心跳响应 - if (raw.data && raw.data.Pong) { - global.lastPingTime = 0 // 重置ping时间,表示收到了响应 - return - } - - // 处理后端发送的Ping,回复Pong - if (raw.data && raw.data.Ping) { - const ws = global.wsRef - if (ws && ws.readyState === WebSocket.OPEN) { - try { - ws.send( - JSON.stringify({ - type: 'Signal', - data: { Pong: raw.data.Ping, connectionId: global.connectionId }, - }) - ) - } catch (e) { - // Pong发送失败,静默处理 - } - } - return - } - } - - const dispatch = (sub: WebSocketSubscriber) => { - if (msgType === 'Signal') return - - if (msgType === 'Progress') return sub.onProgress?.(raw.data as ProgressMessage) - if (msgType === 'Result') return sub.onResult?.(raw.data as ResultMessage) - if (msgType === 'Update') { - // 处理Update类型消息,当作Progress处理 - console.log('[WebSocket Debug] 收到Update消息:', raw) - return sub.onProgress?.(raw as any) - } - if (msgType === 'Info') { - // 处理Info类型消息,当作Notify处理 - console.log('[WebSocket Debug] 收到Info消息:', raw) - return sub.onNotify?.(raw as any) - } - if (msgType === 'Message') { - // 处理Message类型消息,当作Notify处理 - console.log('[WebSocket Debug] 收到Message消息:', raw) - return sub.onNotify?.(raw as any) - } - if (msgType === 'Error') { - sub.onError?.(raw.data as ErrorMessage) - if (!sub.onError && raw.data && (raw.data as ErrorMessage).msg) { - message.error((raw.data as ErrorMessage).msg) - } - return - } - if (msgType === 'Notify') { - sub.onNotify?.(raw.data as NotifyMessage) - if (raw.data && (raw.data as NotifyMessage).title) { - notification.info({ - message: (raw.data as NotifyMessage).title, - description: (raw.data as NotifyMessage).content, - }) - } - return - } - // 其他类型可扩展 - console.log('[WebSocket Debug] 未处理的消息类型:', msgType, raw) - } - if (id) { const sub = global.subscribers.value.get(id) console.log('[WebSocket Debug] 查找订阅者:', { @@ -395,30 +307,37 @@ const handleMessage = (raw: WebSocketBaseMessage) => { allSubscriberIds: Array.from(global.subscribers.value.keys()) }) if (sub) { - console.log('[WebSocket Debug] 分发消息给订阅者:', id) - dispatch(sub) + // 有订阅者,直接分发消息 + console.log('[WebSocket Debug] 找到订阅者,直接分发消息') + handleMessageDispatch(raw, sub) } else { - // 将没有订阅者的消息暂存到消息队列中 - console.warn('[WebSocket Debug] 未找到对应的订阅者,将消息暂存到队列中:', id) - const messageQueue = getMessageQueue() - messageQueue.set(id, { - message: raw, - timestamp: Date.now() - }) - setMessageQueue(messageQueue) + // 没有订阅者,将消息暂存到队列中 + console.log(`[WebSocket Debug] 没有找到ID为${id}的订阅者,将消息暂存到队列`) + const currentMessageQueue = getMessageQueue() + + // 对于Map类型的消息队列,直接存储或更新 + currentMessageQueue.set(id, { message: raw, timestamp: Date.now() }) + console.log(`[WebSocket Debug] 添加新消息到队列,ID: ${id}, Type: ${msgType}`) + // 清理过期消息(超过1分钟的消息) const now = Date.now() - messageQueue.forEach((queued, msgId) => { - if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒 - console.log('[WebSocket] 清理过期消息:', msgId) - messageQueue.delete(msgId) + let deletedCount = 0 + currentMessageQueue.forEach((value, key) => { + if (now - (value.timestamp || 0) > 60000) { + currentMessageQueue.delete(key) + deletedCount++ } }) - setMessageQueue(messageQueue) + + if (deletedCount > 0) { + console.log(`[WebSocket Debug] 清理了${deletedCount}条过期消息`) + } + + // 更新消息队列 + setMessageQueue(currentMessageQueue) } } - // 移除无id消息的处理逻辑,因为后端不会发送这类消息 } // 后端启动后建立连接的公开函数 @@ -488,8 +407,32 @@ const createGlobalWebSocket = (): WebSocket => { // 自动订阅ID为"Main"的消息,用于处理ping-pong等系统消息 _subscribe("Main", { - onNotify: () => { - // Main ID的消息通常不包含需要处理的Notify内容,但保留处理函数以确保订阅完整性 + onMessage: (message: WebSocketBaseMessage) => { + // 处理系统级消息(如ping-pong) + if (message && message.type === 'Signal' && message.data) { + // 处理心跳响应 + if (message.data.Pong) { + global.lastPingTime = 0 // 重置ping时间,表示收到了响应 + return + } + + // 处理后端发送的Ping,回复Pong + if (message.data.Ping) { + if (ws && ws.readyState === WebSocket.OPEN) { + try { + ws.send( + JSON.stringify({ + type: 'Signal', + data: { Pong: message.data.Ping, connectionId: global.connectionId }, + }) + ) + } catch (e) { + // Pong发送失败,静默处理 + } + } + return + } + } } }); } @@ -775,43 +718,17 @@ export const _subscribe = (id: string, subscriber: Omit { const msgType = raw.type - if (msgType === 'Signal') return + console.log('[WebSocket] 分发消息类型:', msgType, '消息内容:', raw) - if (msgType === 'Progress') return sub.onProgress?.(raw.data as ProgressMessage) - if (msgType === 'Result') return sub.onResult?.(raw.data as ResultMessage) - if (msgType === 'Update') { - // 处理Update类型消息,当作Progress处理 - console.log('[WebSocket Debug] 分发Update消息:', raw) - return sub.onProgress?.(raw as any) - } - if (msgType === 'Info') { - // 处理Info类型消息,当作Notify处理 - console.log('[WebSocket Debug] 分发Info消息:', raw) - return sub.onNotify?.(raw as any) - } - if (msgType === 'Message') { - // 处理Message类型消息,当作Notify处理 - console.log('[WebSocket Debug] 分发Message消息:', raw) - return sub.onNotify?.(raw as any) - } - if (msgType === 'Error') { - sub.onError?.(raw.data as ErrorMessage) - if (!sub.onError && raw.data && (raw.data as ErrorMessage).msg) { - message.error((raw.data as ErrorMessage).msg) - } - return - } - if (msgType === 'Notify') { - sub.onNotify?.(raw.data as NotifyMessage) - if (raw.data && (raw.data as NotifyMessage).title) { - notification.info({ - message: (raw.data as NotifyMessage).title, - description: (raw.data as NotifyMessage).content, - }) - } - return + // 如果订阅者定义了 onMessage 回调,则优先使用统一的处理函数 + if (sub.onMessage) { + return sub.onMessage(raw) } - // 其他类型可扩展 - console.log('[WebSocket Debug] 未处理的消息类型:', msgType, raw) + // 如果没有 onMessage 处理函数,则记录错误(理论上不应该出现) + console.error('[WebSocket] 错误:订阅者没有定义onMessage处理函数', { + subscriberId: sub.id, + messageType: msgType, + messageContent: raw + }) } diff --git a/frontend/src/views/scheduler/useSchedulerLogic.ts b/frontend/src/views/scheduler/useSchedulerLogic.ts index ab3b711..8b828e3 100644 --- a/frontend/src/views/scheduler/useSchedulerLogic.ts +++ b/frontend/src/views/scheduler/useSchedulerLogic.ts @@ -14,53 +14,34 @@ import { type TaskMessage, } from './schedulerConstants' -// 本地存储键名 -const SCHEDULER_STORAGE_KEY = 'scheduler-tabs-state' +// 电源操作状态仍然保存到localStorage中,以便重启后保持用户设置 const SCHEDULER_POWER_ACTION_KEY = 'scheduler-power-action' -// 从本地存储加载调度台状态 +// 使用内存变量存储调度台状态,而不是localStorage +let schedulerTabsMemory: SchedulerTab[] = [] + +// 从内存加载调度台状态 const loadTabsFromStorage = (): SchedulerTab[] => { - try { - const stored = localStorage.getItem(SCHEDULER_STORAGE_KEY) - if (stored) { - const parsed = JSON.parse(stored) - // 确保运行中的任务状态正确重置 - return parsed.map((tab: any) => ({ - ...tab, - // 重置WebSocket相关状态 + // 如果内存中没有状态,则初始化默认状态 + if (schedulerTabsMemory.length === 0) { + schedulerTabsMemory = [ + { + key: 'main', + title: '主调度台', + closable: false, + status: '新建', + selectedTaskId: null, + selectedMode: TaskCreateIn.mode.AutoMode, websocketId: null, - status: tab.status === '运行' ? '结束' : tab.status, - // 确保数组存在 - taskQueue: Array.isArray(tab.taskQueue) ? tab.taskQueue : [], - userQueue: Array.isArray(tab.userQueue) ? tab.userQueue : [], - logs: Array.isArray(tab.logs) ? tab.logs : [], - // 确保其他属性存在 - selectedTaskId: tab.selectedTaskId || null, - selectedMode: tab.selectedMode || TaskCreateIn.mode.AutoMode, - isLogAtBottom: typeof tab.isLogAtBottom === 'boolean' ? tab.isLogAtBottom : true, - lastLogContent: tab.lastLogContent || '', - })) - } - } catch (e) { - console.error('Failed to load scheduler tabs from storage:', e) + taskQueue: [], + userQueue: [], + logs: [], + isLogAtBottom: true, + lastLogContent: '', + }, + ] } - // 默认返回主调度台 - return [ - { - key: 'main', - title: '主调度台', - closable: false, - status: '新建', - selectedTaskId: null, - selectedMode: TaskCreateIn.mode.AutoMode, - websocketId: null, - taskQueue: [], - userQueue: [], - logs: [], - isLogAtBottom: true, - lastLogContent: '', - }, - ] + return schedulerTabsMemory } // 从本地存储加载电源操作状态 @@ -76,20 +57,10 @@ const loadPowerActionFromStorage = (): PowerIn.signal => { return PowerIn.signal.NO_ACTION } -// 保存调度台状态到本地存储 +// 保存调度台状态到内存 const saveTabsToStorage = (tabs: SchedulerTab[]) => { - try { - // 保存前清理运行时状态 - const tabsToSave = tabs.map(tab => ({ - ...tab, - // 清理运行时属性 - websocketId: null, - status: tab.status === '运行' ? '结束' : tab.status, - })) - localStorage.setItem(SCHEDULER_STORAGE_KEY, JSON.stringify(tabsToSave)) - } catch (e) { - console.error('Failed to save scheduler tabs to storage:', e) - } + // 保存到内存变量而不是localStorage + schedulerTabsMemory = tabs } // 保存电源操作状态到本地存储 @@ -269,26 +240,13 @@ export function useSchedulerLogic() { try { await Service.stopTaskApiDispatchStopPost({ taskId: tab.websocketId }) - if (tab.websocketId) { - ws.unsubscribe(tab.websocketId) - } - - tab.status = '结束' - tab.websocketId = null - - message.success('任务已停止') - checkAllTasksCompleted() + // 不再取消订阅,保持WebSocket连接以便接收结束信号 + // 只需发送停止请求,等待后端通过WebSocket发送结束信号 + message.success('已发送停止任务请求,等待任务完成确认') saveTabsToStorage(schedulerTabs.value) } catch (error) { console.error('停止任务失败:', error) message.error('停止任务失败') - - // 即使 API 调用失败也要清理本地状态 - if (tab.websocketId) { - ws.unsubscribe(tab.websocketId) - tab.status = '结束' - tab.websocketId = null - } saveTabsToStorage(schedulerTabs.value) } } @@ -298,12 +256,7 @@ export function useSchedulerLogic() { if (!tab.websocketId) return ws.subscribe(tab.websocketId, { - onProgress: data => - handleWebSocketMessage(tab, { ...data, type: 'Update', id: tab.websocketId }), - onResult: data => - handleWebSocketMessage(tab, { ...data, type: 'Result', id: tab.websocketId }), - onError: data => handleWebSocketMessage(tab, { ...data, type: 'Error', id: tab.websocketId }), - onNotify: data => handleWebSocketMessage(tab, { ...data, type: 'Info', id: tab.websocketId }), + onMessage: (message) => handleWebSocketMessage(tab, message) }) } @@ -312,24 +265,55 @@ export function useSchedulerLogic() { const { id, type, data } = wsMessage - // 只处理与当前标签页相关的消息,除非是全局信号 - if (id && id !== tab.websocketId && type !== 'Signal') return + console.log('[Scheduler] 收到WebSocket消息:', { id, type, data, tabId: tab.websocketId }) + + // 只处理与当前标签页相关的消息 + if (id && id !== tab.websocketId) { + console.log('[Scheduler] 消息ID不匹配,忽略消息:', { messageId: id, tabId: tab.websocketId }) + return + } + + // 处理通过Info类型包装的Signal消息 + if (type === 'Info' && data && data.title === 'Signal' && data.data) { + console.log('[Scheduler] 处理通过Info包装的Signal消息:', data.data) + handleSignalMessage(tab, data.data) + return + } switch (type) { case 'Update': + console.log('[Scheduler] 处理Update消息:', data) handleUpdateMessage(tab, data) break case 'Info': + console.log('[Scheduler] 处理Info消息:', data) handleInfoMessage(tab, data) break case 'Message': + console.log('[Scheduler] 处理Message消息:', data) handleMessageDialog(tab, data) break case 'Signal': + console.log('[Scheduler] 处理Signal消息:', data) handleSignalMessage(tab, data) break default: - console.warn('未知的WebSocket消息类型:', type) + console.warn('[Scheduler] 未知的WebSocket消息类型:', type, wsMessage) + // 即使是未知类型的消息,也尝试处理其中可能包含的有效数据 + if (data) { + // 尝试处理可能的任务队列更新 + if (data.task_dict || data.task_list || data.user_list) { + handleUpdateMessage(tab, data) + } + // 尝试处理可能的日志信息 + if (data.log) { + handleUpdateMessage(tab, data) + } + // 尝试处理可能的错误/警告/信息 + if (data.Error || data.Warning || data.Info) { + handleInfoMessage(tab, data) + } + } } } @@ -419,8 +403,23 @@ export function useSchedulerLogic() { } const handleSignalMessage = (tab: SchedulerTab, data: any) => { - if (data.Accomplish) { + console.log('[Scheduler] 处理Signal消息:', data) + + // 只有收到WebSocket的Accomplish信号才将任务标记为结束状态 + // 这确保了调度台状态与实际任务执行状态严格同步 + if (data && data.Accomplish) { + console.log('[Scheduler] 收到Accomplish信号,设置任务状态为结束') + // 使用Vue的响应式更新方式 tab.status = '结束' + console.log('[Scheduler] 已更新tab.status为结束,当前tab状态:', tab.status) + + // 强制触发Vue响应式更新 + const tabIndex = schedulerTabs.value.findIndex(t => t.key === tab.key) + if (tabIndex !== -1) { + const updatedTab: SchedulerTab = { ...tab } + schedulerTabs.value.splice(tabIndex, 1, updatedTab) + console.log('[Scheduler] 已强制更新schedulerTabs,当前tabs状态:', schedulerTabs.value) + } if (tab.websocketId) { ws.unsubscribe(tab.websocketId) @@ -430,9 +429,12 @@ export function useSchedulerLogic() { notification.success({ message: '任务完成', description: data.Accomplish }) checkAllTasksCompleted() saveTabsToStorage(schedulerTabs.value) + + // 触发Vue的响应式更新 + schedulerTabs.value = [...schedulerTabs.value] } - if (data.power && data.power !== 'NoAction') { + if (data && data.power && data.power !== 'NoAction') { powerAction.value = data.power as PowerIn.signal savePowerActionToStorage(powerAction.value) startPowerCountdown() @@ -607,4 +609,4 @@ export function useSchedulerLogic() { loadTaskOptions, cleanup, } -} +} \ No newline at end of file