import os import sys import base64 import hashlib import concurrent.futures import ctypes import json import psutil from PySide6 import QtCore, QtWidgets import re from PySide6.QtGui import QIcon, QPixmap from data.config import APP_NAME, CONFIG_FILE def resource_path(relative_path): """获取资源的绝对路径,适用于开发环境和Nuitka打包环境""" if getattr(sys, 'frozen', False): # Nuitka/PyInstaller创建的临时文件夹,并将路径存储在_MEIPASS中或与可执行文件同目录 if hasattr(sys, '_MEIPASS'): base_path = sys._MEIPASS else: base_path = os.path.dirname(sys.executable) else: # 在开发环境中运行 base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) # 处理特殊的可执行文件和数据文件路径 if relative_path in ("aria2c-fast_x64.exe", "cfst.exe"): return os.path.join(base_path, 'bin', relative_path) elif relative_path in ("ip.txt", "ipv6.txt"): return os.path.join(base_path, 'data', relative_path) return os.path.join(base_path, relative_path) def load_base64_image(base64_str): pixmap = QPixmap() pixmap.loadFromData(base64.b64decode(base64_str)) return pixmap def load_image_from_file(file_path): """加载图像文件到QPixmap Args: file_path: 图像文件路径 Returns: QPixmap: 加载的图像 """ if os.path.exists(file_path): return QPixmap(file_path) return QPixmap() def msgbox_frame(title, text, buttons=QtWidgets.QMessageBox.StandardButton.NoButton): msg_box = QtWidgets.QMessageBox() msg_box.setWindowTitle(title) msg_box.setWindowModality(QtCore.Qt.WindowModality.WindowModal) # 直接加载图标文件 icon_path = resource_path(os.path.join("IMG", "ICO", "icon.png")) if os.path.exists(icon_path): pixmap = QPixmap(icon_path) if not pixmap.isNull(): msg_box.setWindowIcon(QIcon(pixmap)) msg_box.setIconPixmap(pixmap.scaled(64, 64, QtCore.Qt.AspectRatioMode.KeepAspectRatio, QtCore.Qt.TransformationMode.SmoothTransformation)) else: msg_box.setIcon(QtWidgets.QMessageBox.Icon.Information) msg_box.setText(text) msg_box.setStandardButtons(buttons) return msg_box def load_config(): if not os.path.exists(CONFIG_FILE): return {} try: with open(CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, IOError): return {} def save_config(config): try: os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(config, f, indent=4) except IOError as e: print(f"Error saving config: {e}") class HashManager: def __init__(self, HASH_SIZE): self.HASH_SIZE = HASH_SIZE def hash_calculate(self, file_path): sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(self.HASH_SIZE), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def calculate_hashes_in_parallel(self, file_paths): with concurrent.futures.ThreadPoolExecutor() as executor: future_to_file = { executor.submit(self.hash_calculate, path): path for path in file_paths } results = {} for future in concurrent.futures.as_completed(future_to_file): file_path = future_to_file[future] try: results[file_path] = future.result() except Exception as e: results[file_path] = None # Mark as failed print(f"Error calculating hash for {file_path}: {e}") return results def hash_pop_window(self, check_type="default"): """显示文件检验窗口 Args: check_type: 检查类型,可以是 'pre'(预检查), 'after'(后检查), 'extraction'(解压后检查) Returns: QMessageBox: 消息框实例 """ message = "\n正在检验文件状态...\n" if check_type == "pre": message = "\n正在检查游戏文件以确定需要安装的补丁...\n" elif check_type == "after": message = "\n正在检验本地文件完整性...\n" elif check_type == "extraction": message = "\n正在验证下载的解压文件完整性...\n" msg_box = msgbox_frame(f"通知 - {APP_NAME}", message) msg_box.open() QtWidgets.QApplication.processEvents() return msg_box def cfg_pre_hash_compare(self, install_paths, plugin_hash, installed_status): status_copy = installed_status.copy() for game_version, install_path in install_paths.items(): if not os.path.exists(install_path): status_copy[game_version] = False continue try: file_hash = self.hash_calculate(install_path) if file_hash == plugin_hash.get(game_version): status_copy[game_version] = True else: status_copy[game_version] = False except Exception: status_copy[game_version] = False return status_copy def cfg_after_hash_compare(self, install_paths, plugin_hash, installed_status): file_paths = [ install_paths[game] for game in plugin_hash if installed_status.get(game) ] hash_results = self.calculate_hashes_in_parallel(file_paths) for game, hash_value in plugin_hash.items(): if installed_status.get(game): file_path = install_paths[game] file_hash = hash_results.get(file_path) if file_hash is None: installed_status[game] = False return { "passed": False, "game": game, "message": f"\n无法计算 {game} 的文件哈希值,文件可能已损坏或被占用。\n" } if file_hash != hash_value: installed_status[game] = False return { "passed": False, "game": game, "message": f"\n检测到 {game} 的文件哈希值不匹配。\n" } return {"passed": True} class AdminPrivileges: def __init__(self): self.required_exes = [ "nekopara_vol1.exe", "nekopara_vol2.exe", "NEKOPARAvol3.exe", "NEKOPARAvol3.exe.nocrack", "nekopara_vol4.exe", "nekopara_after.exe", ] def is_admin(self): try: return ctypes.windll.shell32.IsUserAnAdmin() except: return False def request_admin_privileges(self): if not self.is_admin(): msg_box = msgbox_frame( f"权限检测 - {APP_NAME}", "\n需要管理员权限运行此程序\n", QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, ) reply = msg_box.exec() if reply == QtWidgets.QMessageBox.StandardButton.Yes: try: ctypes.windll.shell32.ShellExecuteW( None, "runas", sys.executable, " ".join(sys.argv), None, 1 ) except Exception as e: msg_box = msgbox_frame( f"错误 - {APP_NAME}", f"\n请求管理员权限失败\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok, ) msg_box.exec() sys.exit(1) else: msg_box = msgbox_frame( f"权限检测 - {APP_NAME}", "\n无法获取管理员权限,程序将退出\n", QtWidgets.QMessageBox.StandardButton.Ok, ) msg_box.exec() sys.exit(1) def check_and_terminate_processes(self): for proc in psutil.process_iter(["pid", "name"]): proc_name = proc.info["name"].lower() if proc.info["name"] else "" # 检查进程名是否匹配任何需要终止的游戏进程 for exe in self.required_exes: if exe.lower() == proc_name: # 获取不带.nocrack的游戏名称用于显示 display_name = exe.replace(".nocrack", "") msg_box = msgbox_frame( f"进程检测 - {APP_NAME}", f"\n检测到游戏正在运行: {display_name} \n\n是否终止?\n", QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, ) reply = msg_box.exec() if reply == QtWidgets.QMessageBox.StandardButton.Yes: try: proc.terminate() proc.wait(timeout=3) except psutil.AccessDenied: msg_box = msgbox_frame( f"错误 - {APP_NAME}", f"\n无法关闭游戏: {display_name} \n\n请手动关闭后重启应用\n", QtWidgets.QMessageBox.StandardButton.Ok, ) msg_box.exec() sys.exit(1) else: msg_box = msgbox_frame( f"进程检测 - {APP_NAME}", f"\n未关闭的游戏: {display_name} \n\n请手动关闭后重启应用\n", QtWidgets.QMessageBox.StandardButton.Ok, ) msg_box.exec() sys.exit(1) class HostsManager: def __init__(self): self.hosts_path = os.path.join(os.environ['SystemRoot'], 'System32', 'drivers', 'etc', 'hosts') self.backup_path = os.path.join(os.path.dirname(self.hosts_path), f'hosts.bak.{APP_NAME}') self.original_content = None self.modified = False self.modified_hostnames = set() # 跟踪被修改的主机名 def backup(self): if not AdminPrivileges().is_admin(): print("需要管理员权限来备份hosts文件。") return False try: with open(self.hosts_path, 'r', encoding='utf-8') as f: self.original_content = f.read() with open(self.backup_path, 'w', encoding='utf-8') as f: f.write(self.original_content) print(f"Hosts文件已备份到: {self.backup_path}") return True except IOError as e: print(f"备份hosts文件失败: {e}") msg_box = msgbox_frame(f"错误 - {APP_NAME}", f"\n无法备份hosts文件,请检查权限。\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok) msg_box.exec() return False def clean_hostname_entries(self, hostname): """清理hosts文件中指定域名的所有记录 Args: hostname: 要清理的域名 Returns: bool: 清理是否成功 """ if not self.original_content: if not self.backup(): return False # 确保original_content不为None if not self.original_content: print("无法读取hosts文件内容,操作中止。") return False if not AdminPrivileges().is_admin(): print("需要管理员权限来修改hosts文件。") return False try: lines = self.original_content.splitlines() new_lines = [line for line in lines if hostname not in line] # 如果没有变化,不需要写入 if len(new_lines) == len(lines): print(f"Hosts文件中没有找到 {hostname} 的记录") return True with open(self.hosts_path, 'w', encoding='utf-8') as f: f.write('\n'.join(new_lines)) # 更新原始内容 self.original_content = '\n'.join(new_lines) print(f"已从hosts文件中清理 {hostname} 的记录") return True except IOError as e: print(f"清理hosts文件失败: {e}") return False def apply_ip(self, hostname, ip_address, clean=True): if not self.original_content: if not self.backup(): return False if not self.original_content: # 再次检查,确保backup成功 print("无法读取hosts文件内容,操作中止。") return False if not AdminPrivileges().is_admin(): print("需要管理员权限来修改hosts文件。") return False try: # 首先清理已有的同域名记录(如果需要) if clean: self.clean_hostname_entries(hostname) # 然后添加新记录 lines = self.original_content.splitlines() new_entry = f"{ip_address}\t{hostname}" lines.append(f"\n# Added by {APP_NAME}") lines.append(new_entry) with open(self.hosts_path, 'w', encoding='utf-8') as f: f.write('\n'.join(lines)) # 更新原始内容 self.original_content = '\n'.join(lines) self.modified = True # 记录被修改的主机名,用于最终清理 self.modified_hostnames.add(hostname) print(f"Hosts文件已更新: {new_entry}") return True except IOError as e: print(f"修改hosts文件失败: {e}") msg_box = msgbox_frame(f"错误 - {APP_NAME}", f"\n无法修改hosts文件,请检查权限。\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok) msg_box.exec() return False def check_and_clean_all_entries(self): """检查并清理所有由本应用程序添加的hosts记录 Returns: bool: 清理是否成功 """ if not AdminPrivileges().is_admin(): print("需要管理员权限来检查和清理hosts文件。") return False try: # 读取当前hosts文件内容 with open(self.hosts_path, 'r', encoding='utf-8') as f: current_content = f.read() lines = current_content.splitlines() new_lines = [] skip_next = False for line in lines: # 如果上一行是我们的注释标记,跳过当前行 if skip_next: skip_next = False continue # 检查是否是我们添加的注释行 if f"# Added by {APP_NAME}" in line: skip_next = True # 跳过下一行(实际的hosts记录) continue # 保留其他所有行 new_lines.append(line) # 检查是否有变化 if len(new_lines) == len(lines): print("Hosts文件中没有找到由本应用添加的记录") return True # 写回清理后的内容 with open(self.hosts_path, 'w', encoding='utf-8') as f: f.write('\n'.join(new_lines)) print(f"已清理所有由 {APP_NAME} 添加的hosts记录") return True except IOError as e: print(f"检查和清理hosts文件失败: {e}") return False def restore(self): if not self.modified: if os.path.exists(self.backup_path): try: os.remove(self.backup_path) except OSError: pass # 即使没有修改过,也检查一次是否有残留 self.check_and_clean_all_entries() return True if not AdminPrivileges().is_admin(): print("需要管理员权限来恢复hosts文件。") return False if self.original_content: try: with open(self.hosts_path, 'w', encoding='utf-8') as f: f.write(self.original_content) self.modified = False print("Hosts文件已从内存恢复。") if os.path.exists(self.backup_path): try: os.remove(self.backup_path) except OSError: pass # 恢复后再检查一次是否有残留 self.check_and_clean_all_entries() return True except IOError as e: print(f"从内存恢复hosts文件失败: {e}") return self.restore_from_backup_file() else: return self.restore_from_backup_file() def restore_from_backup_file(self): if not os.path.exists(self.backup_path): print("未找到hosts备份文件,无法恢复。") # 即使没有备份文件,也尝试清理可能的残留 self.check_and_clean_all_entries() return False try: with open(self.backup_path, 'r', encoding='utf-8') as bf: backup_content = bf.read() with open(self.hosts_path, 'w', encoding='utf-8') as hf: hf.write(backup_content) os.remove(self.backup_path) self.modified = False print("Hosts文件已从备份文件恢复。") # 恢复后再检查一次是否有残留 self.check_and_clean_all_entries() return True except (IOError, OSError) as e: print(f"从备份文件恢复hosts失败: {e}") msg_box = msgbox_frame(f"警告 - {APP_NAME}", f"\n自动恢复hosts文件失败,请手动从 {self.backup_path} 恢复。\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok) msg_box.exec() # 尽管恢复失败,仍然尝试清理可能的残留 self.check_and_clean_all_entries() return False def censor_url(text): """Censors URLs in a given text string.""" if not isinstance(text, str): text = str(text) url_pattern = re.compile(r'https?://[^\s/$.?#].[^\s]*') return url_pattern.sub('***URL HIDDEN***', text)