From 70fc623f5491515bd50a22108932f3f9cb424aaa Mon Sep 17 00:00:00 2001 From: AoXuan Date: Mon, 22 Sep 2025 01:05:31 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E5=AE=9E=E7=8E=B0TaskManage?= =?UTF-8?q?r=20WebSocket=E6=B6=88=E6=81=AF=E8=87=AA=E5=8A=A8=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E8=B0=83=E5=BA=A6=E5=8F=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TaskManager_WebSocket_Implementation.md | 126 ++++++++++++++++++ frontend/src/composables/useWebSocket.ts | 59 +++++--- frontend/src/views/scheduler/index.vue | 3 + .../src/views/scheduler/useSchedulerLogic.ts | 96 +++++++++++-- 4 files changed, 250 insertions(+), 34 deletions(-) create mode 100644 TaskManager_WebSocket_Implementation.md diff --git a/TaskManager_WebSocket_Implementation.md b/TaskManager_WebSocket_Implementation.md new file mode 100644 index 0000000..a2ab0d2 --- /dev/null +++ b/TaskManager_WebSocket_Implementation.md @@ -0,0 +1,126 @@ +# TaskManager WebSocket消息处理功能实现 + +## 功能概述 + +根据后端TaskManager的WebSocket消息机制,实现了前端对ID为"TaskManager"的WebSocket消息的完整处理逻辑。 + +## 后端TaskManager消息分析 + +### 消息格式 +```json +{ + "id": "TaskManager", + "type": "Signal", + "data": { + "newTask": "任务UUID" + } +} +``` + +### 触发时机 +当后端启动时运行的队列开始执行时,TaskManager会发送此消息通知前端有新任务被自动创建。 + +## 前端实现 + +### 1. WebSocket订阅机制 +在`useSchedulerLogic.ts`中添加了以下功能: + +- **subscribeToTaskManager()**: 订阅ID为"TaskManager"的WebSocket消息 +- **handleTaskManagerMessage()**: 处理TaskManager发送的消息 +- **createSchedulerTabForTask()**: 根据任务ID自动创建调度台 + +### 2. 自动调度台创建逻辑 + +当收到`newTask`信号时,系统会: + +1. **检查重复**: 验证是否已存在相同websocketId的调度台 +2. **创建调度台**: 自动创建新的调度台标签页 +3. **设置状态**: 直接将调度台状态设置为"运行" +4. **建立连接**: 立即订阅该任务的WebSocket消息 +5. **用户提示**: 显示成功创建的消息提示 + +### 3. 调度台特性 + +自动创建的调度台具有以下特性: +- 标题格式:`自动调度台{编号}` +- 初始状态:`运行` +- 可关闭:`true`(但运行时不可删除) +- 自动订阅:立即开始接收任务消息 + +### 4. 生命周期管理 + +- **初始化**: 在组件挂载时调用`initialize()`订阅TaskManager消息 +- **清理**: 在组件卸载时取消TaskManager订阅 +- **任务结束**: 复用现有的任务结束处理逻辑 + +## 代码修改点 + +### 1. useSchedulerLogic.ts +```typescript +// 新增TaskManager消息订阅 +const subscribeToTaskManager = () => { + ws.subscribe('TaskManager', { + onMessage: (message) => handleTaskManagerMessage(message) + }) +} + +// 新增TaskManager消息处理 +const handleTaskManagerMessage = (wsMessage: any) => { + if (type === 'Signal' && data && data.newTask) { + createSchedulerTabForTask(data.newTask) + } +} + +// 新增自动调度台创建 +const createSchedulerTabForTask = (taskId: string) => { + // 创建运行状态的调度台并立即订阅 +} +``` + +### 2. index.vue +```typescript +// 生命周期中添加初始化调用 +onMounted(() => { + initialize() // 订阅TaskManager消息 + loadTaskOptions() +}) +``` + +## 功能特点 + +### 1. 无缝集成 +- 完全复用现有的调度台逻辑和UI组件 +- 与手动创建的调度台行为一致 +- 支持所有现有功能(日志显示、任务总览、消息处理等) + +### 2. 状态同步 +- 调度台状态与后端任务状态严格同步 +- 支持任务完成后的自动状态更新 +- 正确处理WebSocket连接的建立和清理 + +### 3. 用户体验 +- 自动切换到新创建的调度台 +- 提供清晰的成功提示 +- 防止重复创建相同任务的调度台 + +### 4. 错误处理 +- 检查消息格式的有效性 +- 防止重复订阅和创建 +- 优雅处理异常情况 + +## 测试验证 + +功能实现后需要验证以下场景: + +1. **启动时队列**: 后端启动时运行的队列应自动创建调度台 +2. **消息接收**: 调度台应正确接收和显示任务消息 +3. **状态更新**: 任务状态变化应正确反映在UI上 +4. **任务结束**: 任务完成后应正确清理资源 +5. **重复处理**: 相同任务不应创建多个调度台 + +## 兼容性 + +- 完全向后兼容现有功能 +- 不影响手动创建的调度台 +- 保持现有的WebSocket消息处理机制 +- 复用所有现有的UI组件和样式 \ No newline at end of file diff --git a/frontend/src/composables/useWebSocket.ts b/frontend/src/composables/useWebSocket.ts index b5cf498..703c17d 100644 --- a/frontend/src/composables/useWebSocket.ts +++ b/frontend/src/composables/useWebSocket.ts @@ -1,5 +1,5 @@ import { ref, type Ref } from 'vue' -import { message, Modal, notification } from 'ant-design-vue' +import { Modal } from 'ant-design-vue' // 基础配置 const BASE_WS_URL = 'ws://localhost:36163/api/core/ws' @@ -280,7 +280,9 @@ const getMessageQueue = (): Map): void => { +const setMessageQueue = ( + queue: Map +): void => { const global = getGlobalStorage() global.messageQueue.value = queue } @@ -295,7 +297,7 @@ const handleMessage = (raw: WebSocketBaseMessage) => { type: msgType, id: id, data: raw.data, - fullMessage: raw + fullMessage: raw, }) if (id) { @@ -304,7 +306,7 @@ const handleMessage = (raw: WebSocketBaseMessage) => { messageId: id, hasSubscriber: !!sub, totalSubscribers: global.subscribers.value.size, - allSubscriberIds: Array.from(global.subscribers.value.keys()) + allSubscriberIds: Array.from(global.subscribers.value.keys()), }) if (sub) { // 有订阅者,直接分发消息 @@ -313,9 +315,9 @@ const handleMessage = (raw: WebSocketBaseMessage) => { } else { // 没有订阅者,将消息暂存到队列中 console.log(`[WebSocket Debug] 没有找到ID为${id}的订阅者,将消息暂存到队列`) - + const currentMessageQueue = getMessageQueue() - + // 对于Map类型的消息队列,直接存储或更新 currentMessageQueue.set(id, { message: raw, timestamp: Date.now() }) console.log(`[WebSocket Debug] 添加新消息到队列,ID: ${id}, Type: ${msgType}`) @@ -329,11 +331,11 @@ const handleMessage = (raw: WebSocketBaseMessage) => { deletedCount++ } }) - + if (deletedCount > 0) { console.log(`[WebSocket Debug] 清理了${deletedCount}条过期消息`) } - + // 更新消息队列 setMessageQueue(currentMessageQueue) } @@ -404,9 +406,9 @@ const createGlobalWebSocket = (): WebSocket => { } catch { /* ignore */ } - + // 自动订阅ID为"Main"的消息,用于处理ping-pong等系统消息 - _subscribe("Main", { + _subscribe('Main', { onMessage: (message: WebSocketBaseMessage) => { // 处理系统级消息(如ping-pong) if (message && message.type === 'Signal' && message.data) { @@ -433,8 +435,8 @@ const createGlobalWebSocket = (): WebSocket => { return } } - } - }); + }, + }) } ws.onmessage = ev => { @@ -599,7 +601,7 @@ export function useWebSocket() { const subscribe = (id: string, handlers: Omit) => { // 使用全局的subscribe函数来确保消息队列机制正常工作 - _subscribe(id, handlers); + _subscribe(id, handlers) } const unsubscribe = (id: string) => { @@ -668,15 +670,20 @@ export function useWebSocket() { export const _subscribe = (id: string, subscriber: Omit) => { const global = getGlobalStorage() const fullSubscriber: WebSocketSubscriber = { ...subscriber, id } - + // 添加订阅者 global.subscribers.value.set(id, fullSubscriber) console.log('[WebSocket] 添加订阅者:', id, '当前订阅者数量:', global.subscribers.value.size) - + // 检查消息队列中是否有该订阅者的消息 const messageQueue = getMessageQueue() - console.log('[WebSocket] 检查消息队列,当前队列大小:', messageQueue.size, '队列内容:', Array.from(messageQueue.entries())) - + console.log( + '[WebSocket] 检查消息队列,当前队列大小:', + messageQueue.size, + '队列内容:', + Array.from(messageQueue.entries()) + ) + // 检查特定ID的消息 const queuedMessage = messageQueue.get(id) if (queuedMessage) { @@ -697,13 +704,21 @@ export const _subscribe = (id: string, subscriber: Omit { - if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒 - console.log('[WebSocket] 清理过期消息:', msgId, '消息内容:', queued.message, '时间戳:', queued.timestamp) + if (now - queued.timestamp > 60000) { + // 1分钟 = 60000毫秒 + console.log( + '[WebSocket] 清理过期消息:', + msgId, + '消息内容:', + queued.message, + '时间戳:', + queued.timestamp + ) messageQueue.delete(msgId) cleanedCount++ } @@ -724,11 +739,11 @@ const handleMessageDispatch = (raw: WebSocketBaseMessage, sub: WebSocketSubscrib if (sub.onMessage) { return sub.onMessage(raw) } - + // 如果没有 onMessage 处理函数,则记录错误(理论上不应该出现) console.error('[WebSocket] 错误:订阅者没有定义onMessage处理函数', { subscriberId: sub.id, messageType: msgType, - messageContent: raw + messageContent: raw, }) } diff --git a/frontend/src/views/scheduler/index.vue b/frontend/src/views/scheduler/index.vue index eba4242..efea200 100644 --- a/frontend/src/views/scheduler/index.vue +++ b/frontend/src/views/scheduler/index.vue @@ -140,6 +140,7 @@