498 lines
18 KiB
Python

import os
import py7zr
import requests
import shutil
import hashlib
import sys
import base64
import psutil
import ctypes
from PySide6 import QtWidgets, QtCore
from PySide6.QtCore import Qt, QByteArray, Signal
from PySide6.QtWidgets import (
QApplication,
QWidget,
QMessageBox,
QProgressBar,
QVBoxLayout,
QLabel,
)
from PySide6.QtGui import QIcon, QPixmap
from collections import deque
from pic_data import img_data
from GUI import Ui_mainwin
APP_VERSION = "@FRAISEMOE Addons Installer V4.8.6.17218"
TEMP = os.getenv("TEMP")
CACHE = os.path.join(TEMP, "FRAISEMOE")
PLUGIN = os.path.join(CACHE, "PLUGIN")
CONFIG_URL = "https://archive.ovofish.com/api/widget/nekopara/download_url.json"
UA = "Mozilla/5.0 (Linux debian12 Python-Accept) Gecko/20100101 Firefox/114.0"
SRC_HASHES = {
"NEKOPARA Vol.1": "04b48b231a7f34431431e5027fcc7b27affaa951b8169c541709156acf754f3e",
"NEKOPARA Vol.2": "b9c00a2b113a1e768bf78400e4f9075ceb7b35349cdeca09be62eb014f0d4b42",
"NEKOPARA Vol.3": "2ce7b223c84592e1ebc3b72079dee1e5e8d064ade15723328a64dee58833b9d5",
"NEKOPARA Vol.4": "4a4a9ae5a75a18aacbe3ab0774d7f93f99c046afe3a777ee0363e8932b90f36a",
}
def admin_status():
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def run_as_admin():
script = os.path.abspath(sys.argv[0])
params = " ".join([script] + sys.argv[1:])
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, params, None, 1)
class DownloadThread(QtCore.QThread):
progress = Signal(int)
finished = Signal(bool, str)
def __init__(self, url, _7z_path, parent=None):
super().__init__(parent)
self.url = url
self._7z_path = _7z_path
def run(self):
try:
headers = {"User-Agent": UA}
r = requests.get(self.url, headers=headers, stream=True, timeout=10)
r.raise_for_status()
block_size = 64 * 1024
total_size = int(r.headers.get("content-length", 0))
with open(self._7z_path, "wb") as f:
for chunk in r.iter_content(chunk_size=block_size):
f.write(chunk)
self.progress.emit(f.tell() * 100 // total_size)
self.finished.emit(True, "")
except requests.exceptions.RequestException as e:
self.finished.emit(False, f"\n网络请求错误\n\n【错误信息】: {e}\n")
except Exception as e:
self.finished.emit(False, f"\n未知错误\n\n【错误信息】: {e}\n")
def game_process_status(process_name):
for proc in psutil.process_iter(["pid", "name"]):
try:
if process_name.lower() in proc.info["name"].lower():
return proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return None
def kill_process(pid):
try:
process = psutil.Process(pid)
process.terminate()
process.wait(timeout=5)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
class MyWindow(QWidget, Ui_mainwin):
def __init__(self):
super().__init__()
self.setupUi(self)
self.selected_folder = ""
self.installed_status = {
"NEKOPARA Vol.1": False,
"NEKOPARA Vol.2": False,
"NEKOPARA Vol.3": False,
"NEKOPARA Vol.4": False,
}
self.download_queue = deque()
self.current_download_thread = None
game_process_info = {
"nekopara_vol1.exe": "NEKOPARA Vol.1",
"nekopara_vol2.exe": "NEKOPARA Vol.2",
"NEKOPARAvol3.exe": "NEKOPARA Vol.3",
"nekopara_vol4.exe": "NEKOPARA Vol.4",
}
for process_name, game_version in game_process_info.items():
pid = game_process_status(process_name)
if pid:
msg_box = QMessageBox()
msg_box.setWindowTitle(f"进程检测 {APP_VERSION}")
pixmap = QPixmap()
pixmap.loadFromData(QByteArray(base64.b64decode(img_data["icon"])))
icon = QIcon(pixmap)
msg_box.setWindowIcon(icon)
msg_box.setText(f"\n检测到 {game_version} 正在运行,是否关闭?\n")
yes_button = msg_box.addButton(
"确定", QMessageBox.ButtonRole.AcceptRole
)
no_button = msg_box.addButton("取消", QMessageBox.ButtonRole.RejectRole)
msg_box.setDefaultButton(no_button)
msg_box.exec()
if msg_box.clickedButton() == yes_button:
kill_process(pid)
else:
QMessageBox.warning(
self,
f"警告 {APP_VERSION}",
f"\n请关闭 {game_version} 后再运行本程序。\n",
)
self.close()
sys.exit()
if not os.path.exists(PLUGIN):
os.makedirs(PLUGIN)
if not os.path.exists(PLUGIN):
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
"\n无法创建缓存位置\n\n使用管理员身份运行或检查文件读写权限\n",
)
self.close()
sys.exit()
self.startbtn.clicked.connect(self.file_dialog)
self.exitbtn.clicked.connect(self.shutdown_app)
def get_install_paths(self):
return {
"NEKOPARA Vol.1": os.path.join(
self.selected_folder, "NEKOPARA Vol. 1", "adultsonly.xp3"
),
"NEKOPARA Vol.2": os.path.join(
self.selected_folder, "NEKOPARA Vol. 2", "adultsonly.xp3"
),
"NEKOPARA Vol.3": os.path.join(
self.selected_folder, "NEKOPARA Vol. 3", "update00.int"
),
"NEKOPARA Vol.4": os.path.join(
self.selected_folder, "NEKOPARA Vol. 4", "vol4adult.xp3"
),
}
def file_dialog(self):
self.selected_folder = QtWidgets.QFileDialog.getExistingDirectory(
self, f"选择游戏所在【上级目录】 {APP_VERSION}"
)
if not self.selected_folder:
QMessageBox.warning(
self, f"通知 {APP_VERSION}", "\n未选择任何目录,请重新选择\n"
)
return
self.download_action()
def hash_calculate(self, file_path):
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def hash_pop_window(self):
msg_box = QMessageBox()
msg_box.setWindowTitle(f"通知 {APP_VERSION}")
pixmap = QPixmap()
pixmap.loadFromData(QByteArray(base64.b64decode(img_data["icon"])))
icon = QIcon(pixmap)
msg_box.setWindowIcon(icon)
msg_box.setText("\n正在检验文件状态...\n")
msg_box.setStandardButtons(QMessageBox.StandardButton.NoButton)
msg_box.show()
QApplication.processEvents()
return msg_box
def pre_hash_compare(self, install_path, game_version, SRC_HASHES):
if not os.path.exists(install_path):
self.installed_status[game_version] = False
return
msg_box = self.hash_pop_window()
file_hash = self.hash_calculate(install_path)
msg_box.close()
if file_hash != SRC_HASHES[game_version]:
msg_box = QMessageBox(self)
msg_box.setWindowTitle(f"文件校验 {APP_VERSION}")
msg_box.setText(
f"\n【 当前版本已安装旧版本补丁 -> {game_version}\n\n是否重新安装?\n----->取消安装前应确认补丁是否可用<-----\n"
)
yes_button = msg_box.addButton("确定", QMessageBox.ButtonRole.AcceptRole)
no_button = msg_box.addButton("取消", QMessageBox.ButtonRole.RejectRole)
msg_box.setDefaultButton(no_button)
msg_box.exec()
if msg_box.clickedButton() == yes_button:
self.installed_status[game_version] = False
return
else:
self.installed_status[game_version] = True
return
else:
self.installed_status[game_version] = True
return
def late_hash_compare(self, SRC_HASHES):
install_paths = self.get_install_paths()
passed = True
for game, hash_value in SRC_HASHES.items():
if self.installed_status.get(game):
msg_box = self.hash_pop_window()
file_hash = self.hash_calculate(install_paths[game])
msg_box.close()
if file_hash != hash_value:
passed = False
break
return passed
def download_config(self) -> dict:
try:
headers = {"User-Agent": UA}
response = requests.get(CONFIG_URL, headers=headers, timeout=10)
response.raise_for_status()
response = response.json()
return {f"vol{i+1}": response[f"vol.{i+1}.data"]["url"] for i in range(4)}
except requests.exceptions.RequestException as e:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n下载配置获取失败\n\n网络状态异常或服务器故障\n\n【错误信息】:{e}\n",
)
except Exception as e:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n下载配置获取失败\n\n未知错误\n\n【错误信息】:{e}\n",
)
return {}
def download_setting(self, url, game_folder, game_version, _7z_path, plugin_path):
game_exe = {
"NEKOPARA Vol.1": os.path.join(
self.selected_folder, "NEKOPARA Vol. 1", "nekopara_vol1.exe"
),
"NEKOPARA Vol.2": os.path.join(
self.selected_folder, "NEKOPARA Vol. 2", "nekopara_vol2.exe"
),
"NEKOPARA Vol.3": os.path.join(
self.selected_folder, "NEKOPARA Vol. 3", "NEKOPARAvol3.exe"
),
"NEKOPARA Vol.4": os.path.join(
self.selected_folder, "NEKOPARA Vol. 4", "nekopara_vol4.exe"
),
}
if (
game_version not in game_exe
or not os.path.exists(game_exe[game_version])
or self.installed_status[game_version]
):
self.next_download_task()
return
progress_window = ProgressWindow(self)
progress_window.show()
self.current_download_thread = DownloadThread(url, _7z_path, self)
self.current_download_thread.progress.connect(progress_window.setprogressbarval)
self.current_download_thread.finished.connect(
lambda success, error: self.install_setting(
success,
error,
progress_window,
game_folder,
game_version,
_7z_path,
plugin_path,
)
)
self.current_download_thread.start()
def install_setting(
self,
success,
error,
progress_window,
game_folder,
game_version,
_7z_path,
plugin_path,
):
progress_window.close()
if success:
try:
msg_box = self.hash_pop_window()
QApplication.processEvents()
with py7zr.SevenZipFile(_7z_path, mode="r") as archive:
archive.extractall(path=PLUGIN)
shutil.copy(plugin_path, game_folder)
self.installed_status[game_version] = True
QMessageBox.information(
self, f"通知 {APP_VERSION}", f"\n{game_version} 补丁已安装\n"
)
except py7zr.Bad7zFile as e:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n文件损坏\n\n【错误信息】:{e}\n",
)
except FileNotFoundError as e:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n文件不存在\n\n【错误信息】:{e}\n",
)
except Exception as e:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n文件操作失败,请重试\n\n【错误信息】:{e}\n",
)
finally:
msg_box.close()
else:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n文件获取失败\n网络状态异常或服务器故障\n\n【错误信息】:{error}\n",
)
self.next_download_task()
def download_action(self):
install_paths = self.get_install_paths()
for game_version, install_path in install_paths.items():
self.pre_hash_compare(install_path, game_version, SRC_HASHES)
if self.late_hash_compare(SRC_HASHES):
config = self.download_config()
if not config:
QMessageBox.critical(
self, f"错误 {APP_VERSION}", "\n网络状态异常或服务器故障,请重试\n"
)
return
for i in range(1, 5):
game_version = f"NEKOPARA Vol.{i}"
if self.installed_status[game_version] == False:
url = config[f"vol{i}"]
game_folder = os.path.join(
self.selected_folder, f"NEKOPARA Vol. {i}"
)
_7z_path = os.path.join(PLUGIN, f"vol.{i}.7z")
plugin_path = os.path.join(
PLUGIN,
f"vol.{i}",
[
"adultsonly.xp3",
"adultsonly.xp3",
"update00.int",
"vol4adult.xp3",
][i - 1],
)
self.download_queue.append(
(url, game_folder, game_version, _7z_path, plugin_path)
)
self.next_download_task()
def next_download_task(self):
if not self.download_queue:
self.show_result()
return
url, game_folder, game_version, _7z_path, plugin_path = (
self.download_queue.popleft()
)
self.download_setting(url, game_folder, game_version, _7z_path, plugin_path)
def show_result(self):
installed_version = "\n".join(
[i for i in self.installed_status if self.installed_status[i]]
)
failed_ver = "\n".join(
[i for i in self.installed_status if not self.installed_status[i]]
)
QMessageBox.information(
self,
f"完成 {APP_VERSION}",
f"\n安装结果:\n"
f"安装成功数:{len(installed_version.splitlines())} 安装失败数:{len(failed_ver.splitlines())}\n\n"
f"安装成功的版本:\n"
f"{installed_version}\n"
f"尚未持有的版本:\n"
f"{failed_ver}\n",
)
def shutdown_app(self):
msg_box = QMessageBox(self)
msg_box.setWindowTitle("退出程序")
msg_box.setText("\n是否确定退出?\n")
yes_button = msg_box.addButton("确定", QMessageBox.ButtonRole.AcceptRole)
no_button = msg_box.addButton("取消", QMessageBox.ButtonRole.RejectRole)
msg_box.setDefaultButton(no_button)
msg_box.exec()
if msg_box.clickedButton() == yes_button:
if (
self.current_download_thread
and self.current_download_thread.isRunning()
):
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
"\n当前有下载任务正在进行,完成后再试。\n",
)
return
if os.path.exists(PLUGIN):
try:
shutil.rmtree(PLUGIN)
except Exception as e:
QMessageBox.critical(
self,
f"错误 {APP_VERSION}",
f"\n清理缓存失败\n\n【错误信息】:{e}\n",
)
sys.exit()
class ProgressWindow(QtWidgets.QDialog):
def __init__(self, parent=None):
super(ProgressWindow, self).__init__(parent)
self.setWindowTitle(f"下载进度 {APP_VERSION}")
self.resize(400, 100)
self.progress_bar_max = 100
self.setWindowFlags(self.windowFlags() & ~Qt.WindowCloseButtonHint)
self.setWindowFlags(self.windowFlags() & ~Qt.WindowSystemMenuHint)
layout = QVBoxLayout()
self.progress_bar = QProgressBar()
self.progress_bar.setValue(0)
self.label = QLabel("\n正在下载...\n")
layout.addWidget(self.label)
layout.addWidget(self.progress_bar)
self.setLayout(layout)
def setmaxvalue(self, value):
self.progress_bar_max = value
self.progress_bar.setMaximum(value)
def setprogressbarval(self, value):
self.progress_bar.setValue(value)
if value == self.progress_bar_max:
QtCore.QTimer.singleShot(2000, self.close)
if __name__ == "__main__":
if not admin_status():
run_as_admin()
sys.exit()
app = QApplication([])
window = MyWindow()
window.show()
sys.exit(app.exec())