fix: 重新修正ws消息分发逻辑

This commit is contained in:
DLmaster361
2025-09-20 23:32:06 +08:00
parent 60e8ac0ce9
commit 5eab1b0986
2 changed files with 148 additions and 229 deletions

View File

@@ -18,33 +18,13 @@ export interface WebSocketBaseMessage {
data?: any
}
export interface ProgressMessage {
percent?: number
status?: string
msg?: string
}
export interface ResultMessage {
success?: boolean
result?: any
}
export interface ErrorMessage {
msg?: string
code?: number
}
export interface NotifyMessage {
title?: string
content?: string
}
// 删除了冗余的类型定义:
// ProgressMessage、ResultMessage、ErrorMessage、NotifyMessage
// 因为现在WebSocket消息处理统一使用onMessage回调函数不再需要这些特定的类型
export interface WebSocketSubscriber {
id: string
onProgress?: (data: ProgressMessage) => void
onResult?: (data: ResultMessage) => void
onError?: (err: ErrorMessage) => void
onNotify?: (n: NotifyMessage) => void
onMessage?: (message: WebSocketBaseMessage) => void
}
// 后端状态类型
@@ -318,74 +298,6 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
fullMessage: raw
})
// 优先处理Signal类型的ping-pong消息不受id限制
if (msgType === 'Signal') {
// 处理心跳响应
if (raw.data && raw.data.Pong) {
global.lastPingTime = 0 // 重置ping时间表示收到了响应
return
}
// 处理后端发送的Ping回复Pong
if (raw.data && raw.data.Ping) {
const ws = global.wsRef
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(
JSON.stringify({
type: 'Signal',
data: { Pong: raw.data.Ping, connectionId: global.connectionId },
})
)
} catch (e) {
// Pong发送失败静默处理
}
}
return
}
}
const dispatch = (sub: WebSocketSubscriber) => {
if (msgType === 'Signal') return
if (msgType === 'Progress') return sub.onProgress?.(raw.data as ProgressMessage)
if (msgType === 'Result') return sub.onResult?.(raw.data as ResultMessage)
if (msgType === 'Update') {
// 处理Update类型消息当作Progress处理
console.log('[WebSocket Debug] 收到Update消息:', raw)
return sub.onProgress?.(raw as any)
}
if (msgType === 'Info') {
// 处理Info类型消息当作Notify处理
console.log('[WebSocket Debug] 收到Info消息:', raw)
return sub.onNotify?.(raw as any)
}
if (msgType === 'Message') {
// 处理Message类型消息当作Notify处理
console.log('[WebSocket Debug] 收到Message消息:', raw)
return sub.onNotify?.(raw as any)
}
if (msgType === 'Error') {
sub.onError?.(raw.data as ErrorMessage)
if (!sub.onError && raw.data && (raw.data as ErrorMessage).msg) {
message.error((raw.data as ErrorMessage).msg)
}
return
}
if (msgType === 'Notify') {
sub.onNotify?.(raw.data as NotifyMessage)
if (raw.data && (raw.data as NotifyMessage).title) {
notification.info({
message: (raw.data as NotifyMessage).title,
description: (raw.data as NotifyMessage).content,
})
}
return
}
// 其他类型可扩展
console.log('[WebSocket Debug] 未处理的消息类型:', msgType, raw)
}
if (id) {
const sub = global.subscribers.value.get(id)
console.log('[WebSocket Debug] 查找订阅者:', {
@@ -395,30 +307,37 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
allSubscriberIds: Array.from(global.subscribers.value.keys())
})
if (sub) {
console.log('[WebSocket Debug] 分发消息给订阅者:', id)
dispatch(sub)
// 有订阅者,直接分发消息
console.log('[WebSocket Debug] 找到订阅者,直接分发消息')
handleMessageDispatch(raw, sub)
} else {
// 没有订阅者消息暂存到消息队列中
console.warn('[WebSocket Debug] 未找到对应的订阅者,将消息暂存到队列中:', id)
const messageQueue = getMessageQueue()
messageQueue.set(id, {
message: raw,
timestamp: Date.now()
})
setMessageQueue(messageQueue)
// 没有订阅者,将消息暂存到队列中
console.log(`[WebSocket Debug] 没有找到ID为${id}的订阅者,将消息暂存到队列`)
const currentMessageQueue = getMessageQueue()
// 对于Map类型的消息队列直接存储或更新
currentMessageQueue.set(id, { message: raw, timestamp: Date.now() })
console.log(`[WebSocket Debug] 添加新消息到队列ID: ${id}, Type: ${msgType}`)
// 清理过期消息超过1分钟的消息
const now = Date.now()
messageQueue.forEach((queued, msgId) => {
if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒
console.log('[WebSocket] 清理过期消息:', msgId)
messageQueue.delete(msgId)
let deletedCount = 0
currentMessageQueue.forEach((value, key) => {
if (now - (value.timestamp || 0) > 60000) {
currentMessageQueue.delete(key)
deletedCount++
}
})
setMessageQueue(messageQueue)
if (deletedCount > 0) {
console.log(`[WebSocket Debug] 清理了${deletedCount}条过期消息`)
}
// 更新消息队列
setMessageQueue(currentMessageQueue)
}
}
// 移除无id消息的处理逻辑因为后端不会发送这类消息
}
// 后端启动后建立连接的公开函数
@@ -488,8 +407,32 @@ const createGlobalWebSocket = (): WebSocket => {
// 自动订阅ID为"Main"的消息用于处理ping-pong等系统消息
_subscribe("Main", {
onNotify: () => {
// Main ID的消息通常不包含需要处理的Notify内容但保留处理函数以确保订阅完整性
onMessage: (message: WebSocketBaseMessage) => {
// 处理系统级消息如ping-pong
if (message && message.type === 'Signal' && message.data) {
// 处理心跳响应
if (message.data.Pong) {
global.lastPingTime = 0 // 重置ping时间表示收到了响应
return
}
// 处理后端发送的Ping回复Pong
if (message.data.Ping) {
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(
JSON.stringify({
type: 'Signal',
data: { Pong: message.data.Ping, connectionId: global.connectionId },
})
)
} catch (e) {
// Pong发送失败静默处理
}
}
return
}
}
}
});
}
@@ -775,43 +718,17 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
const handleMessageDispatch = (raw: WebSocketBaseMessage, sub: WebSocketSubscriber) => {
const msgType = raw.type
if (msgType === 'Signal') return
console.log('[WebSocket] 分发消息类型:', msgType, '消息内容:', raw)
if (msgType === 'Progress') return sub.onProgress?.(raw.data as ProgressMessage)
if (msgType === 'Result') return sub.onResult?.(raw.data as ResultMessage)
if (msgType === 'Update') {
// 处理Update类型消息当作Progress处理
console.log('[WebSocket Debug] 分发Update消息:', raw)
return sub.onProgress?.(raw as any)
}
if (msgType === 'Info') {
// 处理Info类型消息当作Notify处理
console.log('[WebSocket Debug] 分发Info消息:', raw)
return sub.onNotify?.(raw as any)
}
if (msgType === 'Message') {
// 处理Message类型消息当作Notify处理
console.log('[WebSocket Debug] 分发Message消息:', raw)
return sub.onNotify?.(raw as any)
}
if (msgType === 'Error') {
sub.onError?.(raw.data as ErrorMessage)
if (!sub.onError && raw.data && (raw.data as ErrorMessage).msg) {
message.error((raw.data as ErrorMessage).msg)
}
return
}
if (msgType === 'Notify') {
sub.onNotify?.(raw.data as NotifyMessage)
if (raw.data && (raw.data as NotifyMessage).title) {
notification.info({
message: (raw.data as NotifyMessage).title,
description: (raw.data as NotifyMessage).content,
})
}
return
// 如果订阅者定义了 onMessage 回调,则优先使用统一的处理函数
if (sub.onMessage) {
return sub.onMessage(raw)
}
// 其他类型可扩展
console.log('[WebSocket Debug] 未处理的消息类型:', msgType, raw)
// 如果没有 onMessage 处理函数,则记录错误(理论上不应该出现)
console.error('[WebSocket] 错误订阅者没有定义onMessage处理函数', {
subscriberId: sub.id,
messageType: msgType,
messageContent: raw
})
}

View File

@@ -14,53 +14,34 @@ import {
type TaskMessage,
} from './schedulerConstants'
// 本地存储键名
const SCHEDULER_STORAGE_KEY = 'scheduler-tabs-state'
// 电源操作状态仍然保存到localStorage中以便重启后保持用户设置
const SCHEDULER_POWER_ACTION_KEY = 'scheduler-power-action'
// 从本地存储加载调度台状态
// 使用内存变量存储调度台状态而不是localStorage
let schedulerTabsMemory: SchedulerTab[] = []
// 从内存加载调度台状态
const loadTabsFromStorage = (): SchedulerTab[] => {
try {
const stored = localStorage.getItem(SCHEDULER_STORAGE_KEY)
if (stored) {
const parsed = JSON.parse(stored)
// 确保运行中的任务状态正确重置
return parsed.map((tab: any) => ({
...tab,
// 重置WebSocket相关状态
// 如果内存中没有状态,则初始化默认状态
if (schedulerTabsMemory.length === 0) {
schedulerTabsMemory = [
{
key: 'main',
title: '主调度台',
closable: false,
status: '新建',
selectedTaskId: null,
selectedMode: TaskCreateIn.mode.AutoMode,
websocketId: null,
status: tab.status === '运行' ? '结束' : tab.status,
// 确保数组存在
taskQueue: Array.isArray(tab.taskQueue) ? tab.taskQueue : [],
userQueue: Array.isArray(tab.userQueue) ? tab.userQueue : [],
logs: Array.isArray(tab.logs) ? tab.logs : [],
// 确保其他属性存在
selectedTaskId: tab.selectedTaskId || null,
selectedMode: tab.selectedMode || TaskCreateIn.mode.AutoMode,
isLogAtBottom: typeof tab.isLogAtBottom === 'boolean' ? tab.isLogAtBottom : true,
lastLogContent: tab.lastLogContent || '',
}))
}
} catch (e) {
console.error('Failed to load scheduler tabs from storage:', e)
taskQueue: [],
userQueue: [],
logs: [],
isLogAtBottom: true,
lastLogContent: '',
},
]
}
// 默认返回主调度台
return [
{
key: 'main',
title: '主调度台',
closable: false,
status: '新建',
selectedTaskId: null,
selectedMode: TaskCreateIn.mode.AutoMode,
websocketId: null,
taskQueue: [],
userQueue: [],
logs: [],
isLogAtBottom: true,
lastLogContent: '',
},
]
return schedulerTabsMemory
}
// 从本地存储加载电源操作状态
@@ -76,20 +57,10 @@ const loadPowerActionFromStorage = (): PowerIn.signal => {
return PowerIn.signal.NO_ACTION
}
// 保存调度台状态到本地存储
// 保存调度台状态到内存
const saveTabsToStorage = (tabs: SchedulerTab[]) => {
try {
// 保存前清理运行时状态
const tabsToSave = tabs.map(tab => ({
...tab,
// 清理运行时属性
websocketId: null,
status: tab.status === '运行' ? '结束' : tab.status,
}))
localStorage.setItem(SCHEDULER_STORAGE_KEY, JSON.stringify(tabsToSave))
} catch (e) {
console.error('Failed to save scheduler tabs to storage:', e)
}
// 保存到内存变量而不是localStorage
schedulerTabsMemory = tabs
}
// 保存电源操作状态到本地存储
@@ -269,26 +240,13 @@ export function useSchedulerLogic() {
try {
await Service.stopTaskApiDispatchStopPost({ taskId: tab.websocketId })
if (tab.websocketId) {
ws.unsubscribe(tab.websocketId)
}
tab.status = '结束'
tab.websocketId = null
message.success('任务已停止')
checkAllTasksCompleted()
// 不再取消订阅保持WebSocket连接以便接收结束信号
// 只需发送停止请求等待后端通过WebSocket发送结束信号
message.success('已发送停止任务请求,等待任务完成确认')
saveTabsToStorage(schedulerTabs.value)
} catch (error) {
console.error('停止任务失败:', error)
message.error('停止任务失败')
// 即使 API 调用失败也要清理本地状态
if (tab.websocketId) {
ws.unsubscribe(tab.websocketId)
tab.status = '结束'
tab.websocketId = null
}
saveTabsToStorage(schedulerTabs.value)
}
}
@@ -298,12 +256,7 @@ export function useSchedulerLogic() {
if (!tab.websocketId) return
ws.subscribe(tab.websocketId, {
onProgress: data =>
handleWebSocketMessage(tab, { ...data, type: 'Update', id: tab.websocketId }),
onResult: data =>
handleWebSocketMessage(tab, { ...data, type: 'Result', id: tab.websocketId }),
onError: data => handleWebSocketMessage(tab, { ...data, type: 'Error', id: tab.websocketId }),
onNotify: data => handleWebSocketMessage(tab, { ...data, type: 'Info', id: tab.websocketId }),
onMessage: (message) => handleWebSocketMessage(tab, message)
})
}
@@ -312,24 +265,55 @@ export function useSchedulerLogic() {
const { id, type, data } = wsMessage
// 只处理与当前标签页相关的消息,除非是全局信号
if (id && id !== tab.websocketId && type !== 'Signal') return
console.log('[Scheduler] 收到WebSocket消息:', { id, type, data, tabId: tab.websocketId })
// 只处理与当前标签页相关的消息
if (id && id !== tab.websocketId) {
console.log('[Scheduler] 消息ID不匹配忽略消息:', { messageId: id, tabId: tab.websocketId })
return
}
// 处理通过Info类型包装的Signal消息
if (type === 'Info' && data && data.title === 'Signal' && data.data) {
console.log('[Scheduler] 处理通过Info包装的Signal消息:', data.data)
handleSignalMessage(tab, data.data)
return
}
switch (type) {
case 'Update':
console.log('[Scheduler] 处理Update消息:', data)
handleUpdateMessage(tab, data)
break
case 'Info':
console.log('[Scheduler] 处理Info消息:', data)
handleInfoMessage(tab, data)
break
case 'Message':
console.log('[Scheduler] 处理Message消息:', data)
handleMessageDialog(tab, data)
break
case 'Signal':
console.log('[Scheduler] 处理Signal消息:', data)
handleSignalMessage(tab, data)
break
default:
console.warn('未知的WebSocket消息类型:', type)
console.warn('[Scheduler] 未知的WebSocket消息类型:', type, wsMessage)
// 即使是未知类型的消息,也尝试处理其中可能包含的有效数据
if (data) {
// 尝试处理可能的任务队列更新
if (data.task_dict || data.task_list || data.user_list) {
handleUpdateMessage(tab, data)
}
// 尝试处理可能的日志信息
if (data.log) {
handleUpdateMessage(tab, data)
}
// 尝试处理可能的错误/警告/信息
if (data.Error || data.Warning || data.Info) {
handleInfoMessage(tab, data)
}
}
}
}
@@ -419,8 +403,23 @@ export function useSchedulerLogic() {
}
const handleSignalMessage = (tab: SchedulerTab, data: any) => {
if (data.Accomplish) {
console.log('[Scheduler] 处理Signal消息:', data)
// 只有收到WebSocket的Accomplish信号才将任务标记为结束状态
// 这确保了调度台状态与实际任务执行状态严格同步
if (data && data.Accomplish) {
console.log('[Scheduler] 收到Accomplish信号设置任务状态为结束')
// 使用Vue的响应式更新方式
tab.status = '结束'
console.log('[Scheduler] 已更新tab.status为结束当前tab状态:', tab.status)
// 强制触发Vue响应式更新
const tabIndex = schedulerTabs.value.findIndex(t => t.key === tab.key)
if (tabIndex !== -1) {
const updatedTab: SchedulerTab = { ...tab }
schedulerTabs.value.splice(tabIndex, 1, updatedTab)
console.log('[Scheduler] 已强制更新schedulerTabs当前tabs状态:', schedulerTabs.value)
}
if (tab.websocketId) {
ws.unsubscribe(tab.websocketId)
@@ -430,9 +429,12 @@ export function useSchedulerLogic() {
notification.success({ message: '任务完成', description: data.Accomplish })
checkAllTasksCompleted()
saveTabsToStorage(schedulerTabs.value)
// 触发Vue的响应式更新
schedulerTabs.value = [...schedulerTabs.value]
}
if (data.power && data.power !== 'NoAction') {
if (data && data.power && data.power !== 'NoAction') {
powerAction.value = data.power as PowerIn.signal
savePowerActionToStorage(powerAction.value)
startPowerCountdown()
@@ -607,4 +609,4 @@ export function useSchedulerLogic() {
loadTaskOptions,
cleanup,
}
}
}