425 lines
13 KiB
Python

import sys
import json
import time
import requests
import log
import config
class StashInterface:
def __init__(self, fragment):
self._start = time.time()
self._fragment = fragment
self._mode = self._fragment['args'].get("mode") or "normal"
self._fragment_server = self._fragment["server_connection"]
self._plugin_dir = self._fragment_server["PluginDir"]
hook_ctx = self._fragment["args"].get("hookContext")
if hook_ctx:
self._hook_type = hook_ctx.get("type")
self._scene_id = hook_ctx.get("id")
else:
self._scene_id = None
self._path_rewrite = self._fragment["args"].get("pathRewrite")
log.LogDebug(
f"Starting nfoSceneParser plugin for scene {self._scene_id}")
def get_scene_id(self):
return self._scene_id
def get_mode(self):
return self._mode
def gql_findScene(self, scene_id):
query = """
query FindScene($id: ID!, $checksum: String) {
findScene(id: $id, checksum: $checksum) {
...SceneData
}
}
fragment SceneData on Scene {
id
title
details
urls
date
rating: rating100
organized
files {
path
}
studio {
...SlimStudioData
}
movies {
movie {
...SlimMovieData
}
scene_index
}
tags {
...SlimTagData
}
performers {
...SlimPerformerData
}
stash_ids {
endpoint
stash_id
}
}
fragment SlimStudioData on Studio {
id
name
}
fragment SlimMovieData on Movie {
id
name
director
}
fragment SlimTagData on Tag {
id
name
}
fragment SlimPerformerData on Performer {
id
name
}
"""
variables = {
"id": scene_id
}
result = self.__gql_call(query, variables)
# Path rewriting used for testing only
if (self._path_rewrite is not None):
result["findScene"]["files"][0]["path"] = result["findScene"]["files"][0]["path"].replace(
self._path_rewrite[0], self._path_rewrite[1])
return result.get("findScene")
def gql_findScenes(self, tag_id=None):
query = """
query FindScenes($scene_filter: SceneFilterType, $filter: FindFilterType) {
findScenes(scene_filter: $scene_filter, filter: $filter) {
count
scenes {
...SlimSceneData
}
}
}
fragment SlimSceneData on Scene {
id
organized
tags {
id
name
}
}
"""
variables = {
"scene_filter": None,
"filter": {
"direction": "ASC",
"page": 1,
"per_page": -1,
"sort": "updated_at"
}
}
if tag_id:
variables["scene_filter"] = {
"tags": {
"value": tag_id,
"modifier": "INCLUDES"
}
}
result = self.__gql_call(query, variables)
return result.get("findScenes")
def gql_updateScene(self, scene_id, scene_data):
query = """
mutation sceneUpdate($input: SceneUpdateInput!) {
sceneUpdate(input: $input) {
id
}
}
"""
input_data = {
"id": scene_id,
"title": scene_data["title"],
"details": scene_data["details"],
"date": scene_data["date"],
"rating100": scene_data["rating"],
"urls": scene_data["urls"],
"studio_id": scene_data["studio_id"],
"code": scene_data["code"],
"performer_ids": scene_data["performer_ids"],
"tag_ids": scene_data["tag_ids"],
}
if scene_data["cover_image"] is not None:
input_data.update({"cover_image": scene_data["cover_image"]})
# Update to "organized" according to config
if config.set_organized_nfo and scene_data["source"] == "nfo":
has_mandatory_tags = True
scene_keys = [item[0].replace(
"_id", "") if item[1] else None for item in scene_data.items()]
for mandatory_tag in config.set_organized_only_if:
if mandatory_tag not in scene_keys:
has_mandatory_tags = False
break
if has_mandatory_tags:
input_data.update({"organized": True})
# Update movie if exists
if scene_data["movie_id"] is not None:
input_data["movies"] = {
"movie_id": scene_data["movie_id"],
"scene_index": scene_data["scene_index"],
}
variables = {
"input": input_data
}
result = self.__gql_call(query, variables)
return result.get("sceneUpdate")
def gql_performerCreate(self, name):
query = """
mutation performerCreate($input: PerformerCreateInput!) {
performerCreate(input: $input) {
id
}
}
"""
variables = {
"input": {
"name": name
}
}
result = self.__gql_call(query, variables)
return result.get("performerCreate")
def gql_studioCreate(self, name):
query = """
mutation studioCreate($input: StudioCreateInput!) {
studioCreate(input: $input) {
id
}
}
"""
variables = {
"input": {
"name": name
}
}
result = self.__gql_call(query, variables)
return result.get("studioCreate")
def gql_tagCreate(self, name):
query = """
mutation tagCreate($input: TagCreateInput!) {
tagCreate(input: $input) {
id
}
}
"""
variables = {
"input": {
"name": name
}
}
result = self.__gql_call(query, variables)
return result.get("tagCreate")
def gql_movieCreate(self, file_data, studio_id, folder_data):
query = """
mutation movieCreate($input: MovieCreateInput!) {
movieCreate(input: $input) {
id
}
}
"""
# Use folder nfo data for some movie specific attributes (ignoring scene nfo specifics)
date = folder_data.get("date") or file_data["date"] or None
bl = config.blacklist
variables = {
"input": {
"name": file_data["movie"],
"studio_id": (studio_id or None) if "studio" not in bl else None,
"date": date if "date" not in bl else None,
"director": (file_data["director"] or None) if "director" not in bl else None,
"synopsis": (folder_data.get("details") or None) if "details" not in bl else None,
"rating100": (folder_data.get("rating") or None) if "rating" not in bl else None,
# Take 1st folder NFO URL for movie
"url": (folder_data.get("urls")[0] or None) if "urls" not in bl else None,
"front_image": folder_data.get("cover_image") if "cover_image" not in bl else None,
"back_image": folder_data.get("other_image") if "cover_image" not in bl else None,
}
}
result = self.__gql_call(query, variables)
return result.get("movieCreate")
def gql_findPerformers(self, name):
query = """
query findPerformers($performer_filter: PerformerFilterType, $filter: FindFilterType) {
findPerformers(performer_filter: $performer_filter, filter: $filter) {
performers {
id
name
alias_list
}
}
}
"""
variables = {
"performer_filter": {
"name": {
"value": name,
"modifier": "INCLUDES"
},
"OR": {
"aliases": {
"value": name,
"modifier": "INCLUDES"
}
}
},
"filter": {
"per_page": -1
},
}
result = self.__gql_call(query, variables)
return result.get("findPerformers")
def gql_findStudios(self, name):
query = """
query findStudios($studio_filter: StudioFilterType, $filter: FindFilterType) {
findStudios(studio_filter: $studio_filter, filter: $filter) {
studios {
id
name
aliases
}
}
}
"""
variables = {
"studio_filter": {
"name": {
"value": name,
"modifier": "INCLUDES"
},
"OR": {
"aliases": {
"value": name,
"modifier": "INCLUDES"
}
}
},
"filter": {
"per_page": -1
},
}
result = self.__gql_call(query, variables)
return result.get("findStudios")
def gql_findMovies(self, name):
query = """
query findMovies($movie_filter: MovieFilterType, $filter: FindFilterType) {
findMovies(movie_filter: $movie_filter, filter: $filter) {
movies {
id
name
}
}
}
"""
variables = {
"studio_filter": {
"name": {
"value": name,
"modifier": "INCLUDES"
}
},
"filter": {
"per_page": -1
},
}
result = self.__gql_call(query, variables)
return result.get("findMovies")
def gql_findTags(self, name=None):
query = """
query findTags($tag_filter: TagFilterType, $filter: FindFilterType) {
findTags(tag_filter: $tag_filter, filter: $filter) {
tags {
id
name
aliases
}
}
}
"""
variables = {
"filter": {
"per_page": -1
},
}
if name:
variables["tag_filter"] = {
"name": {
"value": name,
"modifier": "INCLUDES"
}
}
result = self.__gql_call(query, variables)
return result.get("findTags")
def exit_plugin(self, msg=None, err=None):
if not msg and not err:
msg = "plugin ended"
log.LogDebug(f"Execution time: {round(time.time() - self._start, 3)}s")
output_json = {"output": msg, "error": err}
print(json.dumps(output_json))
sys.exit()
def __gql_call(self, query, variables=None):
# Session cookie for authentication (supports API key for CLI tests)
graphql_port = str(self._fragment_server["Port"])
graphql_scheme = self._fragment_server["Scheme"]
graphql_cookies = "" if not self._fragment_server.get("SessionCookie") else {
"session": self._fragment_server["SessionCookie"]["Value"]}
graphql_headers = {
"Accept-Encoding": "gzip, deflate, br",
"Content-Type": "application/json",
"Accept": "application/json",
"Connection": "keep-alive",
"DNT": "1"
}
graphql_api_key = self._fragment_server.get("ApiKey")
if graphql_api_key is not None:
graphql_headers.update({"ApiKey": graphql_api_key})
graphql_domain = self._fragment_server["Host"]
if graphql_domain == "0.0.0.0":
graphql_domain = "localhost"
# Stash GraphQL endpoint
graphql_url = f"{graphql_scheme}://{graphql_domain}:{graphql_port}/graphql"
graphql_json = {"query": query}
if variables is not None:
graphql_json["variables"] = variables
try:
response = requests.post(
graphql_url, json=graphql_json, headers=graphql_headers, cookies=graphql_cookies, timeout=20)
except Exception as e:
self.exit_plugin(err=f"[FATAL] Error with the graphql request {repr(e)}")
if response.status_code == 200:
result = response.json()
if result.get("errors"):
for error in result["errors"]:
raise Exception(f"GraphQL error: {error}")
return None
if result.get("data"):
return result.get("data")
elif response.status_code == 401:
self.exit_plugin(err="HTTP Error 401, Unauthorised.")
else:
raise ConnectionError(
f"GraphQL query failed: {response.status_code} - {response.content}")