From 27e5d238a80d6488a6a6f695ac252f861bb860ee Mon Sep 17 00:00:00 2001 From: hyb-oyqq <1512383570@qq.com> Date: Wed, 16 Jul 2025 16:18:39 +0800 Subject: [PATCH] =?UTF-8?q?refactor(source):=20=E9=87=8D=E6=9E=84=20Main.p?= =?UTF-8?q?y=20=E6=96=87=E4=BB=B6=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 删除了大量未使用的导入语句 - 移除了冗长的配置信息和全局变量定义 - 精简了主窗口类 MainWindow 的代码 - 提取了部分代码到新的文件 main_window.py 中 --- source/Main.py | 891 +----------------------------------------- source/config.py | 64 +++ source/download.py | 167 ++++++++ source/main_window.py | 471 ++++++++++++++++++++++ source/utils.py | 197 ++++++++++ 5 files changed, 902 insertions(+), 888 deletions(-) create mode 100644 source/config.py create mode 100644 source/download.py create mode 100644 source/main_window.py create mode 100644 source/utils.py diff --git a/source/Main.py b/source/Main.py index 750babe..de09298 100644 --- a/source/Main.py +++ b/source/Main.py @@ -1,894 +1,9 @@ -import os -import py7zr -import requests -import shutil -import hashlib import sys -import base64 -import psutil -import ctypes -import concurrent.futures -import webbrowser -from PySide6.QtGui import QIcon, QAction -from collections import deque -from pic_data import img_data - -from PySide6 import QtCore, QtWidgets -from PySide6.QtCore import (Qt, Signal, QThread, QTimer) -from PySide6.QtGui import (QIcon, QPixmap, ) -from PySide6.QtWidgets import (QApplication, QLabel, QMainWindow, - QProgressBar, QVBoxLayout, QFileDialog, QDialog) - -from Ui_install import Ui_MainWindows -from animations import MultiStageAnimations -import sys -import os - -def resource_path(relative_path): - if getattr(sys, 'frozen', False): - if hasattr(sys, '_MEIPASS'): - base_path = sys._MEIPASS # type: ignore - else: - base_path = os.path.dirname(sys.executable) - else: - base_path = os.path.dirname(os.path.abspath(__file__)) - return os.path.join(base_path, relative_path) - -def load_base64_image(base64_str): - pixmap = QPixmap() - pixmap.loadFromData(base64.b64decode(base64_str)) - return pixmap - -# 配置信息 -app_data = { - "APP_VERSION": "1.0.0", - "APP_NAME": "FRAISEMOE Addons Installer NEXT", - "TEMP": "TEMP", - "CACHE": "FRAISEMOE", - "PLUGIN": "PLUGIN", - "CONFIG_URL": "aHR0cHM6Ly9hcmNoaXZlLm92b2Zpc2guY29tL2FwaS93aWRnZXQvbmVrb3BhcmEvZG93bmxvYWRfdXJsX2RlYnVnLmpzb24=", - "UA": "TW96aWxsYS81LjAgKExpbnV4IGRlYmlhbjEyIEZyYWlzZU1vZTItQWNjZXB0KSBHZWNrby8yMDEwMDEwMSBGaXJlZm94LzExNC4wIEZyYWlzZU1vZTIvMS4wLjA=", - "game_info": { - "NEKOPARA Vol.1": { - "exe": "nekopara_vol1.exe", - "hash": "04b48b231a7f34431431e5027fcc7b27affaa951b8169c541709156acf754f3e", - "install_path": "NEKOPARA Vol. 1/adultsonly.xp3", - "plugin_path": "vol.1/adultsonly.xp3", - }, - "NEKOPARA Vol.2": { - "exe": "nekopara_vol2.exe", - "hash": "b9c00a2b113a1e768bf78400e4f9075ceb7b35349cdeca09be62eb014f0d4b42", - "install_path": "NEKOPARA Vol. 2/adultsonly.xp3", - "plugin_path": "vol.2/adultsonly.xp3", - }, - "NEKOPARA Vol.3": { - "exe": "NEKOPARAvol3.exe", - "hash": "2ce7b223c84592e1ebc3b72079dee1e5e8d064ade15723328a64dee58833b9d5", - "install_path": "NEKOPARA Vol. 3/update00.int", - "plugin_path": "vol.3/update00.int", - }, - "NEKOPARA Vol.4": { - "exe": "nekopara_vol4.exe", - "hash": "4a4a9ae5a75a18aacbe3ab0774d7f93f99c046afe3a777ee0363e8932b90f36a", - "install_path": "NEKOPARA Vol. 4/vol4adult.xp3", - "plugin_path": "vol.4/vol4adult.xp3", - }, - "NEKOPARA After": { - "exe": "nekopara_after.exe", - "hash": "eb26ff6850096a240af8340ba21c5c3232e90f29fb8191e24b6ce701acae0aa9", - "install_path": "NEKOPARA After/afteradult.xp3", - "plugin_path": "after/afteradult.xp3", - "sig_path": "after/afteradult.xp3.sig" - }, - }, -} - -# Base64解码 -def decode_base64(encoded_str): - return base64.b64decode(encoded_str).decode("utf-8") - -# 全局变量 -APP_VERSION = app_data["APP_VERSION"] -APP_NAME = app_data["APP_NAME"] -TEMP = os.getenv(app_data["TEMP"]) or app_data["TEMP"] -CACHE = os.path.join(TEMP, app_data["CACHE"]) -PLUGIN = os.path.join(CACHE, app_data["PLUGIN"]) -CONFIG_URL = decode_base64(app_data["CONFIG_URL"]) -UA = decode_base64(app_data["UA"]) -GAME_INFO = app_data["game_info"] -BLOCK_SIZE = 67108864 -HASH_SIZE = 134217728 -PLUGIN_HASH = {game: info["hash"] for game, info in GAME_INFO.items()} -PROCESS_INFO = {info["exe"]: game for game, info in GAME_INFO.items()} - -def msgbox_frame(title, text, buttons=QtWidgets.QMessageBox.StandardButton.NoButton): - msg_box = QtWidgets.QMessageBox() - msg_box.setWindowTitle(title) - msg_box.setWindowModality(Qt.WindowModality.WindowModal) - - # 设置弹窗图标 - icon_data = img_data.get("icon") - if icon_data: - pixmap = load_base64_image(icon_data) - if not pixmap.isNull(): - msg_box.setWindowIcon(QIcon(pixmap)) - msg_box.setIconPixmap(pixmap.scaled(64, 64, QtCore.Qt.AspectRatioMode.KeepAspectRatio)) - else: - msg_box.setIcon(QtWidgets.QMessageBox.Icon.Information) - - msg_box.setText(text) - msg_box.setStandardButtons(buttons) - return msg_box - -# 哈希值计算类 -class HashManager: - def __init__(self, HASH_SIZE): - self.HASH_SIZE = HASH_SIZE - - def hash_calculate(self, file_path): - sha256_hash = hashlib.sha256() - with open(file_path, "rb") as f: - for byte_block in iter(lambda: f.read(self.HASH_SIZE), b""): - sha256_hash.update(byte_block) - return sha256_hash.hexdigest() - - def calculate_hashes_in_parallel(self, file_paths): - with concurrent.futures.ThreadPoolExecutor() as executor: - future_to_file = { - executor.submit(self.hash_calculate, path): path for path in file_paths - } - results = {} - for future in concurrent.futures.as_completed(future_to_file): - file_path = future_to_file[future] - try: - results[file_path] = future.result() - except Exception as e: - results[file_path] = None - msg_box = msgbox_frame( - f"错误 - {APP_NAME}", - f"\n文件哈希值计算失败\n\n【错误信息】:{e}\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - return results - - def hash_pop_window(self): - msg_box = msgbox_frame(f"通知 - {APP_NAME}", "\n正在检验文件状态...\n") - msg_box.open() - QApplication.processEvents() - return msg_box - - def cfg_pre_hash_compare(self, install_path, game_version, plugin_hash, installed_status): - if not os.path.exists(install_path): - installed_status[game_version] = False - return - file_hash = self.hash_calculate(install_path) - if file_hash == plugin_hash[game_version]: - installed_status[game_version] = True - else: - reply = msgbox_frame( - f"文件校验 - {APP_NAME}", - f"\n检测到 {game_version} 的文件哈希值不匹配,是否重新安装?\n", - QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, - ).exec() - if reply == QtWidgets.QMessageBox.StandardButton.Yes: - installed_status[game_version] = False - else: - installed_status[game_version] = True - - def cfg_after_hash_compare(self, install_paths, plugin_hash, installed_status): - passed = True - file_paths = [ - install_paths[game] for game in plugin_hash if installed_status.get(game) - ] - hash_results = self.calculate_hashes_in_parallel(file_paths) - - for game, hash_value in plugin_hash.items(): - if installed_status.get(game): - file_hash = hash_results.get(install_paths[game]) - if file_hash != hash_value: - msg_box = msgbox_frame( - f"文件校验 - {APP_NAME}", - f"\n检测到 {game} 的文件哈希值不匹配\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - installed_status[game] = False - passed = False - break - return passed - -# 管理员权限检查类 -class AdminPrivileges: - def __init__(self): - self.required_exes = [ - "nekopara_vol1.exe", - "nekopara_vol2.exe", - "NEKOPARAvol3.exe", - "nekopara_vol4.exe", - "nekopara_after.exe", - ] - - def is_admin(self): - try: - return ctypes.windll.shell32.IsUserAnAdmin() - except: - return False - - def request_admin_privileges(self): - if not self.is_admin(): - msg_box = msgbox_frame( - f"权限检测 - {APP_NAME}", - "\n需要管理员权限运行此程序\n", - QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, - ) - reply = msg_box.exec() - if reply == QtWidgets.QMessageBox.StandardButton.Yes: - try: - ctypes.windll.shell32.ShellExecuteW( - None, "runas", sys.executable, " ".join(sys.argv), None, 1 - ) - except Exception as e: - msg_box = msgbox_frame( - f"错误 - {APP_NAME}", - f"\n请求管理员权限失败\n\n【错误信息】:{e}\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - sys.exit(1) - else: - msg_box = msgbox_frame( - f"权限检测 - {APP_NAME}", - "\n无法获取管理员权限,程序将退出\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - sys.exit(1) - - def check_and_terminate_processes(self): - for proc in psutil.process_iter(["pid", "name"]): - if proc.info["name"] in self.required_exes: - msg_box = msgbox_frame( - f"进程检测 - {APP_NAME}", - f"\n检测到游戏正在运行: {proc.info['name']} \n\n是否终止?\n", - QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, - ) - reply = msg_box.exec() - if reply == QtWidgets.QMessageBox.StandardButton.Yes: - try: - proc.terminate() - proc.wait(timeout=3) - except psutil.AccessDenied: - msg_box = msgbox_frame( - f"错误 - {APP_NAME}", - f"\n无法关闭游戏: {proc.info['name']} \n\n请手动关闭后重启应用\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - sys.exit(1) - else: - msg_box = msgbox_frame( - f"进程检测 - {APP_NAME}", - f"\n未关闭的游戏: {proc.info['name']} \n\n请手动关闭后重启应用\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - sys.exit(1) - -# 下载线程类 -class DownloadThread(QThread): - progress = Signal(dict) - finished = Signal(bool, str) - - def __init__(self, url, _7z_path, game_version, parent=None): - super().__init__(parent) - self.url = url - self._7z_path = _7z_path - self.game_version = game_version - self.process = None - self.is_running = True - - def stop(self): - if self.process and self.process.poll() is None: - self.is_running = False - import subprocess - # 使用 taskkill 强制终止进程及其子进程,并隐藏窗口 - subprocess.run(['taskkill', '/F', '/T', '/PID', str(self.process.pid)], check=True, creationflags=subprocess.CREATE_NO_WINDOW) - self.finished.emit(False, "下载已手动停止。") - - def run(self): - import subprocess - import re - from urllib.parse import urlparse - - try: - aria2c_path = resource_path("aria2c.exe") - download_dir = os.path.dirname(self._7z_path) - file_name = os.path.basename(self._7z_path) - - parsed_url = urlparse(self.url) - referer = f"{parsed_url.scheme}://{parsed_url.netloc}/" - - command = [ - aria2c_path, - '--dir', download_dir, - '--out', file_name, - '--user-agent', UA, - '--referer', referer, - '--header', f'Origin: {referer.rstrip("/")}', - '--header', 'Accept: */*', - '--header', 'Accept-Language: zh-CN,zh;q=0.9,en;q=0.8', - '--header', 'Accept-Encoding: gzip, deflate, br', - '--header', 'Cache-Control: no-cache', - '--header', 'Pragma: no-cache', - '--header', 'DNT: 1', - '--header', 'Sec-Fetch-Dest: empty', - '--header', 'Sec-Fetch-Mode: cors', - '--header', 'Sec-Fetch-Site: same-origin', - '--http-accept-gzip=true', - '--console-log-level=info', - '--summary-interval=1', - '--log-level=info', - '--max-tries=3', - '--retry-wait=2', - '--connect-timeout=60', - '--timeout=60', - '--auto-file-renaming=false', - '--split=16', - '--max-connection-per-server=16', - self.url - ] - - creation_flags = subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 - self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', errors='replace', creationflags=creation_flags) - - # 正则表达式用于解析aria2c的输出 - # 例如: #1 GID[...]( 5%) CN:1 DL:10.5MiB/s ETA:1m30s - progress_pattern = re.compile(r'\((\d{1,3})%\).*?CN:(\d+).*?DL:\s*([^\s]+).*?ETA:\s*([^\s]+)') - - full_output = [] - while self.is_running and self.process.poll() is None: - if self.process.stdout: - line = self.process.stdout.readline() - if not line: - break - else: - break - - full_output.append(line) - print(line.strip()) # 在控制台输出实时日志 - - match = progress_pattern.search(line) - if match: - percent = int(match.group(1)) - threads = match.group(2) - speed = match.group(3) - eta = match.group(4) - self.progress.emit({ - "game": self.game_version, - "percent": percent, - "threads": threads, - "speed": speed, - "eta": eta - }) - - return_code = self.process.wait() - - if not self.is_running: # 如果是手动停止的 - return - - if return_code == 0: - self.progress.emit({ - "game": self.game_version, - "percent": 100, - "threads": "N/A", - "speed": "N/A", - "eta": "完成" - }) - self.finished.emit(True, "") - else: - error_message = f"\nAria2c下载失败,退出码: {return_code}\n\n--- Aria2c 输出 ---\n{''.join(full_output)}\n---------------------\n" - self.finished.emit(False, error_message) - - except Exception as e: - if self.is_running: - self.finished.emit(False, f"\n下载时发生未知错误\n\n【错误信息】: {e}\n") - -# 下载进度窗口类 -class ProgressWindow(QDialog): - def __init__(self, parent=None): - super(ProgressWindow, self).__init__(parent) - self.setWindowTitle(f"下载进度 - {APP_NAME}") - self.resize(450, 180) - self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowCloseButtonHint) - self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowSystemMenuHint) - - layout = QVBoxLayout() - self.game_label = QLabel("正在准备下载...") - self.progress_bar = QProgressBar() - self.progress_bar.setValue(0) - self.stats_label = QLabel("速度: - | 线程: - | 剩余时间: -") - self.stop_button = QtWidgets.QPushButton("停止下载") - - layout.addWidget(self.game_label) - layout.addWidget(self.progress_bar) - layout.addWidget(self.stats_label) - layout.addWidget(self.stop_button) - self.setLayout(layout) - - def update_progress(self, data): - game_version = data.get("game", "未知游戏") - percent = data.get("percent", 0) - speed = data.get("speed", "-") - threads = data.get("threads", "-") - eta = data.get("eta", "-") - - self.game_label.setText(f"正在下载: {game_version}") - self.progress_bar.setValue(int(percent)) - self.stats_label.setText(f"速度: {speed} | 线程: {threads} | 剩余时间: {eta}") - - if percent == 100: - self.stop_button.setEnabled(False) - self.stop_button.setText("下载完成") - QTimer.singleShot(1500, self.accept) - - def closeEvent(self, event): - # 覆盖默认的关闭事件,防止用户通过其他方式关闭窗口 - # 如果需要,可以在这里添加逻辑,例如询问用户是否要停止下载 - event.ignore() - - -# 主窗口类 -class MainWindow(QMainWindow): - def __init__(self): - super().__init__() - - # 初始化UI (从Ui_install.py导入) - self.ui = Ui_MainWindows() - self.ui.setupUi(self) - - icon_data = img_data.get("icon") - if icon_data: - pixmap = load_base64_image(icon_data) - self.setWindowIcon(QIcon(pixmap)) - - # 设置窗口标题为APP_NAME加版本号 - self.setWindowTitle(f"{APP_NAME} v1.0.0") - - # 初始化动画系统 (从animations.py导入) - self.animator = MultiStageAnimations(self.ui) - - # 初始化功能变量 - self.selected_folder = "" - self.installed_status = {f"NEKOPARA Vol.{i}": False for i in range(1, 5)} - self.installed_status["NEKOPARA After"] = False # 添加After的状态 - self.download_queue = deque() - self.current_download_thread = None - self.hash_manager = HashManager(BLOCK_SIZE) - - # 检查管理员权限和进程 - admin_privileges = AdminPrivileges() - admin_privileges.request_admin_privileges() - admin_privileges.check_and_terminate_processes() - - # 创建缓存目录 - if not os.path.exists(PLUGIN): - try: - os.makedirs(PLUGIN) - except OSError as e: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n无法创建缓存位置\n\n使用管理员身份运行或检查文件读写权限\n\n【错误信息】:{e}\n", - ) - sys.exit(1) - - # 连接信号 (使用Ui_install.py中的组件名称) - self.ui.start_install_btn.clicked.connect(self.file_dialog) - self.ui.exit_btn.clicked.connect(self.shutdown_app) - - # “关于”菜单 - about_action = QAction("项目主页", self) - about_action.triggered.connect(self.open_about_page) - version_action = QAction("版本信息", self) - version_action.triggered.connect(self.show_version_info) - self.ui.menu_2.addAction(version_action) - self.ui.menu_2.addAction(about_action) - - # 在窗口显示前设置初始状态 - self.animator.initialize() - - # 窗口显示后延迟100ms启动动画 - QTimer.singleShot(100, self.start_animations) - - def start_animations(self): - self.animator.start_animations() - - def get_install_paths(self): - return { - game: os.path.join(self.selected_folder, info["install_path"]) - for game, info in GAME_INFO.items() - } - - def file_dialog(self): - self.selected_folder = QFileDialog.getExistingDirectory( - self, f"选择游戏所在【上级目录】 {APP_NAME}" - ) - if not self.selected_folder: - QtWidgets.QMessageBox.warning( - self, f"通知 - {APP_NAME}", "\n未选择任何目录,请重新选择\n" - ) - return - self.download_action() - - def get_download_url(self) -> dict: - try: - headers = {"User-Agent": UA} - response = requests.get(CONFIG_URL, headers=headers, timeout=10) - response.raise_for_status() - config_data = response.json() - # 修正键名检查,确保所有必需的键都存在 - required_keys = [f"vol.{i+1}.data" for i in range(4)] + ["after.data"] - if not all(key in config_data for key in required_keys): - missing_keys = [key for key in required_keys if key not in config_data] - raise ValueError(f"配置文件缺少必要的键: {', '.join(missing_keys)}") - - # 修正提取URL的逻辑,确保使用正确的键 - return { - f"vol{i+1}": config_data[f"vol.{i+1}.data"]["url"] for i in range(4) - } | { - "after": config_data["after.data"]["url"] - } - except requests.exceptions.RequestException as e: - status_code = e.response.status_code if e.response is not None else "未知" - try: - error_response = e.response.json() if e.response else {} - json_title = error_response.get("title", "无错误类型") - json_message = error_response.get("message", "无附加错误信息") - except (ValueError, AttributeError): - json_title = "配置文件异常,无法解析错误类型" - json_message = "配置文件异常,无法解析错误信息" - - print(f"获取下载配置时出错: {e}") # 添加详细错误日志 - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n下载配置获取失败\n\n【HTTP状态】:{status_code}\n【错误类型】:{json_title}\n【错误信息】:{json_message}\n", - ) - return {} - except ValueError as e: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n配置文件格式异常\n\n【错误信息】:{e}\n", - ) - return {} - - def download_setting(self, url, game_folder, game_version, _7z_path, plugin_path): - game_exe = { - game: os.path.join( - self.selected_folder, info["install_path"].split("/")[0], info["exe"] - ) - for game, info in GAME_INFO.items() - } - - if ( - game_version not in game_exe - or not os.path.exists(game_exe[game_version]) - or self.installed_status[game_version] - ): - self.installed_status[game_version] = False - self.show_result() - return - - self.progress_window = ProgressWindow(self) - - self.current_download_thread = DownloadThread(url, _7z_path, game_version, self) - self.current_download_thread.progress.connect(self.progress_window.update_progress) - self.current_download_thread.finished.connect( - lambda success, error: self.install_setting( - success, - error, - self.progress_window, - url, - game_folder, - game_version, - _7z_path, - plugin_path, - ) - ) - - # 连接停止按钮的信号 - self.progress_window.stop_button.clicked.connect(self.current_download_thread.stop) - self.current_download_thread.start() - self.progress_window.exec() # 使用exec()以模态方式显示对话框 - - def install_setting( - self, - success, - error, - progress_window, - url, - game_folder, - game_version, - _7z_path, - plugin_path, - ): - if progress_window.isVisible(): - progress_window.reject() - - if not success: # 处理所有失败情况,包括手动停止 - print(f"--- Download Failed: {game_version} ---") - print(error) - print("------------------------------------") - msg_box = QtWidgets.QMessageBox(self) - msg_box.setWindowTitle(f"下载失败 - {APP_NAME}") - # 如果是手动停止,显示特定信息 - if error == "下载已手动停止。": - msg_box.setText(f"\n下载已手动终止: {game_version}\n\n是否重试?") - else: - msg_box.setText(f"\n文件获取失败: {game_version}\n\n是否重试?") - - retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) - next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) - end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) - - icon_data = img_data.get("icon") - if icon_data: - pixmap = load_base64_image(icon_data) - if not pixmap.isNull(): - msg_box.setWindowIcon(QIcon(pixmap)) - - msg_box.exec() - clicked_button = msg_box.clickedButton() - - if clicked_button == retry_button: - self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) - elif clicked_button == next_button: - self.next_download_task() - else: # End button or closed dialog - self.download_queue.clear() - self.after_hash_compare(PLUGIN_HASH) - return # 确保失败后不再执行成功逻辑 - - if success: - try: - msg_box = self.hash_manager.hash_pop_window() - QApplication.processEvents() - with py7zr.SevenZipFile(_7z_path, mode="r") as archive: - archive.extractall(path=PLUGIN) - - # 创建游戏目录(如果不存在) - os.makedirs(game_folder, exist_ok=True) - - # 复制主文件 - shutil.copy(plugin_path, game_folder) - - # 如果是After版本,还需要复制签名文件 - if game_version == "NEKOPARA After": - sig_path = os.path.join(PLUGIN, GAME_INFO[game_version]["sig_path"]) - shutil.copy(sig_path, game_folder) - - self.installed_status[game_version] = True - except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", - ) - finally: - msg_box.close() - self.next_download_task() - self.progress_window.close() - - if success: - try: - msg_box = self.hash_manager.hash_pop_window() - QApplication.processEvents() - with py7zr.SevenZipFile(_7z_path, mode="r") as archive: - archive.extractall(path=PLUGIN) - - # 创建游戏目录(如果不存在) - os.makedirs(game_folder, exist_ok=True) - - # 复制主文件 - shutil.copy(plugin_path, game_folder) - - # 如果是After版本,还需要复制签名文件 - if game_version == "NEKOPARA After": - sig_path = os.path.join(PLUGIN, GAME_INFO[game_version]["sig_path"]) - shutil.copy(sig_path, game_folder) - - self.installed_status[game_version] = True - except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", - ) - finally: - msg_box.close() - self.next_download_task() - else: - print(f"--- Download Failed: {game_version} ---") - print(error) - print("------------------------------------") - msg_box = QtWidgets.QMessageBox(self) - msg_box.setWindowTitle(f"下载失败 {APP_NAME}") - msg_box.setText(f"\n文件获取失败: {game_version}\n\n是否重试?") - - retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) - next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) - end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) - - icon_data = img_data.get("icon") - if icon_data: - pixmap = load_base64_image(icon_data) - if not pixmap.isNull(): - msg_box.setWindowIcon(QIcon(pixmap)) - - msg_box.exec() - clicked_button = msg_box.clickedButton() - - if clicked_button == retry_button: - self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) - elif clicked_button == next_button: - self.next_download_task() - else: # End button or closed dialog - self.download_queue.clear() - self.after_hash_compare(PLUGIN_HASH) - - def pre_hash_compare(self, install_path, game_version, plugin_hash): - msg_box = self.hash_manager.hash_pop_window() - self.hash_manager.cfg_pre_hash_compare( - install_path, game_version, plugin_hash, self.installed_status - ) - msg_box.close() - - def download_action(self): - install_paths = self.get_install_paths() - for game_version, install_path in install_paths.items(): - self.pre_hash_compare(install_path, game_version, PLUGIN_HASH) - - config = self.get_download_url() - if not config: - QtWidgets.QMessageBox.critical( - self, f"错误 - {APP_NAME}", "\n网络状态异常或服务器故障,请重试\n" - ) - return - - # 处理1-4卷 - for i in range(1, 5): - game_version = f"NEKOPARA Vol.{i}" - if not self.installed_status[game_version]: - url = config[f"vol{i}"] - game_folder = os.path.join(self.selected_folder, f"NEKOPARA Vol. {i}") - _7z_path = os.path.join(PLUGIN, f"vol.{i}.7z") - plugin_path = os.path.join( - PLUGIN, GAME_INFO[game_version]["plugin_path"] - ) - self.download_queue.append( - (url, game_folder, game_version, _7z_path, plugin_path) - ) - - # 处理After - game_version = "NEKOPARA After" - if not self.installed_status[game_version]: - url = config["after"] - game_folder = os.path.join(self.selected_folder, "NEKOPARA After") - _7z_path = os.path.join(PLUGIN, "after.7z") - plugin_path = os.path.join( - PLUGIN, GAME_INFO[game_version]["plugin_path"] - ) - self.download_queue.append( - (url, game_folder, game_version, _7z_path, plugin_path) - ) - - self.next_download_task() - - def next_download_task(self): - if not self.download_queue: - self.after_hash_compare(PLUGIN_HASH) - return - # 检查下载线程是否仍在运行,以避免在手动停止后立即开始下一个任务 - if self.current_download_thread and self.current_download_thread.isRunning(): - return - url, game_folder, game_version, _7z_path, plugin_path = self.download_queue.popleft() - self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) - - def on_download_stopped(self): - """当用户点击停止按钮时调用的槽函数""" - # 清空下载队列,因为用户决定停止 - self.download_queue.clear() - # 可以在这里决定是否立即进行哈希比较或显示结果 - self.after_hash_compare(PLUGIN_HASH) - - def after_hash_compare(self, plugin_hash): - msg_box = self.hash_manager.hash_pop_window() - result = self.hash_manager.cfg_after_hash_compare( - self.get_install_paths(), plugin_hash, self.installed_status - ) - msg_box.close() - self.show_result() - return result - - def show_result(self): - installed_version = "\n".join( - [i for i in self.installed_status if self.installed_status[i]] - ) - failed_ver = "\n".join( - [i for i in self.installed_status if not self.installed_status[i]] - ) - QtWidgets.QMessageBox.information( - self, - f"完成 - {APP_NAME}", - f"\n安装结果:\n安装成功数:{len(installed_version.splitlines())} 安装失败数:{len(failed_ver.splitlines())}\n" - f"安装成功的版本:\n{installed_version}\n尚未持有或未使用本工具安装补丁的版本:\n{failed_ver}\n", - ) - - def show_version_info(self): - """显示版本信息对话框""" - msg_box = msgbox_frame( - f"版本信息 - {APP_NAME}", - f"\n{APP_NAME}\n\n版本: {APP_VERSION}\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() - - def open_about_page(self): - webbrowser.open("https://github.com/hyb-oyqq/FRAISEMOE-Addons-Installer-NEXT") - - def closeEvent(self, event): - self.shutdown_app(event) - - def shutdown_app(self, event=None): - reply = QtWidgets.QMessageBox.question( - self, - "退出程序", - "\n是否确定退出?\n", - QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, - QtWidgets.QMessageBox.StandardButton.No, - ) - - if reply == QtWidgets.QMessageBox.StandardButton.Yes: - if ( - self.current_download_thread - and self.current_download_thread.isRunning() - ): - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - "\n当前有下载任务正在进行,完成后再试\n", - ) - if event: - event.ignore() - return - - if os.path.exists(PLUGIN): - for attempt in range(3): - try: - shutil.rmtree(PLUGIN) - break - except Exception as e: - if attempt == 2: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n清理缓存失败\n\n【错误信息】:{e}\n", - ) - if event: - event.accept() - sys.exit(1) - if event: - event.accept() - else: - sys.exit(0) - else: - if event: - event.ignore() +from PySide6.QtWidgets import QApplication +from main_window import MainWindow if __name__ == "__main__": - app = QApplication([]) + app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec()) \ No newline at end of file diff --git a/source/config.py b/source/config.py new file mode 100644 index 0000000..ad66105 --- /dev/null +++ b/source/config.py @@ -0,0 +1,64 @@ +import os +import base64 + +# 配置信息 +app_data = { + "APP_VERSION": "1.0.0", + "APP_NAME": "FRAISEMOE Addons Installer NEXT", + "TEMP": "TEMP", + "CACHE": "FRAISEMOE", + "PLUGIN": "PLUGIN", + "CONFIG_URL": "aHR0cHM6Ly9hcmNoaXZlLm92b2Zpc2guY29tL2FwaS93aWRnZXQvbmVrb3BhcmEvZG93bmxvYWRfdXJsX2RlYnVnLmpzb24=", + "UA": "TW96aWxsYS81LjAgKExpbnV4IGRlYmlhbjEyIEZyYWlzZU1vZTItQWNjZXB0KSBHZWNrby8yMDEwMDEwMSBGaXJlZm94LzExNC4wIEZyYWlzZU1vZTIvMS4wLjA=", + "game_info": { + "NEKOPARA Vol.1": { + "exe": "nekopara_vol1.exe", + "hash": "04b48b231a7f34431431e5027fcc7b27affaa951b8169c541709156acf754f3e", + "install_path": "NEKOPARA Vol. 1/adultsonly.xp3", + "plugin_path": "vol.1/adultsonly.xp3", + }, + "NEKOPARA Vol.2": { + "exe": "nekopara_vol2.exe", + "hash": "b9c00a2b113a1e768bf78400e4f9075ceb7b35349cdeca09be62eb014f0d4b42", + "install_path": "NEKOPARA Vol. 2/adultsonly.xp3", + "plugin_path": "vol.2/adultsonly.xp3", + }, + "NEKOPARA Vol.3": { + "exe": "NEKOPARAvol3.exe", + "hash": "2ce7b223c84592e1ebc3b72079dee1e5e8d064ade15723328a64dee58833b9d5", + "install_path": "NEKOPARA Vol. 3/update00.int", + "plugin_path": "vol.3/update00.int", + }, + "NEKOPARA Vol.4": { + "exe": "nekopara_vol4.exe", + "hash": "4a4a9ae5a75a18aacbe3ab0774d7f93f99c046afe3a777ee0363e8932b90f36a", + "install_path": "NEKOPARA Vol. 4/vol4adult.xp3", + "plugin_path": "vol.4/vol4adult.xp3", + }, + "NEKOPARA After": { + "exe": "nekopara_after.exe", + "hash": "eb26ff6850096a240af8340ba21c5c3232e90f29fb8191e24b6ce701acae0aa9", + "install_path": "NEKOPARA After/afteradult.xp3", + "plugin_path": "after/afteradult.xp3", + "sig_path": "after/afteradult.xp3.sig" + }, + }, +} + +# Base64解码 +def decode_base64(encoded_str): + return base64.b64decode(encoded_str).decode("utf-8") + +# 全局变量 +APP_VERSION = app_data["APP_VERSION"] +APP_NAME = app_data["APP_NAME"] +TEMP = os.getenv(app_data["TEMP"]) or app_data["TEMP"] +CACHE = os.path.join(TEMP, app_data["CACHE"]) +PLUGIN = os.path.join(CACHE, app_data["PLUGIN"]) +CONFIG_URL = decode_base64(app_data["CONFIG_URL"]) +UA = decode_base64(app_data["UA"]) +GAME_INFO = app_data["game_info"] +BLOCK_SIZE = 67108864 +HASH_SIZE = 134217728 +PLUGIN_HASH = {game: info["hash"] for game, info in GAME_INFO.items()} +PROCESS_INFO = {info["exe"]: game for game, info in GAME_INFO.items()} \ No newline at end of file diff --git a/source/download.py b/source/download.py new file mode 100644 index 0000000..e6038b4 --- /dev/null +++ b/source/download.py @@ -0,0 +1,167 @@ +import os +import sys +import subprocess +import re +from urllib.parse import urlparse +from PySide6 import QtCore, QtWidgets +from PySide6.QtCore import (Qt, Signal, QThread, QTimer) +from PySide6.QtWidgets import (QLabel, QProgressBar, QVBoxLayout, QDialog) +from utils import resource_path +from config import APP_NAME, UA + +# 下载线程类 +class DownloadThread(QThread): + progress = Signal(dict) + finished = Signal(bool, str) + + def __init__(self, url, _7z_path, game_version, parent=None): + super().__init__(parent) + self.url = url + self._7z_path = _7z_path + self.game_version = game_version + self.process = None + self.is_running = True + + def stop(self): + if self.process and self.process.poll() is None: + self.is_running = False + # 使用 taskkill 强制终止进程及其子进程,并隐藏窗口 + subprocess.run(['taskkill', '/F', '/T', '/PID', str(self.process.pid)], check=True, creationflags=subprocess.CREATE_NO_WINDOW) + self.finished.emit(False, "下载已手动停止。") + + def run(self): + try: + aria2c_path = resource_path("aria2c.exe") + download_dir = os.path.dirname(self._7z_path) + file_name = os.path.basename(self._7z_path) + + parsed_url = urlparse(self.url) + referer = f"{parsed_url.scheme}://{parsed_url.netloc}/" + + command = [ + aria2c_path, + '--dir', download_dir, + '--out', file_name, + '--user-agent', UA, + '--referer', referer, + '--header', f'Origin: {referer.rstrip("/")}', + '--header', 'Accept: */*', + '--header', 'Accept-Language: zh-CN,zh;q=0.9,en;q=0.8', + '--header', 'Accept-Encoding: gzip, deflate, br', + '--header', 'Cache-Control: no-cache', + '--header', 'Pragma: no-cache', + '--header', 'DNT: 1', + '--header', 'Sec-Fetch-Dest: empty', + '--header', 'Sec-Fetch-Mode: cors', + '--header', 'Sec-Fetch-Site: same-origin', + '--http-accept-gzip=true', + '--console-log-level=info', + '--summary-interval=1', + '--log-level=info', + '--max-tries=3', + '--retry-wait=2', + '--connect-timeout=60', + '--timeout=60', + '--auto-file-renaming=false', + '--split=16', + '--max-connection-per-server=16', + self.url + ] + + creation_flags = subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 + self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', errors='replace', creationflags=creation_flags) + + # 正则表达式用于解析aria2c的输出 + # 例如: #1 GID[...]( 5%) CN:1 DL:10.5MiB/s ETA:1m30s + progress_pattern = re.compile(r'\((\d{1,3})%\).*?CN:(\d+).*?DL:\s*([^\s]+).*?ETA:\s*([^\s]+)') + + full_output = [] + while self.is_running and self.process.poll() is None: + if self.process.stdout: + line = self.process.stdout.readline() + if not line: + break + else: + break + + full_output.append(line) + print(line.strip()) # 在控制台输出实时日志 + + match = progress_pattern.search(line) + if match: + percent = int(match.group(1)) + threads = match.group(2) + speed = match.group(3) + eta = match.group(4) + self.progress.emit({ + "game": self.game_version, + "percent": percent, + "threads": threads, + "speed": speed, + "eta": eta + }) + + return_code = self.process.wait() + + if not self.is_running: # 如果是手动停止的 + return + + if return_code == 0: + self.progress.emit({ + "game": self.game_version, + "percent": 100, + "threads": "N/A", + "speed": "N/A", + "eta": "完成" + }) + self.finished.emit(True, "") + else: + error_message = f"\nAria2c下载失败,退出码: {return_code}\n\n--- Aria2c 输出 ---\n{''.join(full_output)}\n---------------------\n" + self.finished.emit(False, error_message) + + except Exception as e: + if self.is_running: + self.finished.emit(False, f"\n下载时发生未知错误\n\n【错误信息】: {e}\n") + +# 下载进度窗口类 +class ProgressWindow(QDialog): + def __init__(self, parent=None): + super(ProgressWindow, self).__init__(parent) + self.setWindowTitle(f"下载进度 - {APP_NAME}") + self.resize(450, 180) + self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowCloseButtonHint) + self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowSystemMenuHint) + + layout = QVBoxLayout() + self.game_label = QLabel("正在准备下载...") + self.progress_bar = QProgressBar() + self.progress_bar.setValue(0) + self.stats_label = QLabel("速度: - | 线程: - | 剩余时间: -") + self.stop_button = QtWidgets.QPushButton("停止下载") + + layout.addWidget(self.game_label) + layout.addWidget(self.progress_bar) + layout.addWidget(self.stats_label) + layout.addWidget(self.stop_button) + self.setLayout(layout) + + def update_progress(self, data): + game_version = data.get("game", "未知游戏") + percent = data.get("percent", 0) + speed = data.get("speed", "-") + threads = data.get("threads", "-") + eta = data.get("eta", "-") + + self.game_label.setText(f"正在下载: {game_version}") + self.progress_bar.setValue(int(percent)) + self.stats_label.setText(f"速度: {speed} | 线程: {threads} | 剩余时间: {eta}") + + if percent == 100: + self.stop_button.setEnabled(False) + self.stop_button.setText("下载完成") + QTimer.singleShot(1500, self.accept) + + def closeEvent(self, event): + # 覆盖默认的关闭事件,防止用户通过其他方式关闭窗口 + # 如果需要,可以在这里添加逻辑,例如询问用户是否要停止下载 + event.ignore() \ No newline at end of file diff --git a/source/main_window.py b/source/main_window.py new file mode 100644 index 0000000..a8ce0ab --- /dev/null +++ b/source/main_window.py @@ -0,0 +1,471 @@ +import os +import sys +import shutil +import webbrowser +import requests +import py7zr +from collections import deque +from PySide6 import QtWidgets +from PySide6.QtCore import QTimer +from PySide6.QtGui import QIcon, QAction +from PySide6.QtWidgets import QMainWindow, QFileDialog, QApplication + +from Ui_install import Ui_MainWindows +from animations import MultiStageAnimations +from config import ( + APP_NAME, APP_VERSION, PLUGIN, GAME_INFO, BLOCK_SIZE, + PLUGIN_HASH, UA, CONFIG_URL +) +from utils import ( + load_base64_image, HashManager, AdminPrivileges, msgbox_frame +) +from download import DownloadThread, ProgressWindow +from pic_data import img_data + +class MainWindow(QMainWindow): + def __init__(self): + super().__init__() + + # 初始化UI (从Ui_install.py导入) + self.ui = Ui_MainWindows() + self.ui.setupUi(self) + + icon_data = img_data.get("icon") + if icon_data: + pixmap = load_base64_image(icon_data) + self.setWindowIcon(QIcon(pixmap)) + + # 设置窗口标题为APP_NAME加版本号 + self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}") + + # 初始化动画系统 (从animations.py导入) + self.animator = MultiStageAnimations(self.ui) + + # 初始化功能变量 + self.selected_folder = "" + self.installed_status = {f"NEKOPARA Vol.{i}": False for i in range(1, 5)} + self.installed_status["NEKOPARA After"] = False # 添加After的状态 + self.download_queue = deque() + self.current_download_thread = None + self.hash_manager = HashManager(BLOCK_SIZE) + + # 检查管理员权限和进程 + admin_privileges = AdminPrivileges() + admin_privileges.request_admin_privileges() + admin_privileges.check_and_terminate_processes() + + # 创建缓存目录 + if not os.path.exists(PLUGIN): + try: + os.makedirs(PLUGIN) + except OSError as e: + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + f"\n无法创建缓存位置\n\n使用管理员身份运行或检查文件读写权限\n\n【错误信息】:{e}\n", + ) + sys.exit(1) + + # 连接信号 (使用Ui_install.py中的组件名称) + self.ui.start_install_btn.clicked.connect(self.file_dialog) + self.ui.exit_btn.clicked.connect(self.shutdown_app) + + # “关于”菜单 + about_action = QAction("项目主页", self) + about_action.triggered.connect(self.open_about_page) + version_action = QAction("版本信息", self) + version_action.triggered.connect(self.show_version_info) + self.ui.menu_2.addAction(version_action) + self.ui.menu_2.addAction(about_action) + + # 在窗口显示前设置初始状态 + self.animator.initialize() + + # 窗口显示后延迟100ms启动动画 + QTimer.singleShot(100, self.start_animations) + + def start_animations(self): + self.animator.start_animations() + + def get_install_paths(self): + return { + game: os.path.join(self.selected_folder, info["install_path"]) + for game, info in GAME_INFO.items() + } + + def file_dialog(self): + self.selected_folder = QFileDialog.getExistingDirectory( + self, f"选择游戏所在【上级目录】 {APP_NAME}" + ) + if not self.selected_folder: + QtWidgets.QMessageBox.warning( + self, f"通知 - {APP_NAME}", "\n未选择任何目录,请重新选择\n" + ) + return + self.download_action() + + def get_download_url(self) -> dict: + try: + headers = {"User-Agent": UA} + response = requests.get(CONFIG_URL, headers=headers, timeout=10) + response.raise_for_status() + config_data = response.json() + # 修正键名检查,确保所有必需的键都存在 + required_keys = [f"vol.{i+1}.data" for i in range(4)] + ["after.data"] + if not all(key in config_data for key in required_keys): + missing_keys = [key for key in required_keys if key not in config_data] + raise ValueError(f"配置文件缺少必要的键: {', '.join(missing_keys)}") + + # 修正提取URL的逻辑,确保使用正确的键 + return { + f"vol{i+1}": config_data[f"vol.{i+1}.data"]["url"] for i in range(4) + } | { + "after": config_data["after.data"]["url"] + } + except requests.exceptions.RequestException as e: + status_code = e.response.status_code if e.response is not None else "未知" + try: + error_response = e.response.json() if e.response else {} + json_title = error_response.get("title", "无错误类型") + json_message = error_response.get("message", "无附加错误信息") + except (ValueError, AttributeError): + json_title = "配置文件异常,无法解析错误类型" + json_message = "配置文件异常,无法解析错误信息" + + print(f"获取下载配置时出错: {e}") # 添加详细错误日志 + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + f"\n下载配置获取失败\n\n【HTTP状态】:{status_code}\n【错误类型】:{json_title}\n【错误信息】:{json_message}\n", + ) + return {} + except ValueError as e: + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + f"\n配置文件格式异常\n\n【错误信息】:{e}\n", + ) + return {} + + def download_setting(self, url, game_folder, game_version, _7z_path, plugin_path): + game_exe = { + game: os.path.join( + self.selected_folder, info["install_path"].split("/")[0], info["exe"] + ) + for game, info in GAME_INFO.items() + } + + if ( + game_version not in game_exe + or not os.path.exists(game_exe[game_version]) + or self.installed_status[game_version] + ): + self.installed_status[game_version] = False + self.show_result() + return + + self.progress_window = ProgressWindow(self) + + self.current_download_thread = DownloadThread(url, _7z_path, game_version, self) + self.current_download_thread.progress.connect(self.progress_window.update_progress) + self.current_download_thread.finished.connect( + lambda success, error: self.install_setting( + success, + error, + self.progress_window, + url, + game_folder, + game_version, + _7z_path, + plugin_path, + ) + ) + + # 连接停止按钮的信号 + self.progress_window.stop_button.clicked.connect(self.current_download_thread.stop) + self.current_download_thread.start() + self.progress_window.exec() # 使用exec()以模态方式显示对话框 + + def install_setting( + self, + success, + error, + progress_window, + url, + game_folder, + game_version, + _7z_path, + plugin_path, + ): + if progress_window.isVisible(): + progress_window.reject() + + if not success: # 处理所有失败情况,包括手动停止 + print(f"--- Download Failed: {game_version} ---") + print(error) + print("------------------------------------") + msg_box = QtWidgets.QMessageBox(self) + msg_box.setWindowTitle(f"下载失败 - {APP_NAME}") + # 如果是手动停止,显示特定信息 + if error == "下载已手动停止。": + msg_box.setText(f"\n下载已手动终止: {game_version}\n\n是否重试?") + else: + msg_box.setText(f"\n文件获取失败: {game_version}\n\n是否重试?") + + retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) + next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) + end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) + + icon_data = img_data.get("icon") + if icon_data: + pixmap = load_base64_image(icon_data) + if not pixmap.isNull(): + msg_box.setWindowIcon(QIcon(pixmap)) + + msg_box.exec() + clicked_button = msg_box.clickedButton() + + if clicked_button == retry_button: + self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) + elif clicked_button == next_button: + self.next_download_task() + else: # End button or closed dialog + self.download_queue.clear() + self.after_hash_compare(PLUGIN_HASH) + return # 确保失败后不再执行成功逻辑 + + if success: + try: + msg_box = self.hash_manager.hash_pop_window() + QApplication.processEvents() + with py7zr.SevenZipFile(_7z_path, mode="r") as archive: + archive.extractall(path=PLUGIN) + + # 创建游戏目录(如果不存在) + os.makedirs(game_folder, exist_ok=True) + + # 复制主文件 + shutil.copy(plugin_path, game_folder) + + # 如果是After版本,还需要复制签名文件 + if game_version == "NEKOPARA After": + sig_path = os.path.join(PLUGIN, GAME_INFO[game_version]["sig_path"]) + shutil.copy(sig_path, game_folder) + + self.installed_status[game_version] = True + except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", + ) + finally: + msg_box.close() + self.next_download_task() + self.progress_window.close() + + if success: + try: + msg_box = self.hash_manager.hash_pop_window() + QApplication.processEvents() + with py7zr.SevenZipFile(_7z_path, mode="r") as archive: + archive.extractall(path=PLUGIN) + + # 创建游戏目录(如果不存在) + os.makedirs(game_folder, exist_ok=True) + + # 复制主文件 + shutil.copy(plugin_path, game_folder) + + # 如果是After版本,还需要复制签名文件 + if game_version == "NEKOPARA After": + sig_path = os.path.join(PLUGIN, GAME_INFO[game_version]["sig_path"]) + shutil.copy(sig_path, game_folder) + + self.installed_status[game_version] = True + except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", + ) + finally: + msg_box.close() + self.next_download_task() + else: + print(f"--- Download Failed: {game_version} ---") + print(error) + print("------------------------------------") + msg_box = QtWidgets.QMessageBox(self) + msg_box.setWindowTitle(f"下载失败 {APP_NAME}") + msg_box.setText(f"\n文件获取失败: {game_version}\n\n是否重试?") + + retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) + next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) + end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) + + icon_data = img_data.get("icon") + if icon_data: + pixmap = load_base64_image(icon_data) + if not pixmap.isNull(): + msg_box.setWindowIcon(QIcon(pixmap)) + + msg_box.exec() + clicked_button = msg_box.clickedButton() + + if clicked_button == retry_button: + self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) + elif clicked_button == next_button: + self.next_download_task() + else: # End button or closed dialog + self.download_queue.clear() + self.after_hash_compare(PLUGIN_HASH) + + def pre_hash_compare(self, install_path, game_version, plugin_hash): + msg_box = self.hash_manager.hash_pop_window() + self.hash_manager.cfg_pre_hash_compare( + install_path, game_version, plugin_hash, self.installed_status + ) + msg_box.close() + + def download_action(self): + install_paths = self.get_install_paths() + for game_version, install_path in install_paths.items(): + self.pre_hash_compare(install_path, game_version, PLUGIN_HASH) + + config = self.get_download_url() + if not config: + QtWidgets.QMessageBox.critical( + self, f"错误 - {APP_NAME}", "\n网络状态异常或服务器故障,请重试\n" + ) + return + + # 处理1-4卷 + for i in range(1, 5): + game_version = f"NEKOPARA Vol.{i}" + if not self.installed_status[game_version]: + url = config[f"vol{i}"] + game_folder = os.path.join(self.selected_folder, f"NEKOPARA Vol. {i}") + _7z_path = os.path.join(PLUGIN, f"vol.{i}.7z") + plugin_path = os.path.join( + PLUGIN, GAME_INFO[game_version]["plugin_path"] + ) + self.download_queue.append( + (url, game_folder, game_version, _7z_path, plugin_path) + ) + + # 处理After + game_version = "NEKOPARA After" + if not self.installed_status[game_version]: + url = config["after"] + game_folder = os.path.join(self.selected_folder, "NEKOPARA After") + _7z_path = os.path.join(PLUGIN, "after.7z") + plugin_path = os.path.join( + PLUGIN, GAME_INFO[game_version]["plugin_path"] + ) + self.download_queue.append( + (url, game_folder, game_version, _7z_path, plugin_path) + ) + + self.next_download_task() + + def next_download_task(self): + if not self.download_queue: + self.after_hash_compare(PLUGIN_HASH) + return + # 检查下载线程是否仍在运行,以避免在手动停止后立即开始下一个任务 + if self.current_download_thread and self.current_download_thread.isRunning(): + return + url, game_folder, game_version, _7z_path, plugin_path = self.download_queue.popleft() + self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) + + def on_download_stopped(self): + """当用户点击停止按钮时调用的槽函数""" + # 清空下载队列,因为用户决定停止 + self.download_queue.clear() + # 可以在这里决定是否立即进行哈希比较或显示结果 + self.after_hash_compare(PLUGIN_HASH) + + def after_hash_compare(self, plugin_hash): + msg_box = self.hash_manager.hash_pop_window() + result = self.hash_manager.cfg_after_hash_compare( + self.get_install_paths(), plugin_hash, self.installed_status + ) + msg_box.close() + self.show_result() + return result + + def show_result(self): + installed_version = "\n".join( + [i for i in self.installed_status if self.installed_status[i]] + ) + failed_ver = "\n".join( + [i for i in self.installed_status if not self.installed_status[i]] + ) + QtWidgets.QMessageBox.information( + self, + f"完成 - {APP_NAME}", + f"\n安装结果:\n安装成功数:{len(installed_version.splitlines())} 安装失败数:{len(failed_ver.splitlines())}\n" + f"安装成功的版本:\n{installed_version}\n尚未持有或未使用本工具安装补丁的版本:\n{failed_ver}\n", + ) + + def show_version_info(self): + """显示版本信息对话框""" + msg_box = msgbox_frame( + f"版本信息 - {APP_NAME}", + f"\n{APP_NAME}\n\n版本: {APP_VERSION}\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + + def open_about_page(self): + webbrowser.open("https://github.com/hyb-oyqq/FRAISEMOE-Addons-Installer-NEXT") + + def closeEvent(self, event): + self.shutdown_app(event) + + def shutdown_app(self, event=None): + reply = QtWidgets.QMessageBox.question( + self, + "退出程序", + "\n是否确定退出?\n", + QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, + QtWidgets.QMessageBox.StandardButton.No, + ) + + if reply == QtWidgets.QMessageBox.StandardButton.Yes: + if ( + self.current_download_thread + and self.current_download_thread.isRunning() + ): + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + "\n当前有下载任务正在进行,完成后再试\n", + ) + if event: + event.ignore() + return + + if os.path.exists(PLUGIN): + for attempt in range(3): + try: + shutil.rmtree(PLUGIN) + break + except Exception as e: + if attempt == 2: + QtWidgets.QMessageBox.critical( + self, + f"错误 - {APP_NAME}", + f"\n清理缓存失败\n\n【错误信息】:{e}\n", + ) + if event: + event.accept() + sys.exit(1) + if event: + event.accept() + else: + sys.exit(0) + else: + if event: + event.ignore() \ No newline at end of file diff --git a/source/utils.py b/source/utils.py new file mode 100644 index 0000000..215437a --- /dev/null +++ b/source/utils.py @@ -0,0 +1,197 @@ +import os +import sys +import base64 +import hashlib +import concurrent.futures +import ctypes +import psutil +from PySide6 import QtCore, QtWidgets +from PySide6.QtGui import QIcon, QPixmap +from pic_data import img_data +from config import APP_NAME + +def resource_path(relative_path): + if getattr(sys, 'frozen', False): + if hasattr(sys, '_MEIPASS'): + base_path = sys._MEIPASS # type: ignore + else: + base_path = os.path.dirname(sys.executable) + else: + base_path = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(base_path, relative_path) + +def load_base64_image(base64_str): + pixmap = QPixmap() + pixmap.loadFromData(base64.b64decode(base64_str)) + return pixmap + +def msgbox_frame(title, text, buttons=QtWidgets.QMessageBox.StandardButton.NoButton): + msg_box = QtWidgets.QMessageBox() + msg_box.setWindowTitle(title) + msg_box.setWindowModality(QtCore.Qt.WindowModality.WindowModal) + + icon_data = img_data.get("icon") + if icon_data: + pixmap = load_base64_image(icon_data) + if not pixmap.isNull(): + msg_box.setWindowIcon(QIcon(pixmap)) + msg_box.setIconPixmap(pixmap.scaled(64, 64, QtCore.Qt.AspectRatioMode.KeepAspectRatio)) + else: + msg_box.setIcon(QtWidgets.QMessageBox.Icon.Information) + + msg_box.setText(text) + msg_box.setStandardButtons(buttons) + return msg_box + +class HashManager: + def __init__(self, HASH_SIZE): + self.HASH_SIZE = HASH_SIZE + + def hash_calculate(self, file_path): + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + for byte_block in iter(lambda: f.read(self.HASH_SIZE), b""): + sha256_hash.update(byte_block) + return sha256_hash.hexdigest() + + def calculate_hashes_in_parallel(self, file_paths): + with concurrent.futures.ThreadPoolExecutor() as executor: + future_to_file = { + executor.submit(self.hash_calculate, path): path for path in file_paths + } + results = {} + for future in concurrent.futures.as_completed(future_to_file): + file_path = future_to_file[future] + try: + results[file_path] = future.result() + except Exception as e: + results[file_path] = None + msg_box = msgbox_frame( + f"错误 - {APP_NAME}", + f"\n文件哈希值计算失败\n\n【错误信息】:{e}\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + return results + + def hash_pop_window(self): + msg_box = msgbox_frame(f"通知 - {APP_NAME}", "\n正在检验文件状态...\n") + msg_box.open() + QtWidgets.QApplication.processEvents() + return msg_box + + def cfg_pre_hash_compare(self, install_path, game_version, plugin_hash, installed_status): + if not os.path.exists(install_path): + installed_status[game_version] = False + return + file_hash = self.hash_calculate(install_path) + if file_hash == plugin_hash[game_version]: + installed_status[game_version] = True + else: + reply = msgbox_frame( + f"文件校验 - {APP_NAME}", + f"\n检测到 {game_version} 的文件哈希值不匹配,是否重新安装?\n", + QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, + ).exec() + if reply == QtWidgets.QMessageBox.StandardButton.Yes: + installed_status[game_version] = False + else: + installed_status[game_version] = True + + def cfg_after_hash_compare(self, install_paths, plugin_hash, installed_status): + passed = True + file_paths = [ + install_paths[game] for game in plugin_hash if installed_status.get(game) + ] + hash_results = self.calculate_hashes_in_parallel(file_paths) + + for game, hash_value in plugin_hash.items(): + if installed_status.get(game): + file_hash = hash_results.get(install_paths[game]) + if file_hash != hash_value: + msg_box = msgbox_frame( + f"文件校验 - {APP_NAME}", + f"\n检测到 {game} 的文件哈希值不匹配\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + installed_status[game] = False + passed = False + break + return passed + +class AdminPrivileges: + def __init__(self): + self.required_exes = [ + "nekopara_vol1.exe", + "nekopara_vol2.exe", + "NEKOPARAvol3.exe", + "nekopara_vol4.exe", + "nekopara_after.exe", + ] + + def is_admin(self): + try: + return ctypes.windll.shell32.IsUserAnAdmin() + except: + return False + + def request_admin_privileges(self): + if not self.is_admin(): + msg_box = msgbox_frame( + f"权限检测 - {APP_NAME}", + "\n需要管理员权限运行此程序\n", + QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, + ) + reply = msg_box.exec() + if reply == QtWidgets.QMessageBox.StandardButton.Yes: + try: + ctypes.windll.shell32.ShellExecuteW( + None, "runas", sys.executable, " ".join(sys.argv), None, 1 + ) + except Exception as e: + msg_box = msgbox_frame( + f"错误 - {APP_NAME}", + f"\n请求管理员权限失败\n\n【错误信息】:{e}\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + sys.exit(1) + else: + msg_box = msgbox_frame( + f"权限检测 - {APP_NAME}", + "\n无法获取管理员权限,程序将退出\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + sys.exit(1) + + def check_and_terminate_processes(self): + for proc in psutil.process_iter(["pid", "name"]): + if proc.info["name"] in self.required_exes: + msg_box = msgbox_frame( + f"进程检测 - {APP_NAME}", + f"\n检测到游戏正在运行: {proc.info['name']} \n\n是否终止?\n", + QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, + ) + reply = msg_box.exec() + if reply == QtWidgets.QMessageBox.StandardButton.Yes: + try: + proc.terminate() + proc.wait(timeout=3) + except psutil.AccessDenied: + msg_box = msgbox_frame( + f"错误 - {APP_NAME}", + f"\n无法关闭游戏: {proc.info['name']} \n\n请手动关闭后重启应用\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + sys.exit(1) + else: + msg_box = msgbox_frame( + f"进程检测 - {APP_NAME}", + f"\n未关闭的游戏: {proc.info['name']} \n\n请手动关闭后重启应用\n", + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + sys.exit(1) \ No newline at end of file