From 363a64c56698e330d45d258e64e31ea59f769802 Mon Sep 17 00:00:00 2001 From: hyb-oyqq <1512383570@qq.com> Date: Thu, 17 Jul 2025 18:02:37 +0800 Subject: [PATCH] =?UTF-8?q?refactor(source):=20=E9=87=8D=E6=9E=84=20cloudf?= =?UTF-8?q?lare=E4=BC=98=E5=8C=96=E5=99=A8=E5=B9=B6=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=E4=B8=8B=E8=BD=BD=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 重构 IpOptimizer 类,优化 CloudflareSpeedTest 工具的调用和处理 - 改进下载功能,包括手动停止下载、错误处理和日志记录 - 更新配置文件,增加日志文件路径和用户代理模板 --- .gitignore | 1 + source/config.py | 8 +- source/download.py | 48 ++-- source/ip_optimizer.py | 216 +++++++++-------- source/main_window.py | 533 ++++++++++++++++++++++++++++------------- source/utils.py | 222 +++++++++++++---- 6 files changed, 694 insertions(+), 334 deletions(-) diff --git a/.gitignore b/.gitignore index 2eb05fb..b8d2878 100644 --- a/.gitignore +++ b/.gitignore @@ -172,3 +172,4 @@ cython_debug/ nuitka-crash-report.xml build.bat +log.txt diff --git a/source/config.py b/source/config.py index e088f51..dd8cc61 100644 --- a/source/config.py +++ b/source/config.py @@ -3,13 +3,13 @@ import base64 # 配置信息 app_data = { - "APP_VERSION": "1.0.0", + "APP_VERSION": "1.1.0", "APP_NAME": "FRAISEMOE Addons Installer NEXT", "TEMP": "TEMP", "CACHE": "FRAISEMOE", "PLUGIN": "PLUGIN", "CONFIG_URL": "aHR0cHM6Ly9hcGkuMncyLnRvcC9hcGkvb3V5YW5ncWlxaS9uZWtvcGFyYS9kb3dubG9hZF91cmwuanNvbg==", - "UA": "TW96aWxsYS81LjAgKExpbnV4IGRlYmlhbjEyIEZyYWlzZU1vZTItQWNjZXB0LU5leHQpIEdlY2tvLzIwMTAwMTAxIEZpcmVmb3gvMTE0LjAgRnJhaXNlTW9lMi8xLjAuMA==", + "UA_TEMPLATE": "Mozilla/5.0 (Linux debian12 FraiseMoe2-Accept-Next) Gecko/20100101 Firefox/114.0 FraiseMoe2/{}", "game_info": { "NEKOPARA Vol.1": { "exe": "nekopara_vol1.exe", @@ -54,9 +54,11 @@ APP_VERSION = app_data["APP_VERSION"] APP_NAME = app_data["APP_NAME"] TEMP = os.getenv(app_data["TEMP"]) or app_data["TEMP"] CACHE = os.path.join(TEMP, app_data["CACHE"]) +CONFIG_FILE = os.path.join(CACHE, "config.json") +LOG_FILE = "log.txt" PLUGIN = os.path.join(CACHE, app_data["PLUGIN"]) CONFIG_URL = decode_base64(app_data["CONFIG_URL"]) -UA = decode_base64(app_data["UA"]) +UA = app_data["UA_TEMPLATE"].format(APP_VERSION) GAME_INFO = app_data["game_info"] BLOCK_SIZE = 67108864 HASH_SIZE = 134217728 diff --git a/source/download.py b/source/download.py index 79f184f..3b98426 100644 --- a/source/download.py +++ b/source/download.py @@ -14,24 +14,30 @@ class DownloadThread(QThread): progress = Signal(dict) finished = Signal(bool, str) - def __init__(self, url, _7z_path, game_version, preferred_ip=None, parent=None): + def __init__(self, url, _7z_path, game_version, parent=None): super().__init__(parent) self.url = url self._7z_path = _7z_path self.game_version = game_version - self.preferred_ip = preferred_ip self.process = None - self.is_running = True + self._is_running = True def stop(self): if self.process and self.process.poll() is None: - self.is_running = False - # 使用 taskkill 强制终止进程及其子进程,并隐藏窗口 - subprocess.run(['taskkill', '/F', '/T', '/PID', str(self.process.pid)], check=True, creationflags=subprocess.CREATE_NO_WINDOW) - self.finished.emit(False, "下载已手动停止。") + self._is_running = False + try: + # 使用 taskkill 强制终止进程及其子进程,并隐藏窗口 + subprocess.run(['taskkill', '/F', '/T', '/PID', str(self.process.pid)], check=True, creationflags=subprocess.CREATE_NO_WINDOW) + except (subprocess.CalledProcessError, FileNotFoundError) as e: + print(f"停止下载进程时出错: {e}") + def run(self): try: + if not self._is_running: + self.finished.emit(False, "下载已手动停止。") + return + aria2c_path = resource_path("aria2c.exe") download_dir = os.path.dirname(self._7z_path) file_name = os.path.basename(self._7z_path) @@ -43,13 +49,6 @@ class DownloadThread(QThread): aria2c_path, ] - # 如果有优选IP,则添加到 aaric2 命令中 - if self.preferred_ip: - hostname = parsed_url.hostname - port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) - command.extend(['--resolve', f'{hostname}:{port}:{self.preferred_ip}']) - print(f"已应用优选IP: {hostname} -> {self.preferred_ip}") - command.extend([ '--dir', download_dir, '--out', file_name, @@ -74,11 +73,19 @@ class DownloadThread(QThread): '--connect-timeout=60', '--timeout=60', '--auto-file-renaming=false', + '--allow-overwrite=true', '--split=16', - '--max-connection-per-server=16', - self.url + '--max-connection-per-server=16' ]) + # 证书验证现在总是需要,因为我们依赖hosts文件 + command.append('--check-certificate=false') + + command.append(self.url) + + # 打印将要执行的命令,用于调试 + print(f"即将执行的 Aria2c 命令: {' '.join(command)}") + creation_flags = subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', errors='replace', creationflags=creation_flags) @@ -87,7 +94,7 @@ class DownloadThread(QThread): progress_pattern = re.compile(r'\((\d{1,3})%\).*?CN:(\d+).*?DL:\s*([^\s]+).*?ETA:\s*([^\s]+)') full_output = [] - while self.is_running and self.process.poll() is None: + while self._is_running and self.process.poll() is None: if self.process.stdout: line = self.process.stdout.readline() if not line: @@ -114,7 +121,8 @@ class DownloadThread(QThread): return_code = self.process.wait() - if not self.is_running: # 如果是手动停止的 + if not self._is_running: # 如果是手动停止的 + self.finished.emit(False, "下载已手动停止。") return if return_code == 0: @@ -131,7 +139,7 @@ class DownloadThread(QThread): self.finished.emit(False, error_message) except Exception as e: - if self.is_running: + if self._is_running: self.finished.emit(False, f"\n下载时发生未知错误\n\n【错误信息】: {e}\n") # 下载进度窗口类 @@ -144,7 +152,7 @@ class ProgressWindow(QDialog): self.setWindowFlags(self.windowFlags() & ~Qt.WindowType.WindowSystemMenuHint) layout = QVBoxLayout() - self.game_label = QLabel("正在准备下载...") + self.game_label = QLabel("正在启动下载,请稍后...") self.progress_bar = QProgressBar() self.progress_bar.setValue(0) self.stats_label = QLabel("速度: - | 线程: - | 剩余时间: -") diff --git a/source/ip_optimizer.py b/source/ip_optimizer.py index 47cd2dc..af9ac68 100644 --- a/source/ip_optimizer.py +++ b/source/ip_optimizer.py @@ -2,128 +2,144 @@ import os import re import subprocess import sys +import time from urllib.parse import urlparse from utils import resource_path -def get_optimal_ip(url: str) -> str | None: - """ - 使用 CloudflareSpeedTest 工具获取给定 URL 的最优 Cloudflare IP。 +class IpOptimizer: + def __init__(self): + self.process = None - Args: - url: 需要进行优选的下载链接。 + def get_optimal_ip(self, url: str) -> str | None: + """ + 使用 CloudflareSpeedTest 工具获取给定 URL 的最优 Cloudflare IP。 - Returns: - 最优的 IP 地址字符串,如果找不到则返回 None。 - """ - try: - # 1. 定位 CloudflareSpeedTest 工具路径,使用新的文件名 cfst.exe - cst_path = resource_path("cfst.exe") - if not os.path.exists(cst_path): - print(f"错误: cfst.exe 未在资源路径中找到。") - return None + Args: + url: 需要进行优选的下载链接。 - # 2. 构建命令行参数 - # -p 1: 只输出最快的一个 IP - # -o "": 不生成 result.csv 文件 - # -url: 指定我们自己的测速链接 - # -f: 指定 ip.txt 的路径 - ip_txt_path = resource_path("ip.txt") - command = [ - cst_path, - "-p", "1", - "-o", "", - "-url", url, - "-f", ip_txt_path, - "-dd", - ] + Returns: + 最优的 IP 地址字符串,如果找不到则返回 None。 + """ + try: + cst_path = resource_path("cfst.exe") + if not os.path.exists(cst_path): + print(f"错误: cfst.exe 未在资源路径中找到。") + return None - # 3. 执行命令并捕获输出 - # 使用 CREATE_NO_WINDOW 标志来隐藏控制台窗口 - creation_flags = subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - encoding='utf-8', - errors='replace', - creationflags=creation_flags, - bufsize=1, # 使用行缓冲 - ) + ip_txt_path = resource_path("ip.txt") + command = [ + cst_path, + "-p", "1", + "-o", "", + "-url", url, + "-f", ip_txt_path, + "-dd", + ] - # 4. 实时读取、打印并解析输出 - print("--- CloudflareSpeedTest 实时输出 ---") - - if not process.stdout: - print("错误: 无法获取子进程的输出流。") - return None + creation_flags = subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 + + print("--- CloudflareSpeedTest 开始执行 ---") + + self.process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding='utf-8', + errors='replace', + creationflags=creation_flags, + bufsize=0 + ) - # 根据用户提供的最新格式更新正则表达式 - # 格式: IP Sent Recv Loss Avg-Latency DL-Speed Region - ip_pattern = re.compile(r'^\s*([\d\.]+)\s+\d+\s+\d+\s+[\d\.]+%?\s+[\d\.]+\s+[\d\.]+\s+.*$') - fd = process.stdout.fileno() - buffer = b'' - - while process.poll() is None: - try: - chunk = os.read(fd, 1024) - if not chunk: + # 立即向 stdin 发送换行符,以便程序在 Windows 下正常退出 + if self.process.stdin: + try: + self.process.stdin.write('\n') + self.process.stdin.flush() + except: + pass + finally: + self.process.stdin.close() + + ip_pattern = re.compile(r'^\s*([\d\.]+)\s+\d+\s+\d+\s+[\d\.]+%?\s+[\d\.]+\s+[\d\.]+\s+.*$') + + stdout = self.process.stdout + if not stdout: + print("错误: 无法获取子进程的输出流。") + return None + + optimal_ip = None + timeout_counter = 0 + max_timeout = 60 + + while True: + if self.process.poll() is not None: break - buffer += chunk - - while b'\n' in buffer or b'\r' in buffer: - end_index_n = buffer.find(b'\n') - end_index_r = buffer.find(b'\r') - end_index = min(end_index_n, end_index_r) if end_index_n != -1 and end_index_r != -1 else max(end_index_n, end_index_r) + try: + ready = True + try: + line = stdout.readline() + except: + ready = False + + if not ready or not line: + timeout_counter += 1 + if timeout_counter > max_timeout: + print("超时: CloudflareSpeedTest 响应超时") + break + time.sleep(1) + continue - line_bytes = buffer[:end_index] - line = line_bytes.decode('utf-8', errors='replace').strip() + timeout_counter = 0 - if line: - print(line) - match = ip_pattern.match(line) + cleaned_line = line.strip() + if cleaned_line: + print(cleaned_line) + match = ip_pattern.match(cleaned_line) if match: optimal_ip = match.group(1) print(f"找到最优 IP: {optimal_ip}, 正在终止测速进程...") - print("------------------------------------") - process.terminate() # 终止进程 - return optimal_ip - - buffer = buffer[end_index+1:] + break + + except Exception as e: + print(f"读取输出时发生错误: {e}") + break + + self.stop() + + print("--- CloudflareSpeedTest 执行结束 ---") + return optimal_ip - except (IOError, OSError): - break - - # 处理可能残留在缓冲区的数据 - if buffer: - line = buffer.decode('utf-8', errors='replace').strip() - if line: - print(line) - match = ip_pattern.match(line) - if match: - optimal_ip = match.group(1) - print(f"找到最优 IP: {optimal_ip}") - print("------------------------------------") - process.terminate() # 确保在返回前终止进程 - return optimal_ip + except Exception as e: + print(f"执行 CloudflareSpeedTest 时发生错误: {e}") + return None - print("------------------------------------") - - # 5. 在循环结束后,检查是否找到了 IP - # (IP 在循环内部找到并返回) - process.wait() # 等待进程完全终止 - print("警告: 未能在 CloudflareSpeedTest 输出中找到最优 IP。") - return None - - except Exception as e: - print(f"执行 CloudflareSpeedTest 时发生错误: {e}") - return None + def stop(self): + if self.process and self.process.poll() is None: + print("正在终止 CloudflareSpeedTest 进程...") + try: + if self.process.stdin and not self.process.stdin.closed: + self.process.stdin.write('\n') + self.process.stdin.flush() + self.process.stdin.close() + except: + pass + + try: + self.process.terminate() + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait() + print("CloudflareSpeedTest 进程已终止。") if __name__ == '__main__': # 用于直接测试此模块 test_url = "https://speed.cloudflare.com/__down?during=download&bytes=104857600" - ip = get_optimal_ip(test_url) + optimizer = IpOptimizer() + ip = optimizer.get_optimal_ip(test_url) if ip: print(f"为 {test_url} 找到的最优 IP 是: {ip}") else: diff --git a/source/main_window.py b/source/main_window.py index 16fd8aa..134c795 100644 --- a/source/main_window.py +++ b/source/main_window.py @@ -4,25 +4,115 @@ import shutil import webbrowser import requests import py7zr +import json +from urllib.parse import urlparse from collections import deque from PySide6 import QtWidgets -from PySide6.QtCore import QTimer, Qt +from PySide6.QtCore import QTimer, Qt, QThread, Signal from PySide6.QtGui import QIcon, QAction -from PySide6.QtWidgets import QMainWindow, QFileDialog, QApplication +from PySide6.QtWidgets import QMainWindow, QFileDialog, QApplication, QMessageBox, QPushButton from Ui_install import Ui_MainWindows from animations import MultiStageAnimations from config import ( - APP_NAME, APP_VERSION, PLUGIN, GAME_INFO, BLOCK_SIZE, - PLUGIN_HASH, UA, CONFIG_URL + APP_NAME, APP_VERSION, PLUGIN, GAME_INFO, BLOCK_SIZE, + PLUGIN_HASH, UA, CONFIG_URL, LOG_FILE ) from utils import ( - load_base64_image, HashManager, AdminPrivileges, msgbox_frame + load_base64_image, HashManager, AdminPrivileges, msgbox_frame, + load_config, save_config, HostsManager, censor_url ) from download import DownloadThread, ProgressWindow -from ip_optimizer import get_optimal_ip +from ip_optimizer import IpOptimizer from pic_data import img_data + +class Logger: + def __init__(self, filename, stream): + self.terminal = stream + self.log = open(filename, "w", encoding="utf-8") + + def write(self, message): + censored_message = censor_url(message) + self.terminal.write(censored_message) + self.log.write(censored_message) + self.flush() + + def flush(self): + self.terminal.flush() + self.log.flush() + + def close(self): + self.log.close() + +class IpOptimizerThread(QThread): + finished = Signal(str) + + def __init__(self, url, parent=None): + super().__init__(parent) + self.url = url + self.optimizer = IpOptimizer() + + def run(self): + optimal_ip = self.optimizer.get_optimal_ip(self.url) + self.finished.emit(optimal_ip if optimal_ip else "") + + def stop(self): + self.optimizer.stop() + +class HashThread(QThread): + pre_finished = Signal(dict) + after_finished = Signal(dict) + + def __init__(self, mode, install_paths, plugin_hash, installed_status, parent=None): + super().__init__(parent) + self.mode = mode + self.install_paths = install_paths + self.plugin_hash = plugin_hash + self.installed_status = installed_status + # 每个线程都应该有自己的HashManager实例 + self.hash_manager = HashManager(BLOCK_SIZE) + + def run(self): + if self.mode == "pre": + updated_status = self.hash_manager.cfg_pre_hash_compare( + self.install_paths, self.plugin_hash, self.installed_status + ) + self.pre_finished.emit(updated_status) + elif self.mode == "after": + result = self.hash_manager.cfg_after_hash_compare( + self.install_paths, self.plugin_hash, self.installed_status + ) + self.after_finished.emit(result) + + +class ExtractionThread(QThread): + finished = Signal(bool, str, str) # success, error_message, game_version + + def __init__(self, _7z_path, game_folder, plugin_path, game_version, parent=None): + super().__init__(parent) + self._7z_path = _7z_path + self.game_folder = game_folder + self.plugin_path = plugin_path + self.game_version = game_version + + def run(self): + try: + with py7zr.SevenZipFile(self._7z_path, mode="r") as archive: + archive.extractall(path=PLUGIN) + + os.makedirs(self.game_folder, exist_ok=True) + shutil.copy(self.plugin_path, self.game_folder) + + if self.game_version == "NEKOPARA After": + sig_path = os.path.join(PLUGIN, GAME_INFO[self.game_version]["sig_path"]) + shutil.copy(sig_path, self.game_folder) + + self.finished.emit(True, "", self.game_version) + except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: + self.finished.emit(False, f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", self.game_version) + + class MainWindow(QMainWindow): def __init__(self): super().__init__() @@ -49,12 +139,25 @@ class MainWindow(QMainWindow): self.download_queue = deque() self.current_download_thread = None self.hash_manager = HashManager(BLOCK_SIZE) + self.hash_thread = None + self.extraction_thread = None + self.hash_msg_box = None + self.optimized_ip = None + self.optimization_done = False # 标记是否已执行过优选 + self.logger = None + self.hosts_manager = HostsManager() # 实例化HostsManager + + # 加载配置 + self.config = load_config() # 检查管理员权限和进程 admin_privileges = AdminPrivileges() admin_privileges.request_admin_privileges() admin_privileges.check_and_terminate_processes() + # 备份hosts文件 + self.hosts_manager.backup() + # 创建缓存目录 if not os.path.exists(PLUGIN): try: @@ -78,6 +181,16 @@ class MainWindow(QMainWindow): about_action.triggered.connect(self.show_about_dialog) self.ui.menu_2.addAction(project_home_action) self.ui.menu_2.addAction(about_action) + + # “设置”菜单 + self.debug_action = QAction("Debug模式", self, checkable=True) + self.debug_action.setChecked(self.config.get("debug_mode", False)) + self.debug_action.triggered.connect(self.toggle_debug_mode) + self.ui.menu.addAction(self.debug_action) + + # 根据初始配置决定是否开启Debug模式 + if self.debug_action.isChecked(): + self.start_logging() # 在窗口显示前设置初始状态 self.animator.initialize() @@ -87,6 +200,39 @@ class MainWindow(QMainWindow): def start_animations(self): self.animator.start_animations() + + def toggle_debug_mode(self, checked): + self.config["debug_mode"] = checked + save_config(self.config) + if checked: + self.start_logging() + else: + self.stop_logging() + + def start_logging(self): + if self.logger is None: + try: + if os.path.exists(LOG_FILE): + os.remove(LOG_FILE) + # 保存原始的 stdout 和 stderr + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + # 创建 Logger 实例 + self.logger = Logger(LOG_FILE, self.original_stdout) + sys.stdout = self.logger + sys.stderr = self.logger + print("--- Debug mode enabled ---") + except (IOError, OSError) as e: + QtWidgets.QMessageBox.critical(self, "错误", f"无法创建日志文件: {e}") + self.logger = None + + def stop_logging(self): + if self.logger: + print("--- Debug mode disabled ---") + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + self.logger.close() + self.logger = None def get_install_paths(self): return { @@ -108,9 +254,32 @@ class MainWindow(QMainWindow): def get_download_url(self) -> dict: try: headers = {"User-Agent": UA} + if self.debug_action.isChecked(): + print("--- Starting to get download URL ---") + print(f"DEBUG: Requesting URL: {CONFIG_URL}") + print(f"DEBUG: Using Headers: {headers}") + response = requests.get(CONFIG_URL, headers=headers, timeout=10) + + if self.debug_action.isChecked(): + print(f"DEBUG: Response Status Code: {response.status_code}") + print(f"DEBUG: Response Headers: {response.headers}") + print(f"DEBUG: Response Text: {response.text}") + response.raise_for_status() - config_data = response.json() + + # 从响应文本中提取有效的 JSON 部分 + response_text = response.text + json_start_index = response_text.find('{') + if json_start_index == -1: + raise ValueError("响应中未找到有效的 JSON 对象") + + json_text = response_text[json_start_index:] + config_data = json.loads(json_text) + + if self.debug_action.isChecked(): + print(f"DEBUG: Parsed JSON data: {json.dumps(config_data, indent=2)}") + # 修正键名检查,确保所有必需的键都存在 required_keys = [f"vol.{i+1}.data" for i in range(4)] + ["after.data"] if not all(key in config_data for key in required_keys): @@ -118,11 +287,16 @@ class MainWindow(QMainWindow): raise ValueError(f"配置文件缺少必要的键: {', '.join(missing_keys)}") # 修正提取URL的逻辑,确保使用正确的键 - return { + urls = { f"vol{i+1}": config_data[f"vol.{i+1}.data"]["url"] for i in range(4) } | { "after": config_data["after.data"]["url"] } + if self.debug_action.isChecked(): + print(f"DEBUG: Extracted URLs: {urls}") + print("--- Finished getting download URL successfully ---") + return urls + except requests.exceptions.RequestException as e: status_code = e.response.status_code if e.response is not None else "未知" try: @@ -133,7 +307,9 @@ class MainWindow(QMainWindow): json_title = "配置文件异常,无法解析错误类型" json_message = "配置文件异常,无法解析错误信息" - print(f"获取下载配置时出错: {e}") # 添加详细错误日志 + if self.debug_action.isChecked(): + print(f"ERROR: Failed to get download config due to RequestException: {e}") + QtWidgets.QMessageBox.critical( self, f"错误 - {APP_NAME}", @@ -141,6 +317,9 @@ class MainWindow(QMainWindow): ) return {} except ValueError as e: + if self.debug_action.isChecked(): + print(f"ERROR: Failed to parse download config due to ValueError: {e}") + QtWidgets.QMessageBox.critical( self, f"错误 - {APP_NAME}", @@ -166,20 +345,38 @@ class MainWindow(QMainWindow): return self.progress_window = ProgressWindow(self) + self.start_download_with_ip(self.optimized_ip, url, _7z_path, game_version, game_folder, plugin_path) - # --- IP 优选逻辑 --- - self.progress_window.game_label.setText("正在优化下载线路,请稍候...") - QApplication.processEvents() # 刷新UI以显示上述消息 - - preferred_ip = get_optimal_ip(url) + + def on_optimization_and_hosts_finished(self, ip): + self.optimized_ip = ip + self.optimization_done = True + if hasattr(self, 'optimizing_msg_box') and self.optimizing_msg_box: + if self.optimizing_msg_box.isVisible(): + self.optimizing_msg_box.accept() + self.optimizing_msg_box = None + + if not ip: + QtWidgets.QMessageBox.warning(self, f"优选失败 - {APP_NAME}", "\n未能找到合适的Cloudflare IP,将使用默认网络进行下载。\n") + else: + if self.download_queue: + first_url = self.download_queue[0][0] + hostname = urlparse(first_url).hostname + if self.hosts_manager.apply_ip(hostname, ip): + QtWidgets.QMessageBox.information(self, f"成功 - {APP_NAME}", f"\n已将优选IP ({ip}) 应用到hosts文件。\n") + else: + QtWidgets.QMessageBox.critical(self, f"错误 - {APP_NAME}", "\n修改hosts文件失败,请检查程序是否以管理员权限运行。\n") + self.setEnabled(True) + self.next_download_task() + + def start_download_with_ip(self, preferred_ip, url, _7z_path, game_version, game_folder, plugin_path): if preferred_ip: print(f"已为 {game_version} 获取到优选IP: {preferred_ip}") else: print(f"未能为 {game_version} 获取优选IP,将使用默认线路。") - # --- IP 优选逻辑结束 --- - self.current_download_thread = DownloadThread(url, _7z_path, game_version, preferred_ip, self) + self.current_download_thread = DownloadThread(url, _7z_path, game_version, self) self.current_download_thread.progress.connect(self.progress_window.update_progress) self.current_download_thread.finished.connect( lambda success, error: self.install_setting( @@ -194,10 +391,9 @@ class MainWindow(QMainWindow): ) ) - # 连接停止按钮的信号 self.progress_window.stop_button.clicked.connect(self.current_download_thread.stop) self.current_download_thread.start() - self.progress_window.exec() # 使用exec()以模态方式显示对话框 + self.progress_window.exec() def install_setting( self, @@ -213,174 +409,131 @@ class MainWindow(QMainWindow): if progress_window.isVisible(): progress_window.reject() - if not success: # 处理所有失败情况,包括手动停止 + if not success: print(f"--- Download Failed: {game_version} ---") print(error) print("------------------------------------") msg_box = QtWidgets.QMessageBox(self) msg_box.setWindowTitle(f"下载失败 - {APP_NAME}") - # 如果是手动停止,显示特定信息 - if error == "下载已手动停止。": - msg_box.setText(f"\n下载已手动终止: {game_version}\n\n是否重试?") + msg_box.setText(f"\n文件获取失败: {game_version}\n错误: {error}\n\n是否重试?") + + retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) + next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) + end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) + + msg_box.exec() + clicked_button = msg_box.clickedButton() + + if clicked_button == retry_button: + self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) + elif clicked_button == next_button: + self.next_download_task() else: - msg_box.setText(f"\n文件获取失败: {game_version}\n\n是否重试?") - - retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) - next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) - end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) - - icon_data = img_data.get("icon") - if icon_data: - pixmap = load_base64_image(icon_data) - if not pixmap.isNull(): - msg_box.setWindowIcon(QIcon(pixmap)) + self.on_download_stopped() + return - msg_box.exec() - clicked_button = msg_box.clickedButton() + # --- Start Extraction in a new thread --- + self.hash_msg_box = self.hash_manager.hash_pop_window() + self.setEnabled(False) + + self.extraction_thread = ExtractionThread(_7z_path, game_folder, plugin_path, game_version, self) + self.extraction_thread.finished.connect(self.on_extraction_finished) + self.extraction_thread.start() - if clicked_button == retry_button: - self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) - elif clicked_button == next_button: - self.next_download_task() - else: # End button or closed dialog - self.download_queue.clear() - self.after_hash_compare(PLUGIN_HASH) - return # 确保失败后不再执行成功逻辑 + def on_extraction_finished(self, success, error_message, game_version): + if self.hash_msg_box and self.hash_msg_box.isVisible(): + self.hash_msg_box.close() + self.setEnabled(True) - if success: - try: - msg_box = self.hash_manager.hash_pop_window() - QApplication.processEvents() - with py7zr.SevenZipFile(_7z_path, mode="r") as archive: - archive.extractall(path=PLUGIN) - - # 创建游戏目录(如果不存在) - os.makedirs(game_folder, exist_ok=True) - - # 复制主文件 - shutil.copy(plugin_path, game_folder) - - # 如果是After版本,还需要复制签名文件 - if game_version == "NEKOPARA After": - sig_path = os.path.join(PLUGIN, GAME_INFO[game_version]["sig_path"]) - shutil.copy(sig_path, game_folder) - - self.installed_status[game_version] = True - except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", - ) - finally: - msg_box.close() - self.next_download_task() - self.progress_window.close() - - if success: - try: - msg_box = self.hash_manager.hash_pop_window() - QApplication.processEvents() - with py7zr.SevenZipFile(_7z_path, mode="r") as archive: - archive.extractall(path=PLUGIN) - - # 创建游戏目录(如果不存在) - os.makedirs(game_folder, exist_ok=True) - - # 复制主文件 - shutil.copy(plugin_path, game_folder) - - # 如果是After版本,还需要复制签名文件 - if game_version == "NEKOPARA After": - sig_path = os.path.join(PLUGIN, GAME_INFO[game_version]["sig_path"]) - shutil.copy(sig_path, game_folder) - - self.installed_status[game_version] = True - except (py7zr.Bad7zFile, FileNotFoundError, Exception) as e: - QtWidgets.QMessageBox.critical( - self, - f"错误 - {APP_NAME}", - f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", - ) - finally: - msg_box.close() - self.next_download_task() + if not success: + QtWidgets.QMessageBox.critical(self, f"错误 - {APP_NAME}", error_message) + self.installed_status[game_version] = False else: - print(f"--- Download Failed: {game_version} ---") - print(error) - print("------------------------------------") - msg_box = QtWidgets.QMessageBox(self) - msg_box.setWindowTitle(f"下载失败 {APP_NAME}") - msg_box.setText(f"\n文件获取失败: {game_version}\n\n是否重试?") - - retry_button = msg_box.addButton("重试", QtWidgets.QMessageBox.ButtonRole.YesRole) - next_button = msg_box.addButton("下一个", QtWidgets.QMessageBox.ButtonRole.NoRole) - end_button = msg_box.addButton("结束", QtWidgets.QMessageBox.ButtonRole.RejectRole) - - icon_data = img_data.get("icon") - if icon_data: - pixmap = load_base64_image(icon_data) - if not pixmap.isNull(): - msg_box.setWindowIcon(QIcon(pixmap)) - - msg_box.exec() - clicked_button = msg_box.clickedButton() - - if clicked_button == retry_button: - self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) - elif clicked_button == next_button: - self.next_download_task() - else: # End button or closed dialog - self.download_queue.clear() - self.after_hash_compare(PLUGIN_HASH) - - def pre_hash_compare(self, install_path, game_version, plugin_hash): - msg_box = self.hash_manager.hash_pop_window() - self.hash_manager.cfg_pre_hash_compare( - install_path, game_version, plugin_hash, self.installed_status - ) - msg_box.close() + self.installed_status[game_version] = True + + self.next_download_task() def download_action(self): + # 询问用户是否使用Cloudflare加速 + msg_box = QMessageBox(self) + msg_box.setWindowTitle(f"下载优化 - {APP_NAME}") + msg_box.setText("\n是否愿意通过Cloudflare加速来优化下载速度?\n\n这将临时修改系统的hosts文件,并需要管理员权限。") + msg_box.setIcon(QMessageBox.Icon.Question) + + yes_button = msg_box.addButton("是,开启加速", QMessageBox.ButtonRole.YesRole) + no_button = msg_box.addButton("否,直接下载", QMessageBox.ButtonRole.NoRole) + + msg_box.exec() + + use_optimization = msg_box.clickedButton() == yes_button + + self.hash_msg_box = self.hash_manager.hash_pop_window() + self.setEnabled(False) + install_paths = self.get_install_paths() - for game_version, install_path in install_paths.items(): - self.pre_hash_compare(install_path, game_version, PLUGIN_HASH) + + self.hash_thread = HashThread("pre", install_paths, PLUGIN_HASH, self.installed_status, self) + # 将用户选择传递给哈希完成后的处理函数 + self.hash_thread.pre_finished.connect(lambda status: self.on_pre_hash_finished(status, use_optimization)) + self.hash_thread.start() + + def on_pre_hash_finished(self, updated_status, use_optimization): + self.installed_status = updated_status + if self.hash_msg_box and self.hash_msg_box.isVisible(): + self.hash_msg_box.accept() + self.hash_msg_box = None config = self.get_download_url() if not config: QtWidgets.QMessageBox.critical( self, f"错误 - {APP_NAME}", "\n网络状态异常或服务器故障,请重试\n" ) + self.setEnabled(True) return - # 处理1-4卷 + # --- 填充下载队列 --- for i in range(1, 5): game_version = f"NEKOPARA Vol.{i}" - if not self.installed_status[game_version]: - url = config[f"vol{i}"] + if not self.installed_status.get(game_version, False): + url = config.get(f"vol{i}") + if not url: continue game_folder = os.path.join(self.selected_folder, f"NEKOPARA Vol. {i}") _7z_path = os.path.join(PLUGIN, f"vol.{i}.7z") - plugin_path = os.path.join( - PLUGIN, GAME_INFO[game_version]["plugin_path"] - ) - self.download_queue.append( - (url, game_folder, game_version, _7z_path, plugin_path) - ) - - # 处理After - game_version = "NEKOPARA After" - if not self.installed_status[game_version]: - url = config["after"] - game_folder = os.path.join(self.selected_folder, "NEKOPARA After") - _7z_path = os.path.join(PLUGIN, "after.7z") - plugin_path = os.path.join( - PLUGIN, GAME_INFO[game_version]["plugin_path"] - ) - self.download_queue.append( - (url, game_folder, game_version, _7z_path, plugin_path) - ) + plugin_path = os.path.join(PLUGIN, GAME_INFO[game_version]["plugin_path"]) + self.download_queue.append((url, game_folder, game_version, _7z_path, plugin_path)) - self.next_download_task() + game_version = "NEKOPARA After" + if not self.installed_status.get(game_version, False): + url = config.get("after") + if url: + game_folder = os.path.join(self.selected_folder, "NEKOPARA After") + _7z_path = os.path.join(PLUGIN, "after.7z") + plugin_path = os.path.join(PLUGIN, GAME_INFO[game_version]["plugin_path"]) + self.download_queue.append((url, game_folder, game_version, _7z_path, plugin_path)) + + if not self.download_queue: + self.after_hash_compare(PLUGIN_HASH) + return + + if use_optimization and not self.optimization_done: + first_url = self.download_queue[0][0] + self.optimizing_msg_box = msgbox_frame( + f"通知 - {APP_NAME}", + "\n正在优选Cloudflare IP,请稍候...\n\n这可能需要5-10分钟,请耐心等待喵~" + ) + # 我们不再提供“跳过”按钮,因为用户已经做出了选择 + self.optimizing_msg_box.setStandardButtons(QMessageBox.StandardButton.NoButton) + self.optimizing_msg_box.setWindowModality(Qt.WindowModality.ApplicationModal) + self.optimizing_msg_box.open() + + self.ip_optimizer_thread = IpOptimizerThread(first_url) + # 优选完成后,需要修改hosts并开始下载 + self.ip_optimizer_thread.finished.connect(self.on_optimization_and_hosts_finished) + self.ip_optimizer_thread.start() + else: + # 如果用户选择不优化,或已经优化过,直接开始下载 + self.setEnabled(True) + self.next_download_task() def next_download_task(self): if not self.download_queue: @@ -389,24 +542,66 @@ class MainWindow(QMainWindow): # 检查下载线程是否仍在运行,以避免在手动停止后立即开始下一个任务 if self.current_download_thread and self.current_download_thread.isRunning(): return + + # 在开始下载前,确保hosts文件已修改(如果需要) + # 这里的逻辑保持不变,因为hosts文件应该在队列开始前就被修改了 url, game_folder, game_version, _7z_path, plugin_path = self.download_queue.popleft() self.download_setting(url, game_folder, game_version, _7z_path, plugin_path) def on_download_stopped(self): - """当用户点击停止按钮时调用的槽函数""" + """当用户点击停止按钮或选择结束时调用的槽函数""" + # 停止IP优选线程 + if hasattr(self, 'ip_optimizer_thread') and self.ip_optimizer_thread and self.ip_optimizer_thread.isRunning(): + self.ip_optimizer_thread.stop() + self.ip_optimizer_thread.wait() + if hasattr(self, 'optimizing_msg_box') and self.optimizing_msg_box: + if self.optimizing_msg_box.isVisible(): + self.optimizing_msg_box.accept() + self.optimizing_msg_box = None + + # 停止当前可能仍在运行的下载线程 + if self.current_download_thread and self.current_download_thread.isRunning(): + self.current_download_thread.stop() + self.current_download_thread.wait() # 等待线程完全终止 + # 清空下载队列,因为用户决定停止 self.download_queue.clear() + + # 确保进度窗口已关闭 + if hasattr(self, 'progress_window') and self.progress_window.isVisible(): + self.progress_window.reject() + # 可以在这里决定是否立即进行哈希比较或显示结果 - self.after_hash_compare(PLUGIN_HASH) + print("下载已全部停止。") + self.setEnabled(True) # 恢复主窗口交互 + self.show_result() def after_hash_compare(self, plugin_hash): - msg_box = self.hash_manager.hash_pop_window() - result = self.hash_manager.cfg_after_hash_compare( - self.get_install_paths(), plugin_hash, self.installed_status - ) - msg_box.close() + self.hash_msg_box = self.hash_manager.hash_pop_window() + self.setEnabled(False) + + install_paths = self.get_install_paths() + + self.hash_thread = HashThread("after", install_paths, plugin_hash, self.installed_status, self) + self.hash_thread.after_finished.connect(self.on_after_hash_finished) + self.hash_thread.start() + + def on_after_hash_finished(self, result): + if self.hash_msg_box and self.hash_msg_box.isVisible(): + self.hash_msg_box.close() + self.setEnabled(True) + + if not result["passed"]: + game = result.get("game", "未知游戏") + message = result.get("message", "发生未知错误。") + msg_box = msgbox_frame( + f"文件校验失败 - {APP_NAME}", + message, + QtWidgets.QMessageBox.StandardButton.Ok, + ) + msg_box.exec() + self.show_result() - return result def show_result(self): installed_version = "\n".join( @@ -445,6 +640,8 @@ class MainWindow(QMainWindow): self.shutdown_app(event) def shutdown_app(self, event=None): + self.hosts_manager.restore() # 恢复hosts文件 + self.stop_logging() # 确保在退出时停止日志记录 reply = QtWidgets.QMessageBox.question( self, "退出程序", diff --git a/source/utils.py b/source/utils.py index 215437a..28ecc65 100644 --- a/source/utils.py +++ b/source/utils.py @@ -4,19 +4,21 @@ import base64 import hashlib import concurrent.futures import ctypes +import json import psutil from PySide6 import QtCore, QtWidgets +import re from PySide6.QtGui import QIcon, QPixmap from pic_data import img_data -from config import APP_NAME +from config import APP_NAME, CONFIG_FILE def resource_path(relative_path): + """获取资源的绝对路径,适用于开发环境和PyInstaller打包环境""" if getattr(sys, 'frozen', False): - if hasattr(sys, '_MEIPASS'): - base_path = sys._MEIPASS # type: ignore - else: - base_path = os.path.dirname(sys.executable) + # PyInstaller创建的临时文件夹,并将路径存储在_MEIPASS中 + base_path = getattr(sys, '_MEIPASS', os.path.dirname(sys.executable)) else: + # 在开发环境中运行 base_path = os.path.dirname(os.path.abspath(__file__)) return os.path.join(base_path, relative_path) @@ -35,7 +37,7 @@ def msgbox_frame(title, text, buttons=QtWidgets.QMessageBox.StandardButton.NoBut pixmap = load_base64_image(icon_data) if not pixmap.isNull(): msg_box.setWindowIcon(QIcon(pixmap)) - msg_box.setIconPixmap(pixmap.scaled(64, 64, QtCore.Qt.AspectRatioMode.KeepAspectRatio)) + msg_box.setIconPixmap(pixmap.scaled(64, 64, QtCore.Qt.AspectRatioMode.KeepAspectRatio, QtCore.Qt.TransformationMode.SmoothTransformation)) else: msg_box.setIcon(QtWidgets.QMessageBox.Icon.Information) @@ -43,6 +45,24 @@ def msgbox_frame(title, text, buttons=QtWidgets.QMessageBox.StandardButton.NoBut msg_box.setStandardButtons(buttons) return msg_box +def load_config(): + if not os.path.exists(CONFIG_FILE): + return {} + try: + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + +def save_config(config): + try: + os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) + with open(CONFIG_FILE, "w", encoding="utf-8") as f: + json.dump(config, f, indent=4) + except IOError as e: + print(f"Error saving config: {e}") + + class HashManager: def __init__(self, HASH_SIZE): self.HASH_SIZE = HASH_SIZE @@ -65,13 +85,8 @@ class HashManager: try: results[file_path] = future.result() except Exception as e: - results[file_path] = None - msg_box = msgbox_frame( - f"错误 - {APP_NAME}", - f"\n文件哈希值计算失败\n\n【错误信息】:{e}\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() + results[file_path] = None # Mark as failed + print(f"Error calculating hash for {file_path}: {e}") return results def hash_pop_window(self): @@ -80,26 +95,26 @@ class HashManager: QtWidgets.QApplication.processEvents() return msg_box - def cfg_pre_hash_compare(self, install_path, game_version, plugin_hash, installed_status): - if not os.path.exists(install_path): - installed_status[game_version] = False - return - file_hash = self.hash_calculate(install_path) - if file_hash == plugin_hash[game_version]: - installed_status[game_version] = True - else: - reply = msgbox_frame( - f"文件校验 - {APP_NAME}", - f"\n检测到 {game_version} 的文件哈希值不匹配,是否重新安装?\n", - QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No, - ).exec() - if reply == QtWidgets.QMessageBox.StandardButton.Yes: - installed_status[game_version] = False - else: - installed_status[game_version] = True + def cfg_pre_hash_compare(self, install_paths, plugin_hash, installed_status): + status_copy = installed_status.copy() + + for game_version, install_path in install_paths.items(): + if not os.path.exists(install_path): + status_copy[game_version] = False + continue + + try: + file_hash = self.hash_calculate(install_path) + if file_hash == plugin_hash.get(game_version): + status_copy[game_version] = True + else: + status_copy[game_version] = False + except Exception: + status_copy[game_version] = False + + return status_copy def cfg_after_hash_compare(self, install_paths, plugin_hash, installed_status): - passed = True file_paths = [ install_paths[game] for game in plugin_hash if installed_status.get(game) ] @@ -107,18 +122,25 @@ class HashManager: for game, hash_value in plugin_hash.items(): if installed_status.get(game): - file_hash = hash_results.get(install_paths[game]) - if file_hash != hash_value: - msg_box = msgbox_frame( - f"文件校验 - {APP_NAME}", - f"\n检测到 {game} 的文件哈希值不匹配\n", - QtWidgets.QMessageBox.StandardButton.Ok, - ) - msg_box.exec() + file_path = install_paths[game] + file_hash = hash_results.get(file_path) + + if file_hash is None: installed_status[game] = False - passed = False - break - return passed + return { + "passed": False, + "game": game, + "message": f"\n无法计算 {game} 的文件哈希值,文件可能已损坏或被占用。\n" + } + + if file_hash != hash_value: + installed_status[game] = False + return { + "passed": False, + "game": game, + "message": f"\n检测到 {game} 的文件哈希值不匹配。\n" + } + return {"passed": True} class AdminPrivileges: def __init__(self): @@ -194,4 +216,118 @@ class AdminPrivileges: QtWidgets.QMessageBox.StandardButton.Ok, ) msg_box.exec() - sys.exit(1) \ No newline at end of file + sys.exit(1) + +class HostsManager: + def __init__(self): + self.hosts_path = os.path.join(os.environ['SystemRoot'], 'System32', 'drivers', 'etc', 'hosts') + self.backup_path = os.path.join(os.path.dirname(self.hosts_path), f'hosts.bak.{APP_NAME}') + self.original_content = None + self.modified = False + + def backup(self): + if not AdminPrivileges().is_admin(): + print("需要管理员权限来备份hosts文件。") + return False + try: + with open(self.hosts_path, 'r', encoding='utf-8') as f: + self.original_content = f.read() + with open(self.backup_path, 'w', encoding='utf-8') as f: + f.write(self.original_content) + print(f"Hosts文件已备份到: {self.backup_path}") + return True + except IOError as e: + print(f"备份hosts文件失败: {e}") + msg_box = msgbox_frame(f"错误 - {APP_NAME}", f"\n无法备份hosts文件,请检查权限。\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok) + msg_box.exec() + return False + + def apply_ip(self, hostname, ip_address): + if not self.original_content: + if not self.backup(): + return False + + if not self.original_content: # 再次检查,确保backup成功 + print("无法读取hosts文件内容,操作中止。") + return False + + if not AdminPrivileges().is_admin(): + print("需要管理员权限来修改hosts文件。") + return False + + try: + lines = self.original_content.splitlines() + new_lines = [line for line in lines if not (hostname in line and line.strip().startswith(ip_address))] + + new_entry = f"{ip_address}\t{hostname}" + new_lines.append(f"\n# Added by {APP_NAME}") + new_lines.append(new_entry) + + with open(self.hosts_path, 'w', encoding='utf-8') as f: + f.write('\n'.join(new_lines)) + + self.modified = True + print(f"Hosts文件已更新: {new_entry}") + return True + except IOError as e: + print(f"修改hosts文件失败: {e}") + msg_box = msgbox_frame(f"错误 - {APP_NAME}", f"\n无法修改hosts文件,请检查权限。\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok) + msg_box.exec() + return False + + def restore(self): + if not self.modified: + if os.path.exists(self.backup_path): + try: + os.remove(self.backup_path) + except OSError: + pass + return True + + if not AdminPrivileges().is_admin(): + print("需要管理员权限来恢复hosts文件。") + return False + + if self.original_content: + try: + with open(self.hosts_path, 'w', encoding='utf-8') as f: + f.write(self.original_content) + self.modified = False + print("Hosts文件已从内存恢复。") + if os.path.exists(self.backup_path): + try: + os.remove(self.backup_path) + except OSError: + pass + return True + except IOError as e: + print(f"从内存恢复hosts文件失败: {e}") + return self.restore_from_backup_file() + else: + return self.restore_from_backup_file() + + def restore_from_backup_file(self): + if not os.path.exists(self.backup_path): + print("未找到hosts备份文件,无法恢复。") + return False + try: + with open(self.backup_path, 'r', encoding='utf-8') as bf: + backup_content = bf.read() + with open(self.hosts_path, 'w', encoding='utf-8') as hf: + hf.write(backup_content) + os.remove(self.backup_path) + self.modified = False + print("Hosts文件已从备份文件恢复。") + return True + except (IOError, OSError) as e: + print(f"从备份文件恢复hosts失败: {e}") + msg_box = msgbox_frame(f"警告 - {APP_NAME}", f"\n自动恢复hosts文件失败,请手动从 {self.backup_path} 恢复。\n\n【错误信息】:{e}\n", QtWidgets.QMessageBox.StandardButton.Ok) + msg_box.exec() + return False + +def censor_url(text): + """Censors URLs in a given text string.""" + if not isinstance(text, str): + text = str(text) + url_pattern = re.compile(r'https?://[^\s/$.?#].[^\s]*') + return url_pattern.sub('***URL HIDDEN***', text) \ No newline at end of file