import os
import sys
import zipfile
import tempfile

from PythonDepManager import ensure_import

# --- VENV AUTO-CREATION WITH REQUIREMENTS AND AUTO-RESTART ---
venv_dir = os.path.join(os.path.dirname(__file__), "venv")
requirements_path = os.path.join(os.path.dirname(__file__), "requirements.txt")

# --- PYTHON VERSION CHECK ---
if not os.path.isdir(venv_dir) and not (sys.version_info.major == 3 and sys.version_info.minor == 10):
    ensure_import("stashapi:stashapp-tools>=0.2.58")
    import stashapi.log as log
    log.error("Error: Python 3.10.x (3.10.11 recommended) is required for the first installation of the plugin. Once installed, you can switch your Python version back in Stash, as this plugin will run within its own venv.")
    log.error(f"Current version: {sys.version}")
    log.error("Go to https://www.python.org/downloads/release/python-31011/")
    sys.exit(1)
# --- END PYTHON VERSION CHECK ---
def in_venv():
    # Checks if running inside the venv we expect
    return (
        hasattr(sys, 'real_prefix') or
        (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix)
    ) and os.path.abspath(sys.prefix) == os.path.abspath(venv_dir)
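# Background for the check above: inside a virtual environment, sys.prefix points at
# the venv while sys.base_prefix points at the base interpreter (outside a venv the
# two are equal); the real_prefix attribute covers environments created by the
# legacy virtualenv tool.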
def install_dependencies():
    """
    Install dependencies from requirements.txt if not already installed.
    """
    if not os.path.isfile(requirements_path):
        print("No requirements.txt found, skipping dependency installation.")
        return
    import subprocess
    pip_exe = os.path.join(venv_dir, "Scripts", "pip.exe") if os.name == "nt" else os.path.join(venv_dir, "bin", "pip")
    py_exe = os.path.join(venv_dir, "Scripts", "python.exe") if os.name == "nt" else os.path.join(venv_dir, "bin", "python")
    subprocess.check_call([py_exe, "-m", "pip", "install", "--upgrade", "pip"])
    subprocess.check_call([pip_exe, "install", "-r", requirements_path])
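# For reference, the two calls above are equivalent to running (POSIX paths shown;
# on Windows substitute venv\Scripts\python.exe and venv\Scripts\pip.exe):
#   venv/bin/python -m pip install --upgrade pip
#   venv/bin/pip install -r requirements.txt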
if not os.path.isdir(venv_dir):
    ensure_import("stashapi:stashapp-tools>=0.2.58")
    import stashapi.log as log
    import subprocess
    log.info("No venv found. Creating virtual environment...")
    subprocess.check_call([sys.executable, "-m", "venv", venv_dir])
    log.progress(0.25)
    log.info("Virtual environment created at " + venv_dir)
    if os.path.isfile(requirements_path):
        log.info("Installing dependencies... This might take a while")
        install_dependencies()
    else:
        log.info("No requirements.txt found, skipping dependency installation.")

# If not running in the venv, restart the script using the venv's Python
if not in_venv():
    py_exe = os.path.join(venv_dir, "Scripts", "python.exe") if os.name == "nt" else os.path.join(venv_dir, "bin", "python")
    print(f"Restarting script in venv: {py_exe}")
    os.execv(py_exe, [py_exe] + sys.argv)
# --- END VENV AUTO-CREATION WITH REQUIREMENTS AND AUTO-RESTART ---
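# Note on the restart: os.execv replaces the current process image in place (the
# PID is kept and nothing after that call runs in the original interpreter), so on
# the second pass everything below executes under the venv's Python.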
import json
import subprocess
import platform
# Set environment variables
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" # Suppress TF logs
# Ensure dependencies
try:
    from deepface import DeepFace
    import numpy as np
    import psutil
    import stashapi.log as log
    from stashapi.stashapp import StashInterface
except ImportError:
    install_dependencies()
    from deepface import DeepFace
    import numpy as np
    import psutil
    import stashapi.log as log
    from stashapi.stashapp import StashInterface
VOY_DB_PATH = os.path.join(os.path.dirname(__file__), "voy_db")
os.makedirs(os.path.join(VOY_DB_PATH, "facenet"), exist_ok=True)
os.makedirs(os.path.join(VOY_DB_PATH, "arc"), exist_ok=True)
def main():
    """
    Main entry point for the plugin.
    """
    global stash
    json_input = json.loads(sys.stdin.read())
    stash = StashInterface(json_input["server_connection"])
    mode_arg = json_input["args"].get("mode")
    config = stash.get_configuration()["plugins"]
    settings = {"voyCount": 15, "sceneCount": 0, "imgCount": 0}
    if "LocalVisage" in config:
        settings.update(config["LocalVisage"])
    if mode_arg == "spawn_server":
        spawn_server(json_input["server_connection"])
    elif mode_arg == "stop_server":
        kill_stashface_server()
    elif mode_arg == "rebuild_model":
        rebuild_model(update_only=False, settings=settings)
    elif mode_arg == "update_model":
        rebuild_model(update_only=True, settings=settings)
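# Shape of the stdin payload consumed by main(), inferred from the fields read
# above (Stash supplies this JSON when it invokes the plugin):
#   {"server_connection": {...}, "args": {"mode": "rebuild_model"}}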
def can_read_image(image_path):
    """
    Check if an image path can be read, handling both regular files and files inside ZIP archives.
    Args:
        image_path (str): Path to the image file
    Returns:
        tuple: (can_read, actual_path) where can_read is bool and actual_path is the path to use
    """
    if os.path.exists(image_path):
        return True, image_path
    # Check if it's inside a ZIP file (match case-insensitively so ".ZIP" also works)
    zip_idx = image_path.lower().find(".zip")
    if zip_idx != -1:
        try:
            zip_path = image_path[:zip_idx + 4]
            # Strip leading separators and normalize to the forward slashes used in ZIP namelists
            internal_path = image_path[zip_idx + 4:].lstrip(os.sep + "/").replace(os.sep, "/")
            if os.path.exists(zip_path):
                with zipfile.ZipFile(zip_path, 'r') as zip_file:
                    # Check if the internal path exists in the ZIP
                    if internal_path in zip_file.namelist():
                        # Extract to a temporary file and return the temp path
                        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(internal_path)[1]) as tmp_file:
                            tmp_file.write(zip_file.read(internal_path))
                            return True, tmp_file.name
        except Exception as e:
            log.warning(f"Error reading from ZIP file {image_path}: {e}")
    return False, image_path
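# Usage sketch (hypothetical path): for a Stash path like
# "/data/gallery.zip/photos/001.jpg", can_read_image() extracts photos/001.jpg to a
# temporary file and returns (True, "/tmp/tmpXXXXXX.jpg"); the caller is expected
# to pass that path to cleanup_temp_file() once it is no longer needed.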
def cleanup_temp_file(file_path):
    """
    Clean up temporary files created for ZIP extraction.
    Args:
        file_path (str): Path to the temporary file
    """
    try:
        if file_path.startswith(tempfile.gettempdir()):
            os.unlink(file_path)
    except Exception as e:
        log.warning(f"Error cleaning up temporary file {file_path}: {e}")
def find_performers(settings):
    """
    Find performers with images for model building.
    """
    query = {}
    # Query performers based on the sceneCount and imgCount settings
    scene_count_min = settings.get("sceneCount", 0)
    img_count_min = settings.get("imgCount", 0)
    if scene_count_min > 0 or img_count_min > 0:
        query = {
            "scene_count": {"modifier": "GREATER_THAN", "value": scene_count_min - 1},
            "image_count": {"modifier": "GREATER_THAN", "value": img_count_min - 1},
        }
    # scene_count and image_count must be part of the fragment; otherwise the
    # p.get(...) checks below always fall back to 0 and filter everyone out
    performers_all = stash.find_performers(f=query, fragment="id name image_path custom_fields scene_count image_count")
    performers_without_image = stash.find_performers(f={"is_missing": "image"}, fragment="id")
    performers_without_image_ids = {p["id"] for p in performers_without_image}
    performers_to_process = [p for p in performers_all if p["id"] not in performers_without_image_ids]
    performers_to_process = [
        p for p in performers_to_process
        if (p.get("scene_count", 0) >= scene_count_min and
            p.get("image_count", 0) >= img_count_min)
    ]
    return enrich_performers(performers_to_process, settings)
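# Example of the effective server-side filter: with sceneCount=5 and imgCount=0 the
# query sent is {"scene_count": {"modifier": "GREATER_THAN", "value": 4},
# "image_count": {"modifier": "GREATER_THAN", "value": -1}}, i.e. "at least 5
# scenes, any image count"; the list comprehension above re-checks the same bounds.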
def enrich_performers(performers, settings):
    """
    Add extra images to each performer for embedding calculation.
    """
    for progress, performer in enumerate(performers):
        performer["images"] = []
        if performer.get("image_path"):
            performer["images"].append(performer["image_path"])
        extra_images = stash.find_images(
            filter={
                "direction": "ASC",
                "page": 1,
                "per_page": settings.get("voyCount", 15) - 1,
                "q": "",
                "sort": "random_11365347"
            },
            f={
                "performer_count": {"modifier": "EQUALS", "value": 1},
                "performers": {"modifier": "INCLUDES_ALL", "value": [performer["id"]]},
                "path": {
                    "modifier": "NOT_MATCHES_REGEX",
                    "value": r".*\.(mp4|webm|avi|mov|mkv|flv|wmv|gif)$|.*[^\x00-\x7F].*"
                }
            }
        )
        for image in extra_images:
            if image.get("visual_files") and len(image["visual_files"]) > 0:
                image_path = image["visual_files"][0]["path"]
                can_read, actual_path = can_read_image(image_path)
                if can_read:
                    performer["images"].append(actual_path)
                else:
                    log.warning(f"Image path does not exist and cannot be read: {image_path}")
            else:
                log.warning(f"No visual files found for image ID: {image['id']}")
        log.progress((progress + 1) / len(performers))
    return performers
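# Per performer this yields at most voyCount images: the profile image plus up to
# voyCount - 1 randomly sorted extras fetched above. The path regex excludes
# video/GIF files and any path containing non-ASCII characters, and the
# performer_count filter keeps only images that show exactly one performer.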
def rebuild_model(update_only, settings):
    """
    Build or update the face embedding model for all performers.
    """
    log.info("Updating model..." if update_only else "Rebuilding model...")
    performers = find_performers(settings)
    if not performers:
        log.info("No performers found for model building.")
        return
    log.info("Database scraped, starting to rebuild model...")
    for progress, performer in enumerate(performers):
        embeddings_facenet = []
        embeddings_arc = []
        custom_fields = performer.get("custom_fields", {})
        images_used = custom_fields.get("number_of_images_used_for_voy", 0)
        if update_only and images_used >= settings["voyCount"]:
            continue
        if update_only and len(performer["images"]) <= images_used:
            continue
        for uri in performer["images"]:
            try:
                result_facenet = DeepFace.represent(
                    img_path=uri,
                    model_name="Facenet512",
                    detector_backend='yolov8',
                    normalization='Facenet2018',
                    align=True,
                    enforce_detection=False
                )
                embeddings_facenet.append(result_facenet[0]['embedding'])
                result_arc = DeepFace.represent(
                    img_path=uri,
                    model_name="ArcFace",
                    detector_backend='yolov8',
                    enforce_detection=False,
                    align=True
                )
                embeddings_arc.append(result_arc[0]['embedding'])
            except Exception as e:
                log.warning(f"[WARN] Skipping {uri}: {e}")
            finally:
                # Clean up temporary files created for ZIP extraction
                cleanup_temp_file(uri)
        if embeddings_facenet and embeddings_arc:
            avg_embedding_facenet = np.mean(embeddings_facenet, axis=0).astype(np.float32)
            facenet_path = os.path.join(VOY_DB_PATH, "facenet", f"{performer['id']}-{performer['name']}.voy")
            np.save(facenet_path, avg_embedding_facenet)
            avg_embedding_arc = np.mean(embeddings_arc, axis=0).astype(np.float32)
            arc_path = os.path.join(VOY_DB_PATH, "arc", f"{performer['id']}-{performer['name']}.voy")
            np.save(arc_path, avg_embedding_arc)
            embeddings_count = max(len(embeddings_facenet), len(embeddings_arc))
            stash.update_performer({
                "id": performer["id"],
                "custom_fields": {
                    "partial": {
                        "number_of_images_used_for_voy": embeddings_count,
                    }
                }
            })
            log.info(f"[INFO] Saved VOY for {performer['name']} with {embeddings_count} images.")
        else:
            log.warning(f"[WARN] No valid embeddings for {performer['name']}.")
        log.progress((progress + 1) / len(performers))
    log.info("Rebuilding model finished.")
    if server_running():
        kill_stashface_server()
        # Optionally, reload server with new connection info if needed
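# Each performer is reduced to one centroid per model: the element-wise mean of the
# per-image embeddings (512-dimensional vectors for both Facenet512 and ArcFace),
# stored as float32. Averaging assumes most images contain the same face; a stray
# mismatched image is diluted into the mean rather than rejected outright.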
def server_running():
    """
    Check if the stashface server is running.
    """
    try:
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            # proc.info['name'] can be None, so guard before lowercasing
            name = (proc.info.get('name') or '').lower()
            cmdline_raw = proc.info.get('cmdline')
            if not cmdline_raw:
                continue
            cmdline = [str(arg).lower() for arg in cmdline_raw]
            if 'python' in name and any('stashface' in arg and 'app.py' in arg for arg in cmdline):
                log.debug("Stashface server is already running.")
                return True
    except psutil.NoSuchProcess:
        return False
    return False
def kill_stashface_server():
    """
    Kill any running stashface server processes.
    """
    killed = False
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            cmdline = proc.info['cmdline']
            if cmdline and any('stashface' in arg and 'app.py' in arg for arg in cmdline):
                log.debug(f"Killing process {proc.pid}: {' '.join(cmdline)}")
                proc.kill()
                killed = True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    if killed:
        log.info("Stashface server killed.")
def spawn_server(server_connection=None):
    """
    Spawn the stashface server as a subprocess.
    """
    if server_running():
        log.info("Stashface server is already running.")
        return
    plugin_dir = os.path.dirname(__file__)
    py_exe = os.path.join(venv_dir, "Scripts", "python.exe") if os.name == "nt" else os.path.join(venv_dir, "bin", "python")
    cmd = [
        py_exe,
        os.path.abspath(os.path.join(plugin_dir, "stashface", "app.py")),
    ]
    log.info("Spawning server")
    env = os.environ.copy()
    if server_connection is not None:
        env["SERVER_CONNECTION"] = json.dumps(server_connection)
    if platform.system() == "Windows":
        subprocess.Popen(
            cmd,
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            close_fds=True,
            cwd=plugin_dir,
            env=env
        )
    else:
        subprocess.Popen(
            cmd,
            start_new_session=True,
            close_fds=True,
            cwd=plugin_dir,
            env=env
        )
    log.info("Server spawned successfully, you can now use the plugin.")
if __name__ == '__main__':
    main()