feat(scheduler): 实现TaskManager WebSocket消息自动创建调度台

This commit is contained in:
2025-09-22 01:05:31 +08:00
parent 32df65fb65
commit 70fc623f54
4 changed files with 250 additions and 34 deletions

View File

@@ -1,5 +1,5 @@
import { ref, type Ref } from 'vue'
import { message, Modal, notification } from 'ant-design-vue'
import { Modal } from 'ant-design-vue'
// 基础配置
const BASE_WS_URL = 'ws://localhost:36163/api/core/ws'
@@ -280,7 +280,9 @@ const getMessageQueue = (): Map<string, { message: WebSocketBaseMessage; timesta
}
// 设置消息队列
const setMessageQueue = (queue: Map<string, { message: WebSocketBaseMessage; timestamp: number }>): void => {
const setMessageQueue = (
queue: Map<string, { message: WebSocketBaseMessage; timestamp: number }>
): void => {
const global = getGlobalStorage()
global.messageQueue.value = queue
}
@@ -295,7 +297,7 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
type: msgType,
id: id,
data: raw.data,
fullMessage: raw
fullMessage: raw,
})
if (id) {
@@ -304,7 +306,7 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
messageId: id,
hasSubscriber: !!sub,
totalSubscribers: global.subscribers.value.size,
allSubscriberIds: Array.from(global.subscribers.value.keys())
allSubscriberIds: Array.from(global.subscribers.value.keys()),
})
if (sub) {
// 有订阅者,直接分发消息
@@ -313,9 +315,9 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
} else {
// 没有订阅者,将消息暂存到队列中
console.log(`[WebSocket Debug] 没有找到ID为${id}的订阅者,将消息暂存到队列`)
const currentMessageQueue = getMessageQueue()
// 对于Map类型的消息队列直接存储或更新
currentMessageQueue.set(id, { message: raw, timestamp: Date.now() })
console.log(`[WebSocket Debug] 添加新消息到队列ID: ${id}, Type: ${msgType}`)
@@ -329,11 +331,11 @@ const handleMessage = (raw: WebSocketBaseMessage) => {
deletedCount++
}
})
if (deletedCount > 0) {
console.log(`[WebSocket Debug] 清理了${deletedCount}条过期消息`)
}
// 更新消息队列
setMessageQueue(currentMessageQueue)
}
@@ -404,9 +406,9 @@ const createGlobalWebSocket = (): WebSocket => {
} catch {
/* ignore */
}
// 自动订阅ID为"Main"的消息用于处理ping-pong等系统消息
_subscribe("Main", {
_subscribe('Main', {
onMessage: (message: WebSocketBaseMessage) => {
// 处理系统级消息如ping-pong
if (message && message.type === 'Signal' && message.data) {
@@ -433,8 +435,8 @@ const createGlobalWebSocket = (): WebSocket => {
return
}
}
}
});
},
})
}
ws.onmessage = ev => {
@@ -599,7 +601,7 @@ export function useWebSocket() {
const subscribe = (id: string, handlers: Omit<WebSocketSubscriber, 'id'>) => {
// 使用全局的subscribe函数来确保消息队列机制正常工作
_subscribe(id, handlers);
_subscribe(id, handlers)
}
const unsubscribe = (id: string) => {
@@ -668,15 +670,20 @@ export function useWebSocket() {
export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id'>) => {
const global = getGlobalStorage()
const fullSubscriber: WebSocketSubscriber = { ...subscriber, id }
// 添加订阅者
global.subscribers.value.set(id, fullSubscriber)
console.log('[WebSocket] 添加订阅者:', id, '当前订阅者数量:', global.subscribers.value.size)
// 检查消息队列中是否有该订阅者的消息
const messageQueue = getMessageQueue()
console.log('[WebSocket] 检查消息队列,当前队列大小:', messageQueue.size, '队列内容:', Array.from(messageQueue.entries()))
console.log(
'[WebSocket] 检查消息队列,当前队列大小:',
messageQueue.size,
'队列内容:',
Array.from(messageQueue.entries())
)
// 检查特定ID的消息
const queuedMessage = messageQueue.get(id)
if (queuedMessage) {
@@ -697,13 +704,21 @@ export const _subscribe = (id: string, subscriber: Omit<WebSocketSubscriber, 'id
} else {
console.log('[WebSocket] 未在队列中找到ID为', id, '的消息')
}
// 清理过期消息超过1分钟的消息
const now = Date.now()
let cleanedCount = 0
messageQueue.forEach((queued, msgId) => {
if (now - queued.timestamp > 60000) { // 1分钟 = 60000毫秒
console.log('[WebSocket] 清理过期消息:', msgId, '消息内容:', queued.message, '时间戳:', queued.timestamp)
if (now - queued.timestamp > 60000) {
// 1分钟 = 60000毫秒
console.log(
'[WebSocket] 清理过期消息:',
msgId,
'消息内容:',
queued.message,
'时间戳:',
queued.timestamp
)
messageQueue.delete(msgId)
cleanedCount++
}
@@ -724,11 +739,11 @@ const handleMessageDispatch = (raw: WebSocketBaseMessage, sub: WebSocketSubscrib
if (sub.onMessage) {
return sub.onMessage(raw)
}
// 如果没有 onMessage 处理函数,则记录错误(理论上不应该出现)
console.error('[WebSocket] 错误订阅者没有定义onMessage处理函数', {
subscriberId: sub.id,
messageType: msgType,
messageContent: raw
messageContent: raw,
})
}

View File

@@ -140,6 +140,7 @@
<script setup lang="ts">
import { onMounted, onUnmounted } from 'vue'
import { LockOutlined } from '@ant-design/icons-vue'
import {
getPowerActionText,
POWER_ACTION_TEXT,
@@ -189,6 +190,7 @@ const {
cancelMessage,
// 初始化与清理
initialize,
loadTaskOptions,
cleanup,
@@ -208,6 +210,7 @@ const onSchedulerTabEdit = (targetKey: string | MouseEvent, action: 'add' | 'rem
// 生命周期
onMounted(() => {
initialize() // 初始化TaskManager订阅
loadTaskOptions()
})

View File

@@ -82,10 +82,10 @@ export function useSchedulerLogic() {
let tabCounter =
schedulerTabs.value.length > 1
? Math.max(
...schedulerTabs.value
.filter(tab => tab.key.startsWith('tab-'))
.map(tab => parseInt(tab.key.replace('tab-', '')) || 0)
) + 1
...schedulerTabs.value
.filter(tab => tab.key.startsWith('tab-'))
.map(tab => parseInt(tab.key.replace('tab-', '')) || 0)
) + 1
: 1
// 任务选项
@@ -106,6 +106,67 @@ export function useSchedulerLogic() {
// WebSocket 实例
const ws = useWebSocket()
// 订阅TaskManager消息处理自动创建的任务
const subscribeToTaskManager = () => {
ws.subscribe('TaskManager', {
onMessage: (message) => handleTaskManagerMessage(message)
})
}
const handleTaskManagerMessage = (wsMessage: any) => {
if (!wsMessage || typeof wsMessage !== 'object') return
const { type, data } = wsMessage
console.log('[Scheduler] 收到TaskManager消息:', { type, data })
if (type === 'Signal' && data && data.newTask) {
// 收到新任务信号,自动创建调度台
const taskId = data.newTask
console.log('[Scheduler] 收到新任务信号任务ID:', taskId)
// 创建新的调度台
createSchedulerTabForTask(taskId)
}
}
const createSchedulerTabForTask = (taskId: string) => {
// 检查是否已经存在相同websocketId的调度台
const existingTab = schedulerTabs.value.find(tab => tab.websocketId === taskId)
if (existingTab) {
console.log('[Scheduler] 调度台已存在,切换到该调度台:', existingTab.title)
activeSchedulerTab.value = existingTab.key
return
}
// 创建新的调度台
tabCounter++
const tab: SchedulerTab = {
key: `tab-${tabCounter}`,
title: `自动调度台${tabCounter}`,
closable: true,
status: '运行', // 直接设置为运行状态
selectedTaskId: null,
selectedMode: TaskCreateIn.mode.AutoMode,
websocketId: taskId, // 设置websocketId
taskQueue: [],
userQueue: [],
logs: [],
isLogAtBottom: true,
lastLogContent: '',
}
schedulerTabs.value.push(tab)
activeSchedulerTab.value = tab.key
// 立即订阅该任务的WebSocket消息
subscribeToTask(tab)
console.log('[Scheduler] 已创建新的自动调度台:', tab.title, '任务ID:', taskId)
message.success(`已自动创建调度台: ${tab.title}`)
saveTabsToStorage(schedulerTabs.value)
}
// 计算属性
const canChangePowerAction = computed(() => {
return !schedulerTabs.value.some(tab => tab.status === '运行')
@@ -186,7 +247,7 @@ export function useSchedulerLogic() {
// 清理日志引用
logRefs.value.delete(key)
// 清理任务总览面板引用
overviewRefs.value.delete(key)
@@ -327,7 +388,7 @@ export function useSchedulerLogic() {
console.log('传递 WebSocket 消息给 TaskOverviewPanel:', wsMessage)
overviewPanel.handleWSMessage(wsMessage)
}
// 处理task_dict初始化消息
if (data.task_dict && Array.isArray(data.task_dict)) {
// 初始化任务队列 - 保持原始状态
@@ -335,7 +396,7 @@ export function useSchedulerLogic() {
name: item.name || '未知任务',
status: item.status || '等待', // 使用实际状态,而不是强制设置为等待
}));
// 初始化用户队列(仅包含运行状态下的用户)
const newUserQueue: QueueItem[] = [];
data.task_dict.forEach((taskItem: any) => {
@@ -351,11 +412,11 @@ export function useSchedulerLogic() {
});
}
});
tab.taskQueue.splice(0, tab.taskQueue.length, ...newTaskQueue);
tab.userQueue.splice(0, tab.userQueue.length, ...newUserQueue);
}
// 更新任务队列
if (data.task_list && Array.isArray(data.task_list)) {
const newTaskQueue = data.task_list.map((item: any) => ({
@@ -414,7 +475,7 @@ export function useSchedulerLogic() {
const handleSignalMessage = (tab: SchedulerTab, data: any) => {
console.log('[Scheduler] 处理Signal消息:', data)
// 只有收到WebSocket的Accomplish信号才将任务标记为结束状态
// 这确保了调度台状态与实际任务执行状态严格同步
if (data && data.Accomplish) {
@@ -422,7 +483,7 @@ export function useSchedulerLogic() {
// 使用Vue的响应式更新方式
tab.status = '结束'
console.log('[Scheduler] 已更新tab.status为结束当前tab状态:', tab.status)
// 强制触发Vue响应式更新
const tabIndex = schedulerTabs.value.findIndex(t => t.key === tab.key)
if (tabIndex !== -1) {
@@ -439,7 +500,7 @@ export function useSchedulerLogic() {
message.success('任务完成')
checkAllTasksCompleted()
saveTabsToStorage(schedulerTabs.value)
// 触发Vue的响应式更新
schedulerTabs.value = [...schedulerTabs.value]
}
@@ -571,12 +632,22 @@ export function useSchedulerLogic() {
}
}
// 初始化函数
const initialize = () => {
// 订阅TaskManager消息
subscribeToTaskManager()
console.log('[Scheduler] 已订阅TaskManager消息')
}
// 清理函数
const cleanup = () => {
if (powerCountdownTimer) {
clearInterval(powerCountdownTimer)
}
// 取消订阅TaskManager
ws.unsubscribe('TaskManager')
schedulerTabs.value.forEach(tab => {
if (tab.websocketId) {
ws.unsubscribe(tab.websocketId)
@@ -625,6 +696,7 @@ export function useSchedulerLogic() {
cancelMessage,
// 初始化与清理
initialize,
loadTaskOptions,
cleanup,