feat(core): 增强加载对话框和哈希验证功能

- 在主窗口中添加显示和隐藏加载对话框的方法,提升用户体验。
- 更新补丁切换处理程序,增加调试模式参数以优化批量操作。
- 在离线模式管理器中增强哈希校验失败的日志记录,提供更详细的错误信息。
- 优化解压线程,增加对签名文件的处理逻辑,确保补丁安装的完整性和准确性。
- 在哈希验证线程中添加超时检测和进度更新,提升验证过程的可控性和用户反馈。
This commit is contained in:
hyb-oyqq
2025-08-12 15:49:43 +08:00
parent 1b6d275433
commit 2c91319d5f
5 changed files with 473 additions and 62 deletions

View File

@@ -94,7 +94,7 @@ class PatchToggleHandler(QObject):
selected_game_dirs = {game: games_with_patch[game]["dir"] for game in selected_games if game in games_with_patch}
self._execute_batch_toggle(selected_game_dirs, operation)
self._execute_batch_toggle(selected_game_dirs, operation, self.debug_manager.debug_mode)
def _handle_multiple_games(self, game_dirs, debug_mode):
"""

View File

@@ -624,6 +624,10 @@ class OfflineModeManager:
self.main_window.close_hash_msg_box()
if not result["passed"]:
# 记录校验失败信息
logger.error(f"===== {game_version} 哈希校验失败 =====")
logger.error(f"校验失败消息: {result.get('message', '无错误消息')}")
# 校验失败,删除已解压的文件并提示重新安装
error_message = result["message"]
@@ -637,12 +641,38 @@ class OfflineModeManager:
if game_version in game_dirs and game_version in GAME_INFO:
game_dir = game_dirs[game_version]
install_path = os.path.join(game_dir, os.path.basename(GAME_INFO[game_version]["install_path"]))
logger.info(f"找到安装路径: {install_path}")
# 记录安装路径的文件信息
if os.path.exists(install_path):
file_size = os.path.getsize(install_path)
logger.info(f"文件存在,大小: {file_size} 字节")
# 检查是否为NEKOPARA After记录签名文件信息
if game_version == "NEKOPARA After":
sig_path = f"{install_path}.sig"
if os.path.exists(sig_path):
sig_size = os.path.getsize(sig_path)
logger.info(f"签名文件存在: {sig_path}, 大小: {sig_size} 字节")
else:
logger.info(f"签名文件不存在: {sig_path}")
else:
logger.warning(f"文件不存在: {install_path}")
else:
logger.warning(f"未找到 {game_version} 的安装路径")
# 如果找到安装路径,尝试删除已解压的文件
if install_path and os.path.exists(install_path):
try:
os.remove(install_path)
logger.info(f"已删除校验失败的文件: {install_path}")
# 检查是否为NEKOPARA After同时删除签名文件
if game_version == "NEKOPARA After":
sig_path = f"{install_path}.sig"
if os.path.exists(sig_path):
os.remove(sig_path)
logger.info(f"已删除签名文件: {sig_path}")
except Exception as e:
logger.error(f"删除文件失败: {e}")
@@ -658,6 +688,7 @@ class OfflineModeManager:
else:
# 校验通过,更新安装状态
self.main_window.installed_status[game_version] = True
logger.info(f"===== {game_version} 哈希校验通过 =====")
# 添加到已安装游戏列表
if game_version not in self.installed_games:

View File

@@ -744,6 +744,18 @@ class MainWindow(QMainWindow):
from workers.extraction_thread import ExtractionThread
return ExtractionThread(patch_file, game_folder, plugin_path, game_version, self)
def show_loading_dialog(self, message):
"""显示加载对话框
Args:
message: 要显示的消息
"""
self.ui_manager.show_loading_dialog(message)
def hide_loading_dialog(self):
"""隐藏加载对话框"""
self.ui_manager.hide_loading_dialog()

View File

@@ -1,8 +1,14 @@
import os
import shutil
import py7zr
import tempfile
import traceback
from PySide6.QtCore import QThread, Signal
from config.config import PLUGIN, GAME_INFO
import time # 用于时间计算
import threading
import queue
from concurrent.futures import TimeoutError
class ExtractionThread(QThread):
finished = Signal(bool, str, str) # success, error_message, game_version
@@ -27,6 +33,14 @@ class ExtractionThread(QThread):
except Exception:
pass
# 记录调试信息
from utils.logger import setup_logger
debug_logger = setup_logger("extraction_thread")
debug_logger.info(f"====== 开始处理 {self.game_version} 补丁文件 ======")
debug_logger.info(f"压缩包路径: {self._7z_path}")
debug_logger.info(f"游戏目录: {self.game_folder}")
debug_logger.info(f"插件路径: {self.plugin_path}")
update_progress(0, f"开始处理 {self.game_version} 的补丁文件...")
# 支持外部请求中断
@@ -46,85 +60,275 @@ class ExtractionThread(QThread):
# 对于NEKOPARA After还需要复制签名文件
if self.game_version == "NEKOPARA After":
try:
update_progress(70, f"正在处理 {self.game_version} 的签名文件...")
# 从已解压文件的目录中获取签名文件
extracted_dir = os.path.dirname(self.extracted_path)
sig_filename = os.path.basename(GAME_INFO[self.game_version]["sig_path"])
sig_path = os.path.join(extracted_dir, sig_filename)
# 尝试多种可能的签名文件路径
if not os.path.exists(sig_path):
# 尝试在同级目录查找
sig_path = os.path.join(os.path.dirname(extracted_dir), sig_filename)
# 如果签名文件存在,则复制它
if os.path.exists(sig_path):
shutil.copy(sig_path, self.game_folder)
target_sig = os.path.join(self.game_folder, sig_filename)
shutil.copy(sig_path, target_sig)
update_progress(80, f"签名文件复制完成")
else:
# 如果签名文件不存在,则使用原始路径
sig_path = os.path.join(PLUGIN, GAME_INFO[self.game_version]["sig_path"])
shutil.copy(sig_path, self.game_folder)
if os.path.exists(sig_path):
target_sig = os.path.join(self.game_folder, os.path.basename(sig_path))
shutil.copy(sig_path, target_sig)
update_progress(80, f"使用内置签名文件完成")
else:
update_progress(80, f"未找到签名文件,继续安装主补丁文件")
except Exception as sig_err:
# 签名文件处理失败时记录错误但不中断主流程
update_progress(80, f"签名文件处理失败: {str(sig_err)}")
update_progress(100, f"{self.game_version} 补丁文件处理完成")
self.finished.emit(True, "", self.game_version)
return
# 否则解压源压缩包到临时目录,再复制目标文件
target_filename = os.path.basename(self.plugin_path)
target_path = os.path.join(self.game_folder, target_filename)
update_progress(10, f"正在打开 {self.game_version} 的补丁压缩包...")
with py7zr.SevenZipFile(self._7z_path, mode="r") as archive:
# 获取压缩包内的文件列表
file_list = archive.getnames()
# 详细记录压缩包中的所有文件
debug_logger.info(f"压缩包内容分析:")
debug_logger.info(f"- 文件总数: {len(file_list)}")
for i, f in enumerate(file_list):
debug_logger.info(f" {i+1}. {f} - 类型: {'文件夹' if f.endswith('/') or f.endswith('\\') else '文件'}")
update_progress(20, f"正在分析 {self.game_version} 的补丁文件...")
# 查找压缩包内的目标文件
target_file_in_archive = None
for file_path in file_list:
if target_filename in file_path:
target_file_in_archive = file_path
break
if not target_file_in_archive:
raise FileNotFoundError(f"在压缩包中未找到目标文件 {target_filename}")
update_progress(30, f"正在解压 {self.game_version} 的补丁文件...\n(在此过程中可能会卡顿或无响应,请不要关闭软件)")
import tempfile
with tempfile.TemporaryDirectory() as temp_dir:
# 解压特定文件到临时目录
archive.extract(path=temp_dir, targets=[target_file_in_archive])
# 查找主补丁文件和签名文件
target_filename = os.path.basename(self.plugin_path)
sig_filename = target_filename + ".sig" # 签名文件名
debug_logger.info(f"查找主补丁文件: {target_filename}")
debug_logger.info(f"查找签名文件: {sig_filename}")
target_file_in_archive = None
sig_file_in_archive = None
# 对于NEKOPARA After增加特殊处理
if self.game_version == "NEKOPARA After":
# 增加专门的检查,同时识别主补丁和签名文件
debug_logger.info("执行NEKOPARA After特殊补丁文件识别")
# 查找主补丁和签名文件
for file_path in file_list:
basename = os.path.basename(file_path)
# 查找主补丁文件
if basename == "afteradult.xp3" and not basename.endswith('.sig'):
target_file_in_archive = file_path
debug_logger.info(f"找到精确匹配的After主补丁文件: {target_file_in_archive}")
# 查找签名文件
elif basename == "afteradult.xp3.sig" or basename.endswith('.sig'):
sig_file_in_archive = file_path
debug_logger.info(f"找到After签名文件: {sig_file_in_archive}")
# 如果没找到主补丁文件,寻找可能的替代文件
if not target_file_in_archive:
for file_path in file_list:
if "afteradult.xp3" in file_path and not file_path.endswith('.sig'):
target_file_in_archive = file_path
debug_logger.info(f"找到备选After主补丁文件: {target_file_in_archive}")
break
else:
# 标准处理逻辑
for file_path in file_list:
basename = os.path.basename(file_path)
# 查找主补丁文件
if basename == target_filename and not basename.endswith('.sig'):
target_file_in_archive = file_path
debug_logger.info(f"在压缩包中找到主补丁文件: {target_file_in_archive}")
# 查找签名文件
elif basename == sig_filename:
sig_file_in_archive = file_path
debug_logger.info(f"在压缩包中找到签名文件: {sig_file_in_archive}")
# 如果没有找到精确匹配的主补丁文件,使用更宽松的搜索
if not target_file_in_archive:
debug_logger.warning(f"没有找到精确匹配的主补丁文件,尝试更宽松的搜索")
for file_path in file_list:
if target_filename in file_path and not file_path.endswith('.sig'):
target_file_in_archive = file_path
debug_logger.info(f"在压缩包中找到可能的主补丁文件: {target_file_in_archive}")
break
# 如果找不到主补丁文件,使用回退方案:提取全部内容
if not target_file_in_archive:
debug_logger.warning(f"未能识别正确的主补丁文件,将提取所有文件并尝试查找")
# 提取所有文件到临时目录
update_progress(30, f"正在解压所有文件...")
archive.extractall(path=temp_dir)
debug_logger.info(f"已提取所有文件到临时目录")
# 在提取的文件中查找主补丁文件和签名文件
found_main = False
found_sig = False
for root, dirs, files in os.walk(temp_dir):
for file in files:
# 查找主补丁文件
if file == target_filename and not file.endswith('.sig'):
extracted_file_path = os.path.join(root, file)
file_size = os.path.getsize(extracted_file_path)
debug_logger.info(f"在提取的文件中找到主补丁文件: {extracted_file_path}, 大小: {file_size} 字节")
# 复制到目标位置
target_path = os.path.join(self.game_folder, target_filename)
shutil.copy2(extracted_file_path, target_path)
debug_logger.info(f"已复制主补丁文件到: {target_path}")
found_main = True
# 查找签名文件
elif file == sig_filename or file.endswith('.sig'):
extracted_sig_path = os.path.join(root, file)
sig_size = os.path.getsize(extracted_sig_path)
debug_logger.info(f"在提取的文件中找到签名文件: {extracted_sig_path}, 大小: {sig_size} 字节")
# 复制到目标位置
sig_target = os.path.join(self.game_folder, sig_filename)
shutil.copy2(extracted_sig_path, sig_target)
debug_logger.info(f"已复制签名文件到: {sig_target}")
found_sig = True
# 如果两个文件都找到,可以停止遍历
if found_main and found_sig:
debug_logger.info("已找到所有需要的文件,停止遍历")
break
if found_main and found_sig:
break
if not found_main:
debug_logger.error(f"无法找到主补丁文件,安装失败")
raise FileNotFoundError(f"在压缩包中未找到主补丁文件 {target_filename}")
# 签名文件没找到不影响主流程,但记录警告
if not found_sig:
debug_logger.warning(f"未找到签名文件 {sig_filename},将尝试使用内置签名文件")
# 尝试使用内置签名文件
self._try_use_builtin_signature(sig_filename, debug_logger, update_progress)
else:
# 准备要解压的文件列表
files_to_extract = [target_file_in_archive]
if sig_file_in_archive:
files_to_extract.append(sig_file_in_archive)
debug_logger.info(f"将同时解压主补丁文件和签名文件: {files_to_extract}")
else:
debug_logger.info(f"将仅解压主补丁文件: {files_to_extract}")
# 解压选定的文件到临时目录
debug_logger.info(f"开始解压选定文件到临时目录: {temp_dir}")
# 设置解压超时时间(秒)
extract_timeout = 180 # 3分钟超时
debug_logger.info(f"设置解压超时: {extract_timeout}")
# 创建子线程执行解压
import threading
import queue
extract_result = queue.Queue()
def extract_files():
try:
archive.extract(path=temp_dir, targets=files_to_extract)
extract_result.put(("success", None))
except Exception as e:
extract_result.put(("error", e))
extract_thread = threading.Thread(target=extract_files)
extract_thread.daemon = True
extract_thread.start()
# 每5秒更新一次进度最多等待设定的超时时间
total_waited = 0
while extract_thread.is_alive() and total_waited < extract_timeout:
update_progress(30 + int(30 * total_waited / extract_timeout),
f"正在解压文件...已等待{total_waited}")
extract_thread.join(5) # 等待5秒
total_waited += 5
# 检查是否超时
if extract_thread.is_alive():
debug_logger.error(f"解压超时(超过{extract_timeout}秒)")
raise TimeoutError(f"解压超时(超过{extract_timeout}秒),请检查补丁文件是否完整")
# 检查解压结果
if not extract_result.empty():
status, error = extract_result.get()
if status == "error":
debug_logger.error(f"解压错误: {error}")
raise error
debug_logger.info(f"文件解压完成")
update_progress(60, f"正在复制 {self.game_version} 的补丁文件...")
# 找到解压后的文件
# 复制主补丁文件到游戏目录
extracted_file_path = os.path.join(temp_dir, target_file_in_archive)
# 复制到目标位置
# 检查解压后的文件是否存在及其大小
if os.path.exists(extracted_file_path):
file_size = os.path.getsize(extracted_file_path)
debug_logger.info(f"解压后的主补丁文件存在: {extracted_file_path}, 大小: {file_size} 字节")
else:
debug_logger.error(f"解压后的主补丁文件不存在: {extracted_file_path}")
raise FileNotFoundError(f"解压后的文件不存在: {extracted_file_path}")
# 构建目标路径并复制
target_path = os.path.join(self.game_folder, target_filename)
debug_logger.info(f"复制主补丁文件: {extracted_file_path}{target_path}")
shutil.copy2(extracted_file_path, target_path)
update_progress(80, f"正在完成 {self.game_version} 的补丁安装...")
# 对于NEKOPARA After还需要复制签名文件
if self.game_version == "NEKOPARA After":
sig_filename = f"{target_filename}.sig"
sig_file_in_archive = None
# 查找签名文件
for file_path in file_list:
if sig_filename in file_path:
sig_file_in_archive = file_path
break
# 验证主补丁文件是否成功复制
if os.path.exists(target_path):
target_size = os.path.getsize(target_path)
debug_logger.info(f"主补丁文件成功复制: {target_path}, 大小: {target_size} 字节")
else:
debug_logger.error(f"主补丁文件复制失败: {target_path}")
raise FileNotFoundError(f"目标文件复制失败: {target_path}")
# 如果有找到签名文件,也复制它
if sig_file_in_archive:
# 解压签名文件
archive.extract(path=temp_dir, targets=[sig_file_in_archive])
update_progress(80, f"正在复制签名文件...")
extracted_sig_path = os.path.join(temp_dir, sig_file_in_archive)
if os.path.exists(extracted_sig_path):
sig_size = os.path.getsize(extracted_sig_path)
debug_logger.info(f"解压后的签名文件存在: {extracted_sig_path}, 大小: {sig_size} 字节")
# 复制签名文件到游戏目录
sig_target = os.path.join(self.game_folder, sig_filename)
shutil.copy2(extracted_sig_path, sig_target)
debug_logger.info(f"签名文件成功复制: {sig_target}")
else:
# 如果签名文件不存在,则使用原始路径
sig_path = os.path.join(PLUGIN, GAME_INFO[self.game_version]["sig_path"])
if os.path.exists(sig_path):
sig_target = os.path.join(self.game_folder, sig_filename)
shutil.copy2(sig_path, sig_target)
debug_logger.warning(f"解压后的签名文件不存在: {extracted_sig_path}")
# 尝试使用内置签名文件
self._try_use_builtin_signature(sig_filename, debug_logger, update_progress)
else:
debug_logger.warning(f"没有找到签名文件,尝试使用内置签名文件")
# 尝试使用内置签名文件
self._try_use_builtin_signature(sig_filename, debug_logger, update_progress)
update_progress(100, f"{self.game_version} 补丁文件解压完成")
self.finished.emit(True, "", self.game_version)
@@ -134,3 +338,43 @@ class ExtractionThread(QThread):
except Exception:
pass
self.finished.emit(False, f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n", self.game_version)
# 添加一个新的辅助方法用于使用内置签名文件
def _try_use_builtin_signature(self, sig_filename, debug_logger, update_progress):
"""尝试使用内置的签名文件"""
try:
# 如果签名文件不在压缩包中,则尝试使用原始路径
update_progress(85, f"尝试使用内置签名文件...")
sig_path = os.path.join(PLUGIN, GAME_INFO[self.game_version]["sig_path"])
debug_logger.info(f"内置签名文件路径: {sig_path}")
if os.path.exists(sig_path):
sig_size = os.path.getsize(sig_path)
debug_logger.info(f"内置签名文件存在,大小: {sig_size} 字节")
# 确保使用正确的签名文件名
sig_target = os.path.join(self.game_folder, sig_filename)
debug_logger.info(f"目标签名文件路径: {sig_target}")
# 增加异常处理
try:
shutil.copy2(sig_path, sig_target)
if os.path.exists(sig_target):
target_sig_size = os.path.getsize(sig_target)
debug_logger.info(f"内置签名文件成功复制到目标位置,大小: {target_sig_size} 字节")
else:
debug_logger.error(f"内置签名文件复制失败,目标文件不存在: {sig_target}")
except Exception as copy_err:
debug_logger.error(f"复制内置签名文件时出错: {str(copy_err)}")
debug_logger.error(f"错误详情: {traceback.format_exc()}")
update_progress(90, f"使用内置签名文件完成")
else:
debug_logger.warning(f"内置签名文件不存在: {sig_path}")
update_progress(85, f"未找到内置签名文件,继续安装主补丁文件")
except Exception as sig_err:
# 签名文件处理失败时记录错误但不中断主流程
debug_logger.error(f"内置签名文件处理异常: {str(sig_err)}")
debug_logger.error(f"异常详情: {traceback.format_exc()}")
update_progress(85, f"内置签名文件处理失败: {str(sig_err)}")

View File

@@ -3,6 +3,7 @@ import hashlib
import py7zr
import tempfile
import traceback
import time # Added for time.time()
from PySide6.QtCore import QThread, Signal
from PySide6.QtWidgets import QApplication
from utils.logger import setup_logger
@@ -35,10 +36,27 @@ class HashThread(QThread):
"""运行线程"""
debug_mode = False
# 设置超时限制(分钟)
timeout_minutes = 10
max_execution_time = timeout_minutes * 60 # 转换为秒
start_execution_time = time.time()
# 尝试检测是否处于调试模式
if self.main_window and hasattr(self.main_window, 'debug_manager'):
debug_mode = self.main_window.debug_manager._is_debug_mode()
if debug_mode:
logger.debug(f"DEBUG: 设置哈希计算超时时间: {timeout_minutes} 分钟")
# 在各个关键步骤添加超时检测
def check_timeout():
elapsed = time.time() - start_execution_time
if elapsed > max_execution_time:
if debug_mode:
logger.error(f"DEBUG: 哈希计算超时,已执行 {elapsed:.1f} 秒,超过限制的 {max_execution_time}")
return True
return False
if self.mode == "pre":
status_copy = self.installed_status.copy()
@@ -65,6 +83,13 @@ class HashThread(QThread):
while True:
if self.isInterruptionRequested():
break
# 检查超时
if check_timeout():
logger.error(f"哈希计算超时,强制终止")
result["passed"] = False
result["game"] = game_version
result["message"] = f"\n{game_version} 哈希计算超时,已超过 {timeout_minutes} 分钟。\n\n请考虑跳过哈希校验或稍后再试。\n"
break
chunk = f.read(1024 * 1024)
if not chunk:
break
@@ -112,18 +137,69 @@ class HashThread(QThread):
# 当没有预期哈希值时,跳过检查
continue
# 检查文件存在和可读性
if not os.path.exists(install_path):
logger.error(f"哈希校验失败 - 文件不存在: {install_path}")
result["passed"] = False
result["game"] = game_version
result["message"] = f"\n{game_version} 安装后的文件不存在,无法校验。\n\n文件路径: {install_path}\n"
break
# 记录文件大小信息
file_size = os.path.getsize(install_path)
logger.info(f"正在校验 {game_version} 补丁文件: {install_path}, 文件大小: {file_size} 字节")
# 增加块大小,提高大文件处理性能
# 文件越大块越大最大256MB
chunk_size = min(256 * 1024 * 1024, max(16 * 1024 * 1024, file_size // 20))
logger.info(f" 使用块大小: {chunk_size // (1024 * 1024)}MB")
# 分块读取,避免大文件一次性读取内存
hash_obj = hashlib.sha256()
bytes_read = 0
start_time = time.time()
last_progress_time = start_time
with open(install_path, "rb") as f:
while True:
if self.isInterruptionRequested():
break
chunk = f.read(1024 * 1024)
# 检查超时
if check_timeout():
logger.error(f"哈希计算超时,强制终止")
result["passed"] = False
result["game"] = game_version
result["message"] = f"\n{game_version} 哈希计算超时,已超过 {timeout_minutes} 分钟。\n\n请考虑跳过哈希校验或稍后再试。\n"
break
chunk = f.read(chunk_size)
if not chunk:
break
bytes_read += len(chunk)
hash_obj.update(chunk)
# 每秒更新一次进度
current_time = time.time()
if current_time - last_progress_time >= 1.0:
progress = bytes_read / file_size * 100
elapsed = current_time - start_time
speed = bytes_read / (elapsed if elapsed > 0 else 1) / (1024 * 1024) # MB/s
logger.info(f" 进度: {progress:.1f}% - 已处理: {bytes_read/(1024*1024):.1f}MB/{file_size/(1024*1024):.1f}MB - 速度: {speed:.1f}MB/s")
last_progress_time = current_time
# 计算最终的哈希值
file_hash = hash_obj.hexdigest()
# 记录总用时
total_time = time.time() - start_time
logger.info(f" 哈希计算完成,耗时: {total_time:.1f}秒,平均速度: {file_size/(total_time*1024*1024):.1f}MB/s")
# 详细记录哈希比较结果
logger.info(f"哈希校验结果 - {game_version}:")
logger.info(f" 文件: {install_path}")
logger.info(f" 读取字节数: {bytes_read} / {file_size}")
logger.info(f" 预期哈希: {expected_hash}")
logger.info(f" 实际哈希: {file_hash}")
logger.info(f" 匹配结果: {file_hash == expected_hash}")
if debug_mode:
logger.debug(f"DEBUG: 哈希后检查 - {game_version}")
logger.debug(f"DEBUG: 文件路径: {install_path}")
@@ -134,7 +210,7 @@ class HashThread(QThread):
if file_hash != expected_hash:
result["passed"] = False
result["game"] = game_version
result["message"] = f"\n{game_version} 安装后的文件校验失败。\n\n文件可能已损坏或被篡改,请重新安装。\n"
result["message"] = f"\n{game_version} 安装后的文件校验失败。\n\n文件可能已损坏或被篡改,请重新安装。\n预期哈希: {expected_hash[:10]}...\n实际哈希: {file_hash[:10]}...\n"
if debug_mode:
logger.debug(f"DEBUG: 哈希后检查 - {game_version} 哈希不匹配")
break
@@ -180,10 +256,24 @@ class OfflineHashVerifyThread(QThread):
"""运行线程"""
debug_mode = False
# 设置超时限制(分钟)
timeout_minutes = 10
max_execution_time = timeout_minutes * 60 # 转换为秒
start_execution_time = time.time()
# 尝试检测是否处于调试模式
if self.main_window and hasattr(self.main_window, 'debug_manager'):
debug_mode = self.main_window.debug_manager._is_debug_mode()
# 检查超时的函数
def check_timeout():
elapsed = time.time() - start_execution_time
if elapsed > max_execution_time:
if debug_mode:
logger.debug(f"DEBUG: 哈希校验超时,已运行 {elapsed:.1f}")
return True
return False
# 获取预期的哈希值
expected_hash = self.plugin_hash.get(self.game_version, "")
@@ -359,23 +449,57 @@ class OfflineHashVerifyThread(QThread):
try:
# 读取文件内容并计算哈希值,同时更新进度
file_size = os.path.getsize(patch_file)
chunk_size = 1024 * 1024 # 1MB
# 根据文件大小动态调整块大小
# 文件越大块越大最大256MB
chunk_size = min(256 * 1024 * 1024, max(16 * 1024 * 1024, file_size // 20))
if debug_mode:
logger.debug(f"DEBUG: 文件大小: {file_size} 字节, 使用块大小: {chunk_size // (1024 * 1024)}MB")
hash_obj = hashlib.sha256()
with open(patch_file, "rb") as f:
bytes_read = 0
start_time = time.time()
last_progress_time = start_time
while True:
if self.isInterruptionRequested():
break
# 检查超时
if check_timeout():
logger.error(f"哈希计算超时,强制终止")
self.progress.emit(100)
self.finished.emit(
False,
f"{self.game_version} 哈希计算超时,已超过 {timeout_minutes} 分钟。请考虑跳过哈希校验或稍后再试。",
""
)
return
chunk = f.read(chunk_size)
if not chunk:
break
hash_obj.update(chunk)
bytes_read += len(chunk)
# 计算进度 (70-95%)
progress = 70 + int(25 * bytes_read / file_size)
self.progress.emit(min(95, progress))
# 每秒更新一次日志进度
current_time = time.time()
if debug_mode and current_time - last_progress_time >= 1.0:
elapsed = current_time - start_time
speed = bytes_read / (elapsed if elapsed > 0 else 1) / (1024 * 1024) # MB/s
percent = bytes_read / file_size * 100
logger.debug(f"DEBUG: 哈希计算进度 - {percent:.1f}% - 已处理: {bytes_read/(1024*1024):.1f}MB/{file_size/(1024*1024):.1f}MB - 速度: {speed:.1f}MB/s")
last_progress_time = current_time
# 记录总用时
if debug_mode:
total_time = time.time() - start_time
logger.debug(f"DEBUG: 哈希计算完成,耗时: {total_time:.1f}秒,平均速度: {file_size/(total_time*1024*1024):.1f}MB/s")
file_hash = hash_obj.hexdigest()
# 比较哈希值