From 7b0307070e8ac784f67bee694ec996d6c97c4b44 Mon Sep 17 00:00:00 2001 From: MoeSnowyFox Date: Sat, 27 Sep 2025 03:32:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=A8=A1=E6=8B=9F?= =?UTF-8?q?=E5=99=A8=E7=AE=A1=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/utils/device_manager/__init__.py | 5 + app/utils/device_manager/general.py | 491 ++++++++++++++++++++++++ app/utils/device_manager/ldplayer.py | 309 +++++++++++++++ app/utils/device_manager/mumu.py | 220 +++++++++++ app/utils/device_manager/postMessage.py | 156 ++++++++ app/utils/device_manager/utils.py | 104 +++++ 6 files changed, 1285 insertions(+) create mode 100644 app/utils/device_manager/__init__.py create mode 100644 app/utils/device_manager/general.py create mode 100644 app/utils/device_manager/ldplayer.py create mode 100644 app/utils/device_manager/mumu.py create mode 100644 app/utils/device_manager/postMessage.py create mode 100644 app/utils/device_manager/utils.py diff --git a/app/utils/device_manager/__init__.py b/app/utils/device_manager/__init__.py new file mode 100644 index 0000000..817b8d9 --- /dev/null +++ b/app/utils/device_manager/__init__.py @@ -0,0 +1,5 @@ +from .mumu import MumuManager +from .ldplayer import LDManager +from .utils import BaseDevice, DeviceStatus + +__all__ = ["MumuManager", "LDManager", "BaseDevice", "DeviceStatus"] diff --git a/app/utils/device_manager/general.py b/app/utils/device_manager/general.py new file mode 100644 index 0000000..949a657 --- /dev/null +++ b/app/utils/device_manager/general.py @@ -0,0 +1,491 @@ +# AUTO-MAS: A Multi-Script, Multi-Config Management and Automation Software +# Copyright © 2024-2025 DLmaster361 + +# This file is part of AUTO-MAS. + +# AUTO-MAS is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, +# or (at your option) any later version. + +# AUTO-MAS is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See +# the GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with AUTO-MAS. If not, see . + +# Contact: DLmaster_361@163.com + + +import asyncio +import psutil +import subprocess +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, Any +from app.utils.device_manager.utils import BaseDevice, DeviceStatus +from app.utils.logger import get_logger + + +class ProcessManager: + """进程监视器类, 用于跟踪主进程及其所有子进程的状态""" + + def __init__(self): + super().__init__() + + self.main_pid = None + self.tracked_pids = set() + self.check_task = None + self.track_end_time = datetime.now() + + async def open_process( + self, path: Path, args: list = [], tracking_time: int = 60 + ) -> None: + """ + 启动一个新进程并返回其pid, 并开始监视该进程 + + Parameters + ---------- + path: 可执行文件的路径 + args: 启动参数列表 + tracking_time: 子进程追踪持续时间(秒) + """ + + process = subprocess.Popen( + [path, *args], + cwd=path.parent, + creationflags=subprocess.CREATE_NO_WINDOW, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + await self.start_monitoring(process.pid, tracking_time) + + async def start_monitoring(self, pid: int, tracking_time: int = 60) -> None: + """ + 启动进程监视器, 跟踪指定的主进程及其子进程 + + :param pid: 被监视进程的PID + :param tracking_time: 子进程追踪持续时间(秒) + """ + + await self.clear() + + self.main_pid = pid + self.tracking_time = tracking_time + + # 扫描并记录所有相关进程 + try: + # 获取主进程 + main_proc = psutil.Process(self.main_pid) + self.tracked_pids.add(self.main_pid) + + # 递归获取所有子进程 + if tracking_time: + for child in main_proc.children(recursive=True): + self.tracked_pids.add(child.pid) + + except psutil.NoSuchProcess: + pass + + # 启动持续追踪任务 + if tracking_time > 0: + self.track_end_time = datetime.now() + timedelta(seconds=tracking_time) + self.check_task = asyncio.create_task(self.track_processes()) + + async def track_processes(self) -> None: + """更新子进程列表""" + + while datetime.now() < self.track_end_time: + current_pids = set(self.tracked_pids) + for pid in current_pids: + try: + proc = psutil.Process(pid) + for child in proc.children(): + if child.pid not in self.tracked_pids: + # 新发现的子进程 + self.tracked_pids.add(child.pid) + except psutil.NoSuchProcess: + continue + await asyncio.sleep(0.1) + + async def is_running(self) -> bool: + """检查所有跟踪的进程是否还在运行""" + + for pid in self.tracked_pids: + try: + proc = psutil.Process(pid) + if proc.is_running(): + return True + except psutil.NoSuchProcess: + continue + + return False + + async def kill(self, if_force: bool = False) -> None: + """停止监视器并中止所有跟踪的进程""" + + for pid in self.tracked_pids: + try: + proc = psutil.Process(pid) + if if_force: + kill_process = subprocess.Popen( + ["taskkill", "/F", "/T", "/PID", str(pid)], + creationflags=subprocess.CREATE_NO_WINDOW, + ) + kill_process.wait() + proc.terminate() + except psutil.NoSuchProcess: + continue + + await self.clear() + + async def clear(self) -> None: + """清空跟踪的进程列表""" + + if self.check_task is not None and not self.check_task.done(): + self.check_task.cancel() + + try: + await self.check_task + except asyncio.CancelledError: + pass + + self.main_pid = None + self.tracked_pids.clear() + + +class GeneralDeviceManager(BaseDevice): + """ + 通用设备管理器,基于BaseDevice和ProcessManager实现 + 用于管理一般应用程序进程 + """ + + def __init__(self, executable_path: str, name: str = "通用设备"): + """ + 初始化通用设备管理器 + + Args: + executable_path (str): 可执行文件的绝对路径 + name (str): 设备管理器名称 + """ + self.executable_path = Path(executable_path) + self.name = name + self.logger = get_logger(f"{name}管理器") + + # 进程管理实例字典,以idx为键 + self.process_managers: Dict[str, ProcessManager] = {} + + # 设备信息存储 + self.device_info: Dict[str, Dict[str, Any]] = {} + + # 默认等待时间 + self.wait_time = 60 + + if not self.executable_path.exists(): + raise FileNotFoundError(f"可执行文件不存在: {executable_path}") + + async def start(self, idx: str, package_name: str = "") -> tuple[bool, int, dict]: + """ + 启动设备 + + Args: + idx: 设备ID + package_name: 包名(可选) + + Returns: + tuple[bool, int, dict]: (是否成功, 状态码, 启动信息) + """ + try: + # 检查是否已经在运行 + current_status = await self.get_status(idx) + if current_status in [DeviceStatus.ONLINE, DeviceStatus.STARTING]: + self.logger.warning(f"设备{idx}已经在运行,状态: {current_status}") + return False, current_status, {} + + # 创建进程管理器 + if idx not in self.process_managers: + self.process_managers[idx] = ProcessManager() + + # 准备启动参数 + args = [] + if package_name: + args.extend(["-pkg", package_name]) + + # 启动进程 + await self.process_managers[idx].open_process( + self.executable_path, args, tracking_time=self.wait_time + ) + + # 等待进程启动 + start_time = datetime.now() + timeout = timedelta(seconds=self.wait_time) + + while datetime.now() - start_time < timeout: + if await self.process_managers[idx].is_running(): + self.device_info[idx] = { + "title": f"{self.name}_{idx}", + "status": str(DeviceStatus.ONLINE), + "pid": self.process_managers[idx].main_pid, + "start_time": start_time.isoformat(), + } + + self.logger.info(f"设备{idx}启动成功") + return True, DeviceStatus.ONLINE, self.device_info[idx] + + await asyncio.sleep(0.1) + + self.logger.error(f"设备{idx}启动超时") + return False, DeviceStatus.ERROR, {} + + except Exception as e: + self.logger.error(f"启动设备{idx}失败: {str(e)}") + return False, DeviceStatus.ERROR, {} + + async def close(self, idx: str) -> tuple[bool, int]: + """ + 关闭设备或服务 + + Args: + idx: 设备ID + + Returns: + tuple[bool, int]: (是否成功, 状态码) + """ + try: + if idx not in self.process_managers: + self.logger.warning(f"设备{idx}的进程管理器不存在") + return False, DeviceStatus.NOT_FOUND + + # 检查进程是否在运行 + if not await self.process_managers[idx].is_running(): + self.logger.info(f"设备{idx}进程已经停止") + return True, DeviceStatus.OFFLINE + + # 终止进程 + await self.process_managers[idx].kill(if_force=False) + + # 等待进程完全停止 + stop_time = datetime.now() + timeout = timedelta(seconds=10) # 10秒超时 + + while datetime.now() - stop_time < timeout: + if not await self.process_managers[idx].is_running(): + # 清理设备信息 + if idx in self.device_info: + del self.device_info[idx] + + self.logger.info(f"设备{idx}已成功关闭") + return True, DeviceStatus.OFFLINE + + await asyncio.sleep(0.1) + + # 强制终止 + self.logger.warning(f"设备{idx}未能正常关闭,尝试强制终止") + await self.process_managers[idx].kill(if_force=True) + + if idx in self.device_info: + del self.device_info[idx] + + return True, DeviceStatus.OFFLINE + + except Exception as e: + self.logger.error(f"关闭设备{idx}失败: {str(e)}") + return False, DeviceStatus.ERROR + + async def get_status(self, idx: str) -> int: + """ + 获取指定设备当前状态 + + Args: + idx: 设备ID + + Returns: + int: 状态码 + """ + try: + if idx not in self.process_managers: + return DeviceStatus.OFFLINE + + if await self.process_managers[idx].is_running(): + return DeviceStatus.ONLINE + else: + return DeviceStatus.OFFLINE + + except Exception as e: + self.logger.error(f"获取设备{idx}状态失败: {str(e)}") + return DeviceStatus.ERROR + + async def hide_device(self, idx: str) -> tuple[bool, int]: + """ + 隐藏设备窗口 + + Args: + idx: 设备ID + + Returns: + tuple[bool, int]: (是否成功, 状态码) + """ + try: + status = await self.get_status(idx) + if status != DeviceStatus.ONLINE: + return False, status + + if ( + idx not in self.process_managers + or not self.process_managers[idx].main_pid + ): + return False, DeviceStatus.NOT_FOUND + + # 窗口隐藏功能(简化实现) + # 注意:完整的窗口隐藏功能需要更复杂的Windows API调用 + self.logger.info(f"设备{idx}窗口隐藏请求已处理(简化实现)") + return True, DeviceStatus.ONLINE + + self.logger.info(f"设备{idx}窗口已隐藏") + return True, DeviceStatus.ONLINE + + except ImportError: + self.logger.warning("隐藏窗口功能需要pywin32库") + return False, DeviceStatus.ERROR + except Exception as e: + self.logger.error(f"隐藏设备{idx}窗口失败: {str(e)}") + return False, DeviceStatus.ERROR + + async def show_device(self, idx: str) -> tuple[bool, int]: + """ + 显示设备窗口 + + Args: + idx: 设备ID + + Returns: + tuple[bool, int]: (是否成功, 状态码) + """ + try: + status = await self.get_status(idx) + if status != DeviceStatus.ONLINE: + return False, status + + if ( + idx not in self.process_managers + or not self.process_managers[idx].main_pid + ): + return False, DeviceStatus.NOT_FOUND + + # 窗口显示功能(简化实现) + # 注意:完整的窗口显示功能需要更复杂的Windows API调用 + self.logger.info(f"设备{idx}窗口显示请求已处理(简化实现)") + return True, DeviceStatus.ONLINE + + self.logger.info(f"设备{idx}窗口已显示") + return True, DeviceStatus.ONLINE + + except ImportError: + self.logger.warning("显示窗口功能需要pywin32库") + return False, DeviceStatus.ERROR + except Exception as e: + self.logger.error(f"显示设备{idx}窗口失败: {str(e)}") + return False, DeviceStatus.ERROR + + async def get_all_info(self) -> dict[str, dict[str, str]]: + """ + 获取所有设备信息 + + Returns: + dict[str, dict[str, str]]: 设备信息字典 + 结构示例: + { + "0": { + "title": "设备名称", + "status": "1" + } + } + """ + result = {} + + for idx in list(self.process_managers.keys()): + try: + status = await self.get_status(idx) + + if idx in self.device_info: + title = self.device_info[idx].get("title", f"{self.name}_{idx}") + else: + title = f"{self.name}_{idx}" + + result[idx] = {"title": title, "status": str(status)} + + except Exception as e: + self.logger.error(f"获取设备{idx}信息失败: {str(e)}") + result[idx] = { + "title": f"{self.name}_{idx}", + "status": str(DeviceStatus.ERROR), + } + + return result + + async def cleanup(self) -> None: + """ + 清理所有资源 + """ + self.logger.info("开始清理设备管理器资源") + + for idx, pm in list(self.process_managers.items()): + try: + if await pm.is_running(): + await pm.kill(if_force=True) + await pm.clear() + except Exception as e: + self.logger.error(f"清理设备{idx}资源失败: {str(e)}") + + self.process_managers.clear() + self.device_info.clear() + + self.logger.info("设备管理器资源清理完成") + + def __del__(self): + """析构函数,确保资源被正确释放""" + try: + # 注意:析构函数中不能使用async/await + # 这里只是标记,实际清理需要显式调用cleanup() + if hasattr(self, "process_managers") and self.process_managers: + self.logger.warning("设备管理器未正确清理,请显式调用cleanup()方法") + except: # noqa: E722 + pass + + +# 使用示例 +if __name__ == "__main__": + + async def main(): + # 创建通用设备管理器 + manager = GeneralDeviceManager( + executable_path=r"C:\Windows\System32\notepad.exe", name="记事本" + ) + + try: + # 启动设备 + success, status, info = await manager.start("0") + print(f"启动结果: {success}, 状态: {status}, 信息: {info}") + + if success: + # 获取所有设备信息 + all_info = await manager.get_all_info() + print(f"所有设备信息: {all_info}") + + # 等待5秒 + await asyncio.sleep(5) + + # 关闭设备 + close_success, close_status = await manager.close("0") + print(f"关闭结果: {close_success}, 状态: {close_status}") + + finally: + # 清理资源 + await manager.cleanup() + + # 运行示例 + asyncio.run(main()) diff --git a/app/utils/device_manager/ldplayer.py b/app/utils/device_manager/ldplayer.py new file mode 100644 index 0000000..2e505d1 --- /dev/null +++ b/app/utils/device_manager/ldplayer.py @@ -0,0 +1,309 @@ +import asyncio +from typing import Literal +from app.utils.device_manager.utils import BaseDevice, ExeRunner, DeviceStatus +from app.utils.logger import get_logger +from app.utils.device_manager.postMessage import ( + post_keys_to_hwnd, + post_keys_to_hwnd_sync, +) +import psutil +from pydantic import BaseModel +import win32gui + + +class EmulatorInfo(BaseModel): + idx: int + title: str + top_hwnd: int + bind_hwnd: int + in_android: int + pid: int + vbox_pid: int + width: int + height: int + density: int + + +class LDManager(BaseDevice): + """ + 基于dnconsole.exe的模拟器管理 + + !需要管理员权限 + + """ + + def __init__(self, exe_path: str) -> None: + """_summary_ + + Args: + exe_path (str): dnconsole.exe的绝对路径 + """ + self.runner = ExeRunner(exe_path, "gbk") + self.logger = get_logger("雷电模拟器管理器") + self.wait_time = 60 # 配置获取 后续改一下 单位为s + + async def start(self, idx: str, package_name="") -> tuple[bool, int, dict]: + """ + 启动指定模拟器 + Returns: + tuple[bool, int, str]: 是否成功, 当前状态码, ADB端口信息 + """ + OK, info, status = await self.get_device_info(idx) + if status != DeviceStatus.OFFLINE: + self.logger.error( + f"模拟器{idx}未处于关闭状态,当前状态码: {status}, 需求状态码: {DeviceStatus.OFFLINE}" + ) + return False, status, {} + if package_name: + result = self.runner.run( + "launch", + "--index", + idx, + "--packagename", + f'"{package_name}"', + ) + else: + result = self.runner.run( + "launch", + "--index", + idx, + ) + # 参考命令 dnconsole.exe launch --index 0 + self.logger.debug(f"启动结果:{result}") + if result.returncode != 0: + raise RuntimeError(f"命令执行失败: {result}") + + i = 0 + while i < self.wait_time * 10: + OK, info, status = await self.get_device_info(idx) + if status == DeviceStatus.ERROR or status == DeviceStatus.UNKNOWN: + self.logger.error(f"模拟器{idx}启动失败,状态码: {status}") + return False, status, {} + if status == DeviceStatus.ONLINE: + self.logger.debug(info) + if OK and isinstance(info, EmulatorInfo): + pid: int = info.vbox_pid + adb_port = "" + adb_host_ip = await self.get_adb_ports(pid) + print(adb_host_ip) + if adb_host_ip: + return ( + True, + status, + {"adb_port": adb_port, "adb_host_ip": adb_host_ip}, + ) + + return True, status, {} + await asyncio.sleep(0.1) + i += 1 + return False, DeviceStatus.UNKNOWN, {} + + async def close(self, idx: str) -> tuple[bool, int]: + """ + 关闭指定模拟器 + Returns: + - tuple[bool, int]: 是否成功, 当前状态码 + + 参考命令行:dnconsole.exe quit --index 0 + """ + OK, info, status = await self.get_device_info(idx) + if status != DeviceStatus.ONLINE and status != DeviceStatus.STARTING: + return False, DeviceStatus.NOT_FOUND + result = self.runner.run( + "quit", + "--index", + idx, + ) + # 参考命令 dnconsole.exe quit --index 0 + if result.returncode != 0: + return True, DeviceStatus.OFFLINE + i = 0 + while i < self.wait_time * 10: + OK, info, status = await self.get_device_info(idx) + if status == DeviceStatus.ERROR or status == DeviceStatus.UNKNOWN: + return False, status + if status == DeviceStatus.OFFLINE: + return True, DeviceStatus.OFFLINE + await asyncio.sleep(0.1) + i += 1 + + return False, DeviceStatus.UNKNOWN + + async def get_status(self, idx: str) -> int: + """ + 获取指定模拟器当前状态 + 返回值: 状态码 + """ + _, _, status = await self.get_device_info(idx) + return status + + async def get_device_info( + self, + idx: str, + data: dict[int, EmulatorInfo] | None = None, + ) -> tuple[Literal[True], EmulatorInfo, int] | tuple[Literal[False], dict, int]: + """ + 获取指定模拟器的信息和状态 + Returns: + - tuple[bool, EmulatorInfo | dict, int]: 是否成功, 模拟器信息或空字典, 状态码 + + 参考命令行:dnconsole.exe list2 + """ + if not data: + result = await self._get_all_info() + else: + result = data + + try: + emulator_info = result.get(int(idx)) + if not emulator_info: + self.logger.error(f"未找到模拟器{idx}的信息") + return False, {}, DeviceStatus.UNKNOWN + + self.logger.debug(f"获取模拟器{idx}信息: {emulator_info}") + + # 计算状态码 + if emulator_info.in_android == 1: + status = DeviceStatus.STARTING + elif emulator_info.in_android == 2: + if emulator_info.vbox_pid > 0: + status = DeviceStatus.ONLINE + else: + status = DeviceStatus.STARTING + elif emulator_info.in_android == 0: + status = DeviceStatus.OFFLINE + else: + status = DeviceStatus.UNKNOWN + + self.logger.debug(f"获取模拟器{idx}状态: {status}") + return True, emulator_info, status + except: # noqa: E722 + self.logger.error(f"获取模拟器{idx}信息失败") + return False, {}, DeviceStatus.UNKNOWN + + async def _get_all_info(self) -> dict[int, EmulatorInfo]: + result = self.runner.run("list2") + # self.logger.debug(f"全部信息{result.stdout.strip()}") + if result.returncode != 0: + raise RuntimeError(f"命令执行失败: {result}") + + emulators: dict[int, EmulatorInfo] = {} + data = result.stdout.strip() + + for line in data.strip().splitlines(): + parts = line.strip().split(",") + if len(parts) != 10: + raise ValueError(f"数据格式错误: {line}") + try: + info = EmulatorInfo( + idx=int(parts[0]), + title=parts[1], + top_hwnd=int(parts[2]), + bind_hwnd=int(parts[3]), + in_android=int(parts[4]), + pid=int(parts[5]), + vbox_pid=int(parts[6]), + width=int(parts[7]), + height=int(parts[8]), + density=int(parts[9]), + ) + emulators[info.idx] = info + except Exception as e: + self.logger.warning(f"解析失败: {line}, 错误: {e}") + pass + return emulators + + # ?wk雷电你都返回了什么啊 + + async def get_all_info(self) -> dict[str, dict[str, str]]: + """ + 解析_emulator_info字典,提取idx和title,便于前端显示 + """ + raw_data = await self._get_all_info() + result: dict[str, dict[str, str]] = {} + for info in raw_data.values(): + OK, device_info, status = await self.get_device_info( + str(info.idx), raw_data + ) + result[str(info.idx)] = {"title": info.title, "status": str(status)} + return result + + async def send_boss_key( + self, + idx: str, + boss_keys: list[int], + result: EmulatorInfo, + is_show: bool = False, # True: 显示, False: 隐藏 + ) -> bool: + """ + 发送BOSS键 + + Args: + idx (str): 模拟器索引 + boss_keys (list[int]): BOSS键的虚拟键码列表 + result (EmulatorInfo): 模拟器信息 + is_show (bool, optional): 隐藏或显示窗口,默认为 False(隐藏)。 + """ + hwnd = result.top_hwnd + if not hwnd: + return False + + await post_keys_to_hwnd(hwnd, boss_keys) + await asyncio.sleep(0.5) + if win32gui.IsWindowVisible(hwnd) == (not is_show): + return True + else: + status = await post_keys_to_hwnd_sync(hwnd, boss_keys) + return status == (not is_show) + + async def hide_device( + self, + idx: str, + boss_keys: list[int] = [], + ) -> tuple[bool, int]: + """隐藏设备窗口""" + OK, result, status = await self.get_device_info(idx) + if not OK or not isinstance(result, EmulatorInfo): + return False, DeviceStatus.UNKNOWN + if status != DeviceStatus.ONLINE: + return False, status + + return await self.send_boss_key(idx, boss_keys, result, False), status + + async def show_device( + self, + idx: str, + boss_keys: list[int] = [], + ) -> tuple[bool, int]: + """显示设备窗口""" + OK, result, status = await self.get_device_info(idx) + if not OK or not isinstance(result, EmulatorInfo): + return False, DeviceStatus.UNKNOWN + if status != DeviceStatus.ONLINE: + return False, status + + return await self.send_boss_key(idx, boss_keys, result, True), status + + async def get_adb_ports(self, pid: int) -> int: + """使用psutil获取adb端口""" + try: + process = psutil.Process(pid) + connections = process.connections(kind="inet") + for conn in connections: + if conn.status == psutil.CONN_LISTEN and conn.laddr.port != 2222: + return conn.laddr.port + return 0 # 如果没有找到合适的端口,返回0 + except: # noqa: E722 + return 0 + + +if __name__ == "__main__": + MANAGER_PATH = ( + r"C:\leidian\LDPlayer9\dnconsole.exe" # 替换为实际的dnconsole.exe路径 + ) + idx = "0" # 替换为实际存在的模拟器实例索 + + manager = LDManager(MANAGER_PATH) + # asyncio.run(manager._get_all_info()) + a = asyncio.run(manager.start("0")) + print(a) diff --git a/app/utils/device_manager/mumu.py b/app/utils/device_manager/mumu.py new file mode 100644 index 0000000..82d3e52 --- /dev/null +++ b/app/utils/device_manager/mumu.py @@ -0,0 +1,220 @@ +import asyncio +import json +from app.utils.device_manager.utils import BaseDevice, ExeRunner, DeviceStatus +from app.utils.logger import get_logger + + +class MumuManager(BaseDevice): + """ + 基于MuMuManager.exe的模拟器管理 + """ + + def __init__(self, exe_path: str) -> None: + """_summary_ + + Args: + exe_path (str): MuMuManager.exe的绝对路径 + """ + self.runner = ExeRunner(exe_path, "utf-8") + self.logger = get_logger("MuMu管理器") + self.wait_time = 60 # 配置获取 后续改一下 单位为s + + async def start(self, idx: str, package_name="") -> tuple[bool, int, dict]: + """ + 启动指定模拟器 + Returns: + tuple[bool, int, str]: 是否成功, 当前状态码, ADB端口信息 + """ + status = await self.get_status(idx) + if status != DeviceStatus.OFFLINE: + self.logger.error( + f"模拟器{idx}未处于关闭状态,当前状态码: {status}, 需求状态码: {DeviceStatus.OFFLINE}" + ) + return False, status, {} + if package_name: + result = self.runner.run( + "control", + "-v", + idx, + "launch", + "-pkg", + package_name, + ) + else: + result = self.runner.run( + "control", + "-v", + idx, + "launch", + ) + # 参考命令 MuMuManager.exe control -v 2 launch + self.logger.debug(f"启动结果:{result}") + if result.returncode != 0: + raise RuntimeError(f"命令执行失败: {result}") + + i = 0 + while i < self.wait_time * 10: + status = await self.get_status(idx) + if status == DeviceStatus.ERROR or status == DeviceStatus.UNKNOWN: + self.logger.error(f"模拟器{idx}启动失败,状态码: {status}") + return False, status, {} + if status == DeviceStatus.ONLINE: + OK, info = await self.get_device_info(idx) + self.logger.debug(info) + if OK: + data = json.loads(info) + adb_port = data.get("adb_port") + adb_host_ip = data.get("adb_host_ip") + if adb_port and adb_host_ip: + return ( + True, + status, + {"adb_port": adb_port, "adb_host_ip": adb_host_ip}, + ) + + return True, status, {} + await asyncio.sleep(0.1) + i += 1 + return False, DeviceStatus.UNKNOWN, {} + + async def close(self, idx: str) -> tuple[bool, int]: + """ + 关闭指定模拟器 + Returns: + tuple[bool, int]: 是否成功, 当前状态码 + """ + status = await self.get_status(idx) + if status != DeviceStatus.ONLINE and status != DeviceStatus.STARTING: + return False, DeviceStatus.NOT_FOUND + result = self.runner.run( + "control", + "-v", + idx, + "shutdown", + ) + # 参考命令 MuMuManager.exe control -v 2 shutdown + if result.returncode != 0: + return True, DeviceStatus.OFFLINE + i = 0 + while i < self.wait_time * 10: + status = await self.get_status(idx) + if status == DeviceStatus.ERROR or status == DeviceStatus.UNKNOWN: + return False, status + if status == DeviceStatus.OFFLINE: + return True, DeviceStatus.OFFLINE + await asyncio.sleep(0.1) + i += 1 + + return False, DeviceStatus.UNKNOWN + + async def get_status(self, idx: str, data: str | None = None) -> int: + if not data: + OK, result_str = await self.get_device_info(idx) + self.logger.debug(f"获取状态结果{result_str}") + else: + OK, result_str = True, data + + try: + result_json = json.loads(result_str) + + if OK: + if result_json["is_android_started"]: + return DeviceStatus.STARTING + elif result_json["is_process_started"]: + return DeviceStatus.ONLINE + else: + return DeviceStatus.OFFLINE + + else: + if result_json["errmsg"] == "unknown error": + return DeviceStatus.UNKNOWN + else: + return DeviceStatus.ERROR + + except json.JSONDecodeError as e: + self.logger.error(f"JSON解析错误: {e}") + return DeviceStatus.UNKNOWN + + async def get_device_info(self, idx: str) -> tuple[bool, str]: + result = self.runner.run( + "info", + "-v", + idx, + ) + self.logger.debug(f"获取模拟器{idx}信息: {result}") + if result.returncode != 0: + return False, result.stdout.strip() + else: + return True, result.stdout.strip() + + async def _get_all_info(self) -> str: + result = self.runner.run( + "info", + "-v", + "all", + ) + # self.logger.debug(f"result{result.stdout.strip()}") + if result.returncode != 0: + raise RuntimeError(f"命令执行失败: {result}") + return result.stdout.strip() + + async def get_all_info(self) -> dict[str, dict[str, str]]: + json_data = await self._get_all_info() + data = json.loads(json_data) + + result: dict[str, dict[str, str]] = {} + + if not data: + return result + + if isinstance(data, dict) and "index" in data and "name" in data: + index = data["index"] + name = data["name"] + status = self.get_status(index, json_data) + result[index] = { + "title": name, + "status": str(status), + } + + elif isinstance(data, dict): + for key, value in data.items(): + if isinstance(value, dict) and "index" in value and "name" in value: + index = value["index"] + name = value["name"] + status = await self.get_status(index) + result[index] = { + "title": name, + "status": str(status), + } + + return result + + async def hide_device(self, idx: str) -> tuple[bool, int]: + """隐藏设备窗口""" + status = await self.get_status(idx) + if status != DeviceStatus.ONLINE: + return False, status + result = self.runner.run( + "control", + "-v", + idx, + "hide_window", + ) + if result.returncode != 0: + return False, status + return True, DeviceStatus.ONLINE + + async def show_device(self, idx: str) -> tuple[bool, int]: + """显示设备窗口""" + status = await self.get_status(idx) + if status != DeviceStatus.ONLINE: + return False, status + result = self.runner.run( + "control", + "-v", + idx, + "show_window", + ) + if result.returncode != 0: + return False, status + return True, DeviceStatus.ONLINE diff --git a/app/utils/device_manager/postMessage.py b/app/utils/device_manager/postMessage.py new file mode 100644 index 0000000..0f9b0f3 --- /dev/null +++ b/app/utils/device_manager/postMessage.py @@ -0,0 +1,156 @@ +import ctypes +import asyncio + +# 加载 user32.dll +user32 = ctypes.windll.user32 + +# Windows 消息常量 +WM_KEYDOWN = 0x0100 +WM_KEYUP = 0x0101 + + +async def post_key_to_hwnd(hwnd: int, vk_code: int) -> bool: + """ + 使用 PostMessage 向指定窗口句柄发送一个完整的按键(按下 + 释放)。 + !由于 PostMessage 是异步的,并不能保证按键一定被处理。 + + 参数: + hwnd (int): 目标窗口句柄 + vk_code (int): 虚拟键码 + + 注意: + - 此函数不检查 hwnd 是否有效,也不等待消息处理(异步)。 + - 如果 hwnd 无效,PostMessage 会静默失败。 + """ + if hwnd <= 0: + raise ValueError("hwnd 必须是正整数") + if not (0 <= vk_code <= 0xFFFF): + raise ValueError("vk_code 必须是有效的虚拟键码(0~65535)") + + user32.PostMessageW(hwnd, WM_KEYDOWN, vk_code, 0) + user32.PostMessageW(hwnd, WM_KEYUP, vk_code, 0) + return True + + +async def send_key_to_hwnd_sync(hwnd: int, vk_code: int) -> bool: + """ + 使用 SendMessage 向指定窗口句柄同步发送一个完整的按键(按下 + 释放)。 + + 参数: + hwnd (int): 目标窗口句柄 + vk_code (int): 虚拟键码 + + 返回: + bool: + - 对于 SendMessage,返回值是目标窗口过程(WindowProc)对消息的返回值。 + - 通常非零表示成功,但具体含义由目标窗口定义。 + - 如果 hwnd 无效,会返回 0。 + + 注意: + - 此调用是**同步阻塞**的:当前线程会等待目标窗口处理完消息才返回。 + - 如果目标窗口无响应(hung),当前程序也会卡住! + """ + if hwnd <= 0: + raise ValueError("hwnd 必须是正整数") + if not (0 <= vk_code <= 0xFFFF): + raise ValueError("vk_code 必须是有效的虚拟键码(0~65535)") + + # 发送 WM_KEYDOWN + result_down = user32.SendMessageW(hwnd, WM_KEYDOWN, vk_code, 0) + # 发送 WM_KEYUP + result_up = user32.SendMessageW(hwnd, WM_KEYUP, vk_code, 0) + + return bool(result_down and result_up) + + +async def post_keys_to_hwnd( + hwnd: int, vk_codes: list[int], hold_time: float = 0.05 +) -> bool: + """ + 使用 PostMessage 向指定窗口句柄同时发送多个按键 + !由于 PostMessage 是异步的,并不能保证按键一定被处理。 + + 参数: + hwnd (int): 目标窗口句柄 + vk_codes (List[int]): 虚拟键码列表 + hold_time (float): 按键保持时间(秒),默认 0.05 秒 + + 返回: + bool: 总是返回 True(PostMessage 不提供错误反馈) + + 注意: + - 此函数不检查 hwnd 是否有效,也不等待消息处理(异步)。 + - 如果 hwnd 无效,PostMessage 会静默失败。 + - 按键顺序:先按下所有键,等待,然后按相反顺序释放所有键。 + """ + if hwnd <= 0: + raise ValueError("hwnd 必须是正整数") + if not vk_codes: + raise ValueError("vk_codes 不能为空") + + # 验证所有虚拟键码 + for vk_code in vk_codes: + if not (0 <= vk_code <= 0xFFFF): + raise ValueError(f"vk_code {vk_code} 必须是有效的虚拟键码(0~65535)") + + # 按下所有按键 + for vk_code in vk_codes: + user32.PostMessageW(hwnd, WM_KEYDOWN, vk_code, 0) + + # 保持按键状态 + await asyncio.sleep(hold_time) + + # 按相反顺序释放所有按键(模拟真实按键行为) + for vk_code in reversed(vk_codes): + user32.PostMessageW(hwnd, WM_KEYUP, vk_code, 0) + + return True + + +async def post_keys_to_hwnd_sync( + hwnd: int, vk_codes: list[int], hold_time: float = 0.05 +) -> bool: + """ + 使用 SendMessage 向指定窗口句柄同步发送多个按键 + 先按下所有按键,等待指定时间,然后释放所有按键。 + + 参数: + hwnd (int): 目标窗口句柄 + vk_codes (List[int]): 虚拟键码列表 + hold_time (float): 按键保持时间(秒),默认 0.05 秒 + + 返回: + bool: 如果所有消息都成功发送则返回 True + + 注意: + - 此调用是**同步阻塞**的:当前线程会等待目标窗口处理完每个消息才继续。 + - 如果目标窗口无响应(hung),当前程序也会卡住! + - 按键顺序:先按下所有键,等待,然后按相反顺序释放所有键。 + """ + if hwnd <= 0: + raise ValueError("hwnd 必须是正整数") + if not vk_codes: + raise ValueError("vk_codes 不能为空") + + # 验证所有虚拟键码 + for vk_code in vk_codes: + if not (0 <= vk_code <= 0xFFFF): + raise ValueError(f"vk_code {vk_code} 必须是有效的虚拟键码(0~65535)") + + # 按下所有按键 + down_results = [] + for vk_code in vk_codes: + result = user32.SendMessageW(hwnd, WM_KEYDOWN, vk_code, 0) + down_results.append(result) + + # 保持按键状态 + await asyncio.sleep(hold_time) + + # 按相反顺序释放所有按键(模拟真实按键行为) + up_results = [] + for vk_code in reversed(vk_codes): + result = user32.SendMessageW(hwnd, WM_KEYUP, vk_code, 0) + up_results.append(result) + + # 如果所有按键操作都成功,则返回 True + return all(down_results) and all(up_results) diff --git a/app/utils/device_manager/utils.py b/app/utils/device_manager/utils.py new file mode 100644 index 0000000..4392028 --- /dev/null +++ b/app/utils/device_manager/utils.py @@ -0,0 +1,104 @@ +import subprocess +import os +from abc import ABC, abstractmethod +from enum import IntEnum + + +class DeviceStatus(IntEnum): + ONLINE = 0 + """设备在线""" + OFFLINE = 1 + """设备离线""" + STARTING = 2 + """设备开启中""" + CLOSEING = 3 + """设备关闭中""" + ERROR = 4 + """错误""" + NOT_FOUND = 5 + """未找到设备""" + UNKNOWN = 10 + + +class ExeRunner: + def __init__(self, exe_path, encoding) -> None: + """ + 指定 exe 路径 + !请传入绝对路径,使用/分隔路径 + """ + if not os.path.isfile(exe_path): + raise FileNotFoundError(f"找不到文件: {exe_path}") + self.exe_path = os.path.abspath(exe_path) # 转为绝对路径 + self.encoding = encoding + + def run(self, *args) -> subprocess.CompletedProcess[str]: + """ + 执行命令,返回结果 + """ + cmd = [self.exe_path] + list(args) + print(f"执行: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding=self.encoding, + errors="replace", + ) + return result + + +class BaseDevice(ABC): + @abstractmethod + async def start(self, idx: str, package_name: str) -> tuple[bool, int, dict]: + """ + 启动设备 + 返回值: (是否成功, 状态码, 启动信息) + """ + ... + + @abstractmethod + async def close(self, idx: str) -> tuple[bool, int]: + """ + 关闭设备或服务 + 返回值: (是否成功, 状态码) + """ + ... + + @abstractmethod + async def get_status(self, idx: str) -> int: + """ + 获取指定模拟器当前状态 + 返回值: 状态码 + """ + ... + + @abstractmethod + async def hide_device(self, idx: str) -> tuple[bool, int]: + """ + 隐藏设备窗口 + 返回值: (是否成功, 状态码) + """ + ... + + @abstractmethod + async def show_device(self, idx: str) -> tuple[bool, int]: + """ + 显示设备窗口 + 返回值: (是否成功, 状态码) + """ + ... + + async def get_all_info(self) -> dict[str, dict[str, str]]: + """ + 获取设备信息 + 返回值: 设备字典 + 结构示例: + { + "0":{ + "title": 模拟器名字, + "status": "1" + } + } + """ + ...