refactor(skland): 森空岛签到功能拆分独立

This commit is contained in:
DLmaster361
2025-06-02 14:05:31 +08:00
parent e505ea8c51
commit 73c15b5e93
4 changed files with 272 additions and 220 deletions

View File

@@ -30,20 +30,15 @@ from PySide6.QtCore import QObject, Signal, QEventLoop, QFileSystemWatcher, QTim
import json
import subprocess
import shutil
import time
import re
import hashlib
import hmac
import requests
import win32com.client
from datetime import datetime, timedelta
from pathlib import Path
from urllib import parse
from jinja2 import Environment, FileSystemLoader
from typing import Union, List, Dict
from app.core import Config, MaaConfig, MaaUserConfig
from app.services import Notify, Crypto, System
from app.services import Notify, Crypto, System, skland_sign_in
class MaaManager(QObject):
@@ -232,11 +227,28 @@ class MaaManager(QObject):
self.update_log_text.emit("正在执行森空岛签到中\n请稍候~")
skland_result = self.skland_sign(
skland_result = skland_sign_in(
Crypto.win_decryptor(user_data["Info"]["SklandToken"])
)
if skland_result["total"] > 0 and skland_result["fail"] == 0:
for type, user_list in skland_result.items():
if type != "总计" and len(user_list) > 0:
logger.info(
f"{self.name} | 用户: {user[0]} - 森空岛签到{type}: {''.join(user_list)}"
)
self.push_info_bar.emit(
"info",
f"森空岛签到{type}",
"".join(user_list),
-1,
)
if (
skland_result["总计"] > 0
and len(skland_result["失败"]) == 0
):
user_data["Data"][
"LastSklandDate"
] = datetime.now().strftime("%Y-%m-%d")
@@ -942,6 +954,9 @@ class MaaManager(QObject):
self.sleep(self.wait_time)
if self.isInterruptionRequested:
return None
# 移除静默进程标记
Config.silence_list.remove(self.emulator_path)
@@ -1011,7 +1026,8 @@ class MaaManager(QObject):
else:
logger.info(f"{self.name} | 无法连接到ADB地址{ADB_address}")
self.play_sound.emit("ADB失败")
if not self.isInterruptionRequested:
self.play_sound.emit("ADB失败")
def refresh_maa_log(self) -> None:
"""刷新MAA日志"""
@@ -1779,216 +1795,6 @@ class MaaManager(QObject):
with self.maa_tasks_path.open(mode="w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def skland_sign(self, token) -> dict:
"""森空岛签到"""
app_code = "4ca99fa6b56cc2ba"
# 用于获取grant code
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 用于获取cred
cred_code_url = (
"https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
)
# 查询角色绑定
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 签到接口
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 基础请求头
header = {
"cred": "",
"User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 34;) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
header_login = header.copy()
header_for_sign = {
"platform": "1",
"timestamp": "",
"dId": "",
"vName": "1.5.1",
}
# 生成签名
def generate_signature(token_for_sign: str, path, body_or_query):
"""
生成请求签名
:param token_for_sign: 用于加密的token
:param path: 请求路径(如 /api/v1/game/player/binding
:param body_or_query: GET用query字符串POST用body字符串
:return: (sign, 新的header_for_sign字典)
"""
t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
token_bytes = token_for_sign.encode("utf-8")
header_ca = dict(header_for_sign)
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str # 拼接原始字符串
# HMAC-SHA256 + MD5得到最终sign
hex_s = hmac.new(token_bytes, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
return md5, header_ca
# 获取带签名的header
def get_sign_header(url: str, method, body, old_header, sign_token):
"""
获取带签名的请求头
:param url: 请求完整url
:param method: 请求方式 GET/POST
:param body: POST请求体或GET时为None
:param old_header: 原始请求头
:param sign_token: 当前会话的签名token
:return: 新请求头
"""
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
sign, header_ca = generate_signature(sign_token, p.path, p.query)
else:
sign, header_ca = generate_signature(
sign_token, p.path, json.dumps(body) if body else ""
)
h["sign"] = sign
for i in header_ca:
h[i] = header_ca[i]
return h
# 复制请求头并添加cred
def copy_header(cred):
v = json.loads(json.dumps(header))
v["cred"] = cred
return v
# 使用token一步步拿到cred和sign_token
def login_by_token(token_code):
"""
:param token_code: 你的skyland token
:return: (cred, sign_token)
"""
try:
# token为json对象时提取data.content
t = json.loads(token_code)
token_code = t["data"]["content"]
except:
pass
grant_code = get_grant_code(token_code)
return get_cred(grant_code)
# 通过grant code换cred和sign_token
def get_cred(grant):
rsp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=header_login
).json()
if rsp["code"] != 0:
raise Exception(f'获得cred失败{rsp.get("messgae")}')
sign_token = rsp["data"]["token"]
cred = rsp["data"]["cred"]
return cred, sign_token
# 通过token换grant code
def get_grant_code(token):
rsp = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=header_login,
).json()
if rsp["status"] != 0:
raise Exception(
f'使用token: {token} 获得认证代码失败:{rsp.get("msg")}'
)
return rsp["data"]["code"]
# 获取已绑定的角色列表
def get_binding_list(cred, sign_token):
"""
查询绑定的角色
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 角色列表
"""
v = []
rsp = requests.get(
binding_url,
headers=get_sign_header(
binding_url, "get", None, copy_header(cred), sign_token
),
).json()
if rsp["code"] != 0:
logger.error(f"{self.name} | 请求角色列表出现问题:{rsp['message']}")
if rsp.get("message") == "用户未登录":
logger.error(f"{self.name} | 用户登录可能失效了,请重新登录!")
return v
# 只取明日方舟arknights的绑定账号
for i in rsp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
# 执行签到
def do_sign(cred, sign_token):
"""
对所有绑定的角色进行签到
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: (签到成功数量, 失败数量)
"""
characters = get_binding_list(cred, sign_token)
success_num = 0
fail_num = 0
for i in characters:
body = {"uid": i.get("uid"), "gameId": i.get("channelMasterId")}
rsp = requests.post(
sign_url,
headers=get_sign_header(
sign_url, "post", body, copy_header(cred), sign_token
),
json=body,
).json()
if rsp["code"] != 0:
if rsp.get("message") == "请勿重复签到!":
self.push_info_bar.emit(
"warning",
"森空岛重复签到",
f"{i.get("nickName")}{i.get("channelName")}",
-1,
)
success_num += 1
continue
else:
self.push_info_bar.emit(
"warning",
"森空岛签到失败",
f"{i.get("nickName")}{i.get("channelName")}",
-1,
)
fail_num += 1
continue
awards = rsp["data"]["awards"]
for j in awards:
res = j["resource"]
self.push_info_bar.emit(
"success",
"森空岛签到成功",
f"{i.get("nickName")}{i.get("channelName")}",
3,
)
success_num += 1
return success_num, fail_num
# 主流程
try:
# 拿到cred和sign_token
cred, sign_token = login_by_token(token)
self.sleep(1)
# 依次签到
success, fail = do_sign(cred, sign_token)
self.sleep(3)
return {"success": success, "fail": fail, "total": success + fail}
except Exception as e:
logger.error(f"{self.name} | 森空岛签到失败: {e}")
return {"success": 0, "fail": 0, "total": 0}
def push_notification(
self,
mode: str,

View File

@@ -32,5 +32,6 @@ __license__ = "GPL-3.0 license"
from .notification import Notify
from .security import Crypto
from .system import System
from .skland import skland_sign_in
__all__ = ["Notify", "Crypto", "System"]
__all__ = ["Notify", "Crypto", "System", "skland_sign_in"]

239
app/services/skland.py Normal file
View File

@@ -0,0 +1,239 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file incorporates work covered by the following copyright and
# permission notice:
#
# skland-checkin-ghaction Copyright © 2023 Yanstory
# https://github.com/Yanstory/skland-checkin-ghaction
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA森空岛服务
v4.3
作者DLmaster_361、ClozyA
"""
from loguru import logger
import time
import json
import hmac
import hashlib
import requests
from urllib import parse
def skland_sign_in(token) -> dict:
"""森空岛签到"""
app_code = "4ca99fa6b56cc2ba"
# 用于获取grant code
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 用于获取cred
cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
# 查询角色绑定
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 签到接口
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 基础请求头
header = {
"cred": "",
"User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 34;) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
header_login = header.copy()
header_for_sign = {
"platform": "1",
"timestamp": "",
"dId": "",
"vName": "1.5.1",
}
# 生成签名
def generate_signature(token_for_sign: str, path, body_or_query):
"""
生成请求签名
:param token_for_sign: 用于加密的token
:param path: 请求路径(如 /api/v1/game/player/binding
:param body_or_query: GET用query字符串POST用body字符串
:return: (sign, 新的header_for_sign字典)
"""
t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
token_bytes = token_for_sign.encode("utf-8")
header_ca = dict(header_for_sign)
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str # 拼接原始字符串
# HMAC-SHA256 + MD5得到最终sign
hex_s = hmac.new(token_bytes, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
return md5, header_ca
# 获取带签名的header
def get_sign_header(url: str, method, body, old_header, sign_token):
"""
获取带签名的请求头
:param url: 请求完整url
:param method: 请求方式 GET/POST
:param body: POST请求体或GET时为None
:param old_header: 原始请求头
:param sign_token: 当前会话的签名token
:return: 新请求头
"""
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
sign, header_ca = generate_signature(sign_token, p.path, p.query)
else:
sign, header_ca = generate_signature(
sign_token, p.path, json.dumps(body) if body else ""
)
h["sign"] = sign
for i in header_ca:
h[i] = header_ca[i]
return h
# 复制请求头并添加cred
def copy_header(cred):
v = json.loads(json.dumps(header))
v["cred"] = cred
return v
# 使用token一步步拿到cred和sign_token
def login_by_token(token_code):
"""
:param token_code: 你的skyland token
:return: (cred, sign_token)
"""
try:
# token为json对象时提取data.content
t = json.loads(token_code)
token_code = t["data"]["content"]
except:
pass
grant_code = get_grant_code(token_code)
return get_cred(grant_code)
# 通过grant code换cred和sign_token
def get_cred(grant):
rsp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=header_login
).json()
if rsp["code"] != 0:
raise Exception(f'获得cred失败{rsp.get("messgae")}')
sign_token = rsp["data"]["token"]
cred = rsp["data"]["cred"]
return cred, sign_token
# 通过token换grant code
def get_grant_code(token):
rsp = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=header_login,
).json()
if rsp["status"] != 0:
raise Exception(f'使用token: {token} 获得认证代码失败:{rsp.get("msg")}')
return rsp["data"]["code"]
# 获取已绑定的角色列表
def get_binding_list(cred, sign_token):
"""
查询绑定的角色
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 角色列表
"""
v = []
rsp = requests.get(
binding_url,
headers=get_sign_header(
binding_url, "get", None, copy_header(cred), sign_token
),
).json()
if rsp["code"] != 0:
logger.error(f"森空岛服务 | 请求角色列表出现问题:{rsp['message']}")
if rsp.get("message") == "用户未登录":
logger.error(f"森空岛服务 | 用户登录可能失效了,请重新登录!")
return v
# 只取明日方舟arknights的绑定账号
for i in rsp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
# 执行签到
def do_sign(cred, sign_token) -> dict:
"""
对所有绑定的角色进行签到
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 签到结果字典
"""
characters = get_binding_list(cred, sign_token)
result = {"成功": [], "重复": [], "失败": [], "总计": len(characters)}
for character in characters:
body = {
"uid": character.get("uid"),
"gameId": character.get("channelMasterId"),
}
rsp = requests.post(
sign_url,
headers=get_sign_header(
sign_url, "post", body, copy_header(cred), sign_token
),
json=body,
).json()
if rsp["code"] != 0:
result[
"重复" if rsp.get("message") == "请勿重复签到!" else "失败"
].append(
f"{character.get("nickName")}{character.get("channelName")}"
)
else:
result["成功"].append(
f"{character.get("nickName")}{character.get("channelName")}"
)
time.sleep(3)
return result
# 主流程
try:
# 拿到cred和sign_token
cred, sign_token = login_by_token(token)
time.sleep(1)
# 依次签到
return do_sign(cred, sign_token)
except Exception as e:
logger.error(f"森空岛服务 | 森空岛签到失败: {e}")
return {"成功": [], "重复": [], "失败": [], "总计": 0}

View File

@@ -1,6 +1,12 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file incorporates work covered by the following copyright and
# permission notice:
#
# ZenlessZoneZero-OneDragon Copyright © 2024-2025 DoctorReid
# https://github.com/DoctorReid/ZenlessZoneZero-OneDragon
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify