feat(maa): 森空岛签到功能上线

This commit is contained in:
DLmaster361
2025-06-02 02:35:01 +08:00
parent 2d72ca66a4
commit e505ea8c51
7 changed files with 354 additions and 22 deletions

View File

@@ -32,14 +32,18 @@ import subprocess
import shutil
import time
import re
import hashlib
import hmac
import requests
import win32com.client
from datetime import datetime, timedelta
from pathlib import Path
from urllib import parse
from jinja2 import Environment, FileSystemLoader
from typing import Union, List, Dict
from app.core import Config, MaaConfig, MaaUserConfig
from app.services import Notify, System
from app.services import Notify, Crypto, System
class MaaManager(QObject):
@@ -220,6 +224,34 @@ class MaaManager(QObject):
user_logs_list = []
user_start_time = datetime.now()
if user_data["Info"]["IfSkland"] and user_data["Info"]["SklandToken"]:
if user_data["Data"]["LastSklandDate"] != datetime.now().strftime(
"%Y-%m-%d"
):
self.update_log_text.emit("正在执行森空岛签到中\n请稍候~")
skland_result = self.skland_sign(
Crypto.win_decryptor(user_data["Info"]["SklandToken"])
)
if skland_result["total"] > 0 and skland_result["fail"] == 0:
user_data["Data"][
"LastSklandDate"
] = datetime.now().strftime("%Y-%m-%d")
self.play_sound.emit("森空岛签到成功")
else:
self.play_sound.emit("森空岛签到失败")
elif user_data["Info"]["IfSkland"]:
logger.warning(
f"{self.name} | 用户: {user[0]} - 未配置森空岛签到Token跳过森空岛签到"
)
self.push_info_bar.emit(
"warning", "森空岛签到失败", "未配置鹰角网络通行证登录凭证", -1
)
# 剿灭-日常模式循环
for mode in ["Annihilation", "Routine"]:
@@ -1747,6 +1779,216 @@ class MaaManager(QObject):
with self.maa_tasks_path.open(mode="w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def skland_sign(self, token) -> dict:
"""森空岛签到"""
app_code = "4ca99fa6b56cc2ba"
# 用于获取grant code
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 用于获取cred
cred_code_url = (
"https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
)
# 查询角色绑定
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 签到接口
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 基础请求头
header = {
"cred": "",
"User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 34;) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
header_login = header.copy()
header_for_sign = {
"platform": "1",
"timestamp": "",
"dId": "",
"vName": "1.5.1",
}
# 生成签名
def generate_signature(token_for_sign: str, path, body_or_query):
"""
生成请求签名
:param token_for_sign: 用于加密的token
:param path: 请求路径(如 /api/v1/game/player/binding
:param body_or_query: GET用query字符串POST用body字符串
:return: (sign, 新的header_for_sign字典)
"""
t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
token_bytes = token_for_sign.encode("utf-8")
header_ca = dict(header_for_sign)
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str # 拼接原始字符串
# HMAC-SHA256 + MD5得到最终sign
hex_s = hmac.new(token_bytes, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
return md5, header_ca
# 获取带签名的header
def get_sign_header(url: str, method, body, old_header, sign_token):
"""
获取带签名的请求头
:param url: 请求完整url
:param method: 请求方式 GET/POST
:param body: POST请求体或GET时为None
:param old_header: 原始请求头
:param sign_token: 当前会话的签名token
:return: 新请求头
"""
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
sign, header_ca = generate_signature(sign_token, p.path, p.query)
else:
sign, header_ca = generate_signature(
sign_token, p.path, json.dumps(body) if body else ""
)
h["sign"] = sign
for i in header_ca:
h[i] = header_ca[i]
return h
# 复制请求头并添加cred
def copy_header(cred):
v = json.loads(json.dumps(header))
v["cred"] = cred
return v
# 使用token一步步拿到cred和sign_token
def login_by_token(token_code):
"""
:param token_code: 你的skyland token
:return: (cred, sign_token)
"""
try:
# token为json对象时提取data.content
t = json.loads(token_code)
token_code = t["data"]["content"]
except:
pass
grant_code = get_grant_code(token_code)
return get_cred(grant_code)
# 通过grant code换cred和sign_token
def get_cred(grant):
rsp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=header_login
).json()
if rsp["code"] != 0:
raise Exception(f'获得cred失败{rsp.get("messgae")}')
sign_token = rsp["data"]["token"]
cred = rsp["data"]["cred"]
return cred, sign_token
# 通过token换grant code
def get_grant_code(token):
rsp = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=header_login,
).json()
if rsp["status"] != 0:
raise Exception(
f'使用token: {token} 获得认证代码失败:{rsp.get("msg")}'
)
return rsp["data"]["code"]
# 获取已绑定的角色列表
def get_binding_list(cred, sign_token):
"""
查询绑定的角色
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 角色列表
"""
v = []
rsp = requests.get(
binding_url,
headers=get_sign_header(
binding_url, "get", None, copy_header(cred), sign_token
),
).json()
if rsp["code"] != 0:
logger.error(f"{self.name} | 请求角色列表出现问题:{rsp['message']}")
if rsp.get("message") == "用户未登录":
logger.error(f"{self.name} | 用户登录可能失效了,请重新登录!")
return v
# 只取明日方舟arknights的绑定账号
for i in rsp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
# 执行签到
def do_sign(cred, sign_token):
"""
对所有绑定的角色进行签到
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: (签到成功数量, 失败数量)
"""
characters = get_binding_list(cred, sign_token)
success_num = 0
fail_num = 0
for i in characters:
body = {"uid": i.get("uid"), "gameId": i.get("channelMasterId")}
rsp = requests.post(
sign_url,
headers=get_sign_header(
sign_url, "post", body, copy_header(cred), sign_token
),
json=body,
).json()
if rsp["code"] != 0:
if rsp.get("message") == "请勿重复签到!":
self.push_info_bar.emit(
"warning",
"森空岛重复签到",
f"{i.get("nickName")}{i.get("channelName")}",
-1,
)
success_num += 1
continue
else:
self.push_info_bar.emit(
"warning",
"森空岛签到失败",
f"{i.get("nickName")}{i.get("channelName")}",
-1,
)
fail_num += 1
continue
awards = rsp["data"]["awards"]
for j in awards:
res = j["resource"]
self.push_info_bar.emit(
"success",
"森空岛签到成功",
f"{i.get("nickName")}{i.get("channelName")}",
3,
)
success_num += 1
return success_num, fail_num
# 主流程
try:
# 拿到cred和sign_token
cred, sign_token = login_by_token(token)
self.sleep(1)
# 依次签到
success, fail = do_sign(cred, sign_token)
self.sleep(3)
return {"success": success, "fail": fail, "total": success + fail}
except Exception as e:
logger.error(f"{self.name} | 森空岛签到失败: {e}")
return {"success": 0, "fail": 0, "total": 0}
def push_notification(
self,
mode: str,