mirror of
https://github.com/stashapp/CommunityScripts.git
synced 2026-04-21 01:03:29 -05:00
[tagGraph] Consolidate script and plugin versions (#61)
This commit is contained in:
30
plugins/tagGraph/README.md
Normal file
30
plugins/tagGraph/README.md
Normal file
@@ -0,0 +1,30 @@
|
||||
|
||||
# Tag Graph Generator
|
||||
|
||||
## Requirements
|
||||
* python >= 3.7.X
|
||||
* `pip install -r requirements.txt`
|
||||
|
||||
---
|
||||
|
||||
## Usage
|
||||
|
||||
### Running as a plugin
|
||||
move the `tagGraph` directory into Stash's plugins directory, reload plugins and you can run the **Generate Graph** task
|
||||
|
||||
### Running as a script
|
||||
> **⚠️ Note:** use this if you are connecting to a remote instance of stash
|
||||
|
||||
ensure `STASH_SETTINGS` is configured properly, you will likely need to change it
|
||||
|
||||
run `python .\tag_graph.py -script`
|
||||
|
||||
### View graph
|
||||
a `tag_graph.html` file will be generated inside the tagGraph directory, open it with a browser to view/interact with the graph
|
||||
|
||||
---
|
||||
|
||||
## Customizing the graph
|
||||
set `SHOW_OPTIONS` to `True` and you will get an interface to play around with that will affect what the graph looks like.
|
||||
|
||||
for more info see [pyvis docs](https://pyvis.readthedocs.io/en/latest/tutorial.html#using-the-configuration-ui-to-dynamically-tweak-network-settings)
|
||||
7
plugins/tagGraph/config.py
Normal file
7
plugins/tagGraph/config.py
Normal file
@@ -0,0 +1,7 @@
|
||||
STASH_SETTINGS = {
|
||||
"Scheme":"http",
|
||||
"Domain": "localhost",
|
||||
"Port": "9999",
|
||||
"ApiKey": "YOUR_API_KEY_HERE"
|
||||
}
|
||||
SHOW_OPTIONS = False
|
||||
@@ -1,59 +0,0 @@
|
||||
import re, sys, copy
|
||||
|
||||
|
||||
# Log messages sent from a plugin instance are transmitted via stderr and are
|
||||
# encoded with a prefix consisting of special character SOH, then the log
|
||||
# level (one of t, d, i, w, e, or p - corresponding to trace, debug, info,
|
||||
# warning, error and progress levels respectively), then special character
|
||||
# STX.
|
||||
#
|
||||
# The LogTrace, LogDebug, LogInfo, LogWarning, and LogError methods, and their equivalent
|
||||
# formatted methods are intended for use by plugin instances to transmit log
|
||||
# messages. The LogProgress method is also intended for sending progress data.
|
||||
#
|
||||
|
||||
def __prefix(level_char):
|
||||
start_level_char = b'\x01'
|
||||
end_level_char = b'\x02'
|
||||
|
||||
ret = start_level_char + level_char + end_level_char
|
||||
return ret.decode()
|
||||
|
||||
|
||||
|
||||
def __log(levelChar, s):
|
||||
|
||||
s_out = copy.deepcopy(s)
|
||||
if not isinstance(s_out, str):
|
||||
s_out = str(s_out)
|
||||
s_out = re.sub(r'(?<=")(data:image.+?;base64).+?(?=")', r'\1;truncated', s_out)
|
||||
|
||||
if levelChar == "":
|
||||
return
|
||||
|
||||
print(__prefix(levelChar) + s_out + "\n", file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def trace(s):
|
||||
__log(b't', s)
|
||||
|
||||
|
||||
def debug(s):
|
||||
__log(b'd', s)
|
||||
|
||||
|
||||
def info(s):
|
||||
__log(b'i', s)
|
||||
|
||||
|
||||
def warning(s):
|
||||
__log(b'w', s)
|
||||
|
||||
|
||||
def error(s):
|
||||
__log(b'e', s)
|
||||
|
||||
|
||||
def progress(p):
|
||||
progress = min(max(0, p), 1)
|
||||
__log(b'p', str(progress))
|
||||
@@ -1,112 +0,0 @@
|
||||
import requests
|
||||
import sys
|
||||
import log
|
||||
import re
|
||||
|
||||
class StashInterface:
|
||||
port = ""
|
||||
url = ""
|
||||
headers = {
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Connection": "keep-alive",
|
||||
"DNT": "1"
|
||||
}
|
||||
cookies = {}
|
||||
|
||||
def __init__(self, conn, fragments={}):
|
||||
self.port = conn['Port'] if conn.get('Port') else '9999'
|
||||
scheme = conn['Scheme'] if conn.get('Scheme') else 'http'
|
||||
|
||||
# Session cookie for authentication
|
||||
|
||||
self.cookies = {}
|
||||
if conn.get('SessionCookie'):
|
||||
self.cookies.update({
|
||||
'session': conn['SessionCookie']['Value']
|
||||
})
|
||||
|
||||
domain = conn['Domain'] if conn.get('Domain') else 'localhost'
|
||||
|
||||
# Stash GraphQL endpoint
|
||||
self.url = f'{scheme}://{domain}:{self.port}/graphql'
|
||||
log.debug(f"Using stash GraphQl endpoint at {self.url}")
|
||||
|
||||
self.fragments = fragments
|
||||
|
||||
def __resolveFragments(self, query):
|
||||
|
||||
fragmentRefrences = list(set(re.findall(r'(?<=\.\.\.)\w+', query)))
|
||||
fragments = []
|
||||
for ref in fragmentRefrences:
|
||||
fragments.append({
|
||||
"fragment": ref,
|
||||
"defined": bool(re.search("fragment {}".format(ref), query))
|
||||
})
|
||||
|
||||
if all([f["defined"] for f in fragments]):
|
||||
return query
|
||||
else:
|
||||
for fragment in [f["fragment"] for f in fragments if not f["defined"]]:
|
||||
if fragment not in self.fragments:
|
||||
raise Exception(f'GraphQL error: fragment "{fragment}" not defined')
|
||||
query += self.fragments[fragment]
|
||||
return self.__resolveFragments(query)
|
||||
|
||||
def __callGraphQL(self, query, variables=None):
|
||||
|
||||
query = self.__resolveFragments(query)
|
||||
|
||||
json_request = {'query': query}
|
||||
if variables is not None:
|
||||
json_request['variables'] = variables
|
||||
|
||||
response = requests.post(self.url, json=json_request, headers=self.headers, cookies=self.cookies)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
|
||||
if result.get("errors"):
|
||||
for error in result["errors"]:
|
||||
log.debug(f"GraphQL error: {error}")
|
||||
if result.get("error"):
|
||||
for error in result["error"]["errors"]:
|
||||
log.debug(f"GraphQL error: {error}")
|
||||
if result.get("data"):
|
||||
return result['data']
|
||||
elif response.status_code == 401:
|
||||
sys.exit("HTTP Error 401, Unauthorised. Cookie authentication most likely failed")
|
||||
else:
|
||||
raise ConnectionError(
|
||||
"GraphQL query failed:{} - {}. Query: {}. Variables: {}".format(
|
||||
response.status_code, response.content, query, variables)
|
||||
)
|
||||
|
||||
def __match_alias_item(self, search, items):
|
||||
item_matches = {}
|
||||
for item in items:
|
||||
if re.match(rf'{search}$', item.name, re.IGNORECASE):
|
||||
log.debug(f'matched "{search}" to "{item.name}" ({item.id}) using primary name')
|
||||
item_matches[item.id] = item
|
||||
if not item.aliases:
|
||||
continue
|
||||
for alias in item.aliases:
|
||||
if re.match(rf'{search}$', alias.strip(), re.IGNORECASE):
|
||||
log.debug(f'matched "{search}" to "{alias}" ({item.id}) using alias')
|
||||
item_matches[item.id] = item
|
||||
return list(item_matches.values())
|
||||
|
||||
def get_db_path(self):
|
||||
query = """
|
||||
query Configuration {
|
||||
configuration {
|
||||
general{
|
||||
databasePath
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
result = self.__callGraphQL(query)
|
||||
return result['configuration']['general']['databasePath']
|
||||
@@ -1,63 +1,247 @@
|
||||
|
||||
import os, sys, json, sqlite3
|
||||
import os, re, sys, copy, json, requests
|
||||
|
||||
# local deps
|
||||
import log
|
||||
from stash_interface import StashInterface
|
||||
|
||||
# external deps
|
||||
# local dependencies
|
||||
import config
|
||||
# external dependencies
|
||||
from pyvis.network import Network
|
||||
|
||||
class ro_stash_db:
|
||||
def __init__(self, db_path):
|
||||
self.conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
|
||||
class StashLogger:
|
||||
# Log messages sent from a script scraper instance are transmitted via stderr and are
|
||||
# encoded with a prefix consisting of special character SOH, then the log
|
||||
# level (one of t, d, i, w or e - corresponding to trace, debug, info,
|
||||
# warning and error levels respectively), then special character
|
||||
# STX.
|
||||
#
|
||||
# The log.trace, log.debug, log.info, log.warning, and log.error methods, and their equivalent
|
||||
# formatted methods are intended for use by script scraper instances to transmit log
|
||||
# messages.
|
||||
#
|
||||
def __log(self, level_char: bytes, s):
|
||||
if level_char:
|
||||
lvl_char = "\x01{}\x02".format(level_char.decode())
|
||||
s = re.sub(r"data:image.+?;base64(.+?')","[...]",str(s))
|
||||
for x in s.split("\n"):
|
||||
print(lvl_char, x, file=sys.stderr, flush=True)
|
||||
def trace(self, s):
|
||||
self.__log(b't', s)
|
||||
def debug(self, s):
|
||||
self.__log(b'd', s)
|
||||
def info(self, s):
|
||||
self.__log(b'i', s)
|
||||
def warning(self, s):
|
||||
self.__log(b'w', s)
|
||||
def error(self, s):
|
||||
self.__log(b'e', s)
|
||||
def progress(self, p):
|
||||
progress = min(max(0, p), 1)
|
||||
self.__log(b'p', str(progress))
|
||||
|
||||
def get_tag_relations(self):
|
||||
cur = self.conn.cursor()
|
||||
cur.execute("SELECT parent_id, child_id FROM tags_relations")
|
||||
return cur.fetchall()
|
||||
class StashInterface:
|
||||
port = ""
|
||||
url = ""
|
||||
headers = {
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Connection": "keep-alive",
|
||||
"DNT": "1"
|
||||
}
|
||||
cookies = {}
|
||||
|
||||
def get_tag(self, tag_id):
|
||||
cur = self.conn.cursor()
|
||||
cur.execute(f"SELECT id, name FROM tags WHERE id={tag_id}")
|
||||
return cur.fetchone()
|
||||
def __init__(self, conn, fragments={}):
|
||||
global log
|
||||
|
||||
if conn.get("Logger"):
|
||||
log = conn.get("Logger")
|
||||
else:
|
||||
raise Exception("No logger passed to StashInterface")
|
||||
|
||||
self.port = conn['Port'] if conn.get('Port') else '9999'
|
||||
scheme = conn['Scheme'] if conn.get('Scheme') else 'http'
|
||||
|
||||
def main():
|
||||
global stash, stash_db
|
||||
api_key = conn.get("ApiKey")
|
||||
if api_key:
|
||||
self.headers["ApiKey"] = api_key
|
||||
|
||||
json_input = json.loads(sys.stdin.read())
|
||||
# Session cookie for authentication
|
||||
self.cookies = {}
|
||||
if conn.get('SessionCookie'):
|
||||
self.cookies.update({
|
||||
'session': conn['SessionCookie']['Value']
|
||||
})
|
||||
|
||||
stash = StashInterface(json_input["server_connection"])
|
||||
stash_db = ro_stash_db(stash.get_db_path())
|
||||
domain = conn['Domain'] if conn.get('Domain') else 'localhost'
|
||||
|
||||
create_graph()
|
||||
# Stash GraphQL endpoint
|
||||
self.url = f'{scheme}://{domain}:{self.port}/graphql'
|
||||
|
||||
try:
|
||||
self.get_stash_config()
|
||||
except Exception:
|
||||
log.error(f"Could not connect to Stash at {self.url}")
|
||||
sys.exit()
|
||||
|
||||
log.info(f"Using Stash's GraphQl endpoint at {self.url}")
|
||||
|
||||
self.fragments = fragments
|
||||
|
||||
def __resolveFragments(self, query):
|
||||
|
||||
fragmentReferences = list(set(re.findall(r'(?<=\.\.\.)\w+', query)))
|
||||
fragments = []
|
||||
for ref in fragmentReferences:
|
||||
fragments.append({
|
||||
"fragment": ref,
|
||||
"defined": bool(re.search("fragment {}".format(ref), query))
|
||||
})
|
||||
|
||||
if all([f["defined"] for f in fragments]):
|
||||
return query
|
||||
else:
|
||||
for fragment in [f["fragment"] for f in fragments if not f["defined"]]:
|
||||
if fragment not in self.fragments:
|
||||
raise Exception(f'GraphQL error: fragment "{fragment}" not defined')
|
||||
query += self.fragments[fragment]
|
||||
return self.__resolveFragments(query)
|
||||
|
||||
def __callGraphQL(self, query, variables=None):
|
||||
|
||||
query = self.__resolveFragments(query)
|
||||
|
||||
json_request = {'query': query}
|
||||
if variables is not None:
|
||||
json_request['variables'] = variables
|
||||
|
||||
response = requests.post(self.url, json=json_request, headers=self.headers, cookies=self.cookies)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
|
||||
if result.get("errors"):
|
||||
for error in result["errors"]:
|
||||
log.error(f"GraphQL error: {error}")
|
||||
if result.get("error"):
|
||||
for error in result["error"]["errors"]:
|
||||
log.error(f"GraphQL error: {error}")
|
||||
if result.get("data"):
|
||||
return result['data']
|
||||
elif response.status_code == 401:
|
||||
sys.exit("HTTP Error 401, Unauthorized. Cookie authentication most likely failed")
|
||||
else:
|
||||
raise ConnectionError(
|
||||
"GraphQL query failed:{} - {}. Query: {}. Variables: {}".format(
|
||||
response.status_code, response.content, query, variables)
|
||||
)
|
||||
|
||||
def __match_alias_item(self, search, items):
|
||||
item_matches = {}
|
||||
for item in items:
|
||||
if re.match(rf'{search}$', item.name, re.IGNORECASE):
|
||||
log.debug(f'matched "{search}" to "{item.name}" ({item.id}) using primary name')
|
||||
item_matches[item.id] = item
|
||||
if not item.aliases:
|
||||
continue
|
||||
for alias in item.aliases:
|
||||
if re.match(rf'{search}$', alias.strip(), re.IGNORECASE):
|
||||
log.debug(f'matched "{search}" to "{alias}" ({item.id}) using alias')
|
||||
item_matches[item.id] = item
|
||||
return list(item_matches.values())
|
||||
|
||||
def get_stash_config(self):
|
||||
query = """
|
||||
query Configuration {
|
||||
configuration { general { stashes{ path } } }
|
||||
}
|
||||
"""
|
||||
result = self.__callGraphQL(query)
|
||||
return result['configuration']
|
||||
|
||||
def get_tags_with_relations(self):
|
||||
query = """
|
||||
query FindTags($filter: FindFilterType, $tag_filter: TagFilterType) {
|
||||
findTags(filter: $filter, tag_filter: $tag_filter) {
|
||||
count
|
||||
tags {
|
||||
id
|
||||
name
|
||||
parents { id }
|
||||
children { id }
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
variables = {
|
||||
"tag_filter":{
|
||||
"child_count":{"modifier": "GREATER_THAN", "value": 0},
|
||||
"OR": {
|
||||
"parent_count": {"modifier": "GREATER_THAN", "value": 0}}
|
||||
},
|
||||
"filter": {"q":"", "per_page":-1}
|
||||
}
|
||||
result = self.__callGraphQL(query, variables)
|
||||
return result['findTags']['tags']
|
||||
|
||||
def script_init():
|
||||
import logging as log
|
||||
log.basicConfig(level=log.INFO, format='%(levelname)s: %(message)s')
|
||||
stash_connection = config.STASH_SETTINGS
|
||||
stash_connection["Logger"] = log
|
||||
generate_graph(stash_connection)
|
||||
|
||||
def plugin_init():
|
||||
log = StashLogger()
|
||||
stash_connection = json.loads(sys.stdin.read())["server_connection"]
|
||||
stash_connection["Logger"] = log
|
||||
generate_graph(stash_connection)
|
||||
print(json.dumps({"output":"ok"}))
|
||||
|
||||
def generate_graph(stash_connection):
|
||||
log = stash_connection["Logger"]
|
||||
|
||||
stash = StashInterface(stash_connection)
|
||||
log.info("getting tags from stash...")
|
||||
tags = stash.get_tags_with_relations()
|
||||
|
||||
log.info("generating graph...")
|
||||
if config.SHOW_OPTIONS:
|
||||
G = Network(directed=True, height="100%", width="66%", bgcolor="#202b33", font_color="white")
|
||||
G.show_buttons()
|
||||
else:
|
||||
G = Network(directed=True, height="100%", width="100%", bgcolor="#202b33", font_color="white")
|
||||
|
||||
node_theme = {
|
||||
"border": "#adb5bd",
|
||||
"background":"#394b59",
|
||||
"highlight":{
|
||||
"border": "#137cbd",
|
||||
"background":"#FFFFFF"
|
||||
}
|
||||
}
|
||||
edge_theme = {
|
||||
"color": "#FFFFFF",
|
||||
"highlight":"#137cbd"
|
||||
}
|
||||
|
||||
# create all nodes
|
||||
for tag in tags:
|
||||
G.add_node(tag["id"], label=tag["name"], color=node_theme )
|
||||
# create all edges
|
||||
for tag in tags:
|
||||
for child in tag["children"]:
|
||||
G.add_edge( tag["id"], child["id"], color=edge_theme )
|
||||
|
||||
|
||||
def create_graph():
|
||||
G = Network(height="1080px",width="1080px", directed=True)
|
||||
|
||||
for relation in stash_db.get_tag_relations():
|
||||
parent, child = relation
|
||||
|
||||
parent_id, parent_name = stash_db.get_tag(parent)
|
||||
child_id, child_name = stash_db.get_tag(child)
|
||||
|
||||
G.add_node(parent_id, label=parent_name)
|
||||
G.add_node(child_id, label=child_name)
|
||||
|
||||
G.add_edge(child_id, parent_id)
|
||||
|
||||
curr_path = os.path.dirname(os.path.abspath(__file__))
|
||||
save_path = os.path.join(curr_path, "tag_graph.html")
|
||||
current_abs_path = os.path.dirname(os.path.abspath(__file__))
|
||||
save_path = os.path.join(current_abs_path, "tag_graph.html")
|
||||
|
||||
G.save_graph(save_path)
|
||||
log.info(f"saved graph to {save_path}")
|
||||
log.info(f'saved graph to "{save_path}"')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
if len(sys.argv) > 1:
|
||||
script_init()
|
||||
else:
|
||||
plugin_init()
|
||||
@@ -1,6 +1,6 @@
|
||||
name: Tag Graph
|
||||
description: Creates a visual of the Tag relations
|
||||
version: 0.1
|
||||
version: 0.2
|
||||
exec:
|
||||
- python
|
||||
- "{pluginDir}/tag_graph.py"
|
||||
@@ -9,4 +9,4 @@ tasks:
|
||||
- name: Generate Graph
|
||||
description: generates graph from current tag data
|
||||
defaultArgs:
|
||||
mode: gen
|
||||
mode: generate
|
||||
@@ -1,18 +0,0 @@
|
||||
|
||||
# Tag Graph Generator
|
||||
|
||||
## Requirements
|
||||
* python >= 3.7.X
|
||||
* `pip install -r requirements.txt`
|
||||
|
||||
## Usage
|
||||
|
||||
ensure `STASH_SETTINGS` is configured properly
|
||||
> **⚠️ Note:** if you are connecting to a remote/docker instance of stash you will need to change this
|
||||
|
||||
run `python .\tag_graph.py`
|
||||
|
||||
## Customizing the graph
|
||||
set `SHOW_OPTIONS` to `True` and you will get an interface to play around with that will affect what the graph looks like.
|
||||
|
||||
for more info see [pyvis docs](https://pyvis.readthedocs.io/en/latest/tutorial.html#using-the-configuration-ui-to-dynamically-tweak-network-settings)
|
||||
@@ -1,2 +0,0 @@
|
||||
pyvis==0.1.9
|
||||
requests==2.25.1
|
||||
@@ -1,147 +0,0 @@
|
||||
import re, sys, requests
|
||||
|
||||
class StashInterface:
|
||||
port = ""
|
||||
url = ""
|
||||
headers = {
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Connection": "keep-alive",
|
||||
"DNT": "1"
|
||||
}
|
||||
cookies = {}
|
||||
|
||||
def __init__(self, conn, fragments={}):
|
||||
global log
|
||||
|
||||
if conn.get("Logger"):
|
||||
log = conn.get("Logger")
|
||||
else:
|
||||
raise Exception("No logger passed to StashInterface")
|
||||
|
||||
self.port = conn['Port'] if conn.get('Port') else '9999'
|
||||
scheme = conn['Scheme'] if conn.get('Scheme') else 'http'
|
||||
|
||||
api_key = conn.get("ApiKey")
|
||||
if api_key:
|
||||
self.headers["ApiKey"] = api_key
|
||||
|
||||
# Session cookie for authentication
|
||||
self.cookies = {}
|
||||
if conn.get('SessionCookie'):
|
||||
self.cookies.update({
|
||||
'session': conn['SessionCookie']['Value']
|
||||
})
|
||||
|
||||
domain = conn['Domain'] if conn.get('Domain') else 'localhost'
|
||||
|
||||
# Stash GraphQL endpoint
|
||||
self.url = f'{scheme}://{domain}:{self.port}/graphql'
|
||||
|
||||
try:
|
||||
self.get_stash_config()
|
||||
except Exception:
|
||||
log.error(f"Could not connect to Stash at {self.url}")
|
||||
sys.exit()
|
||||
|
||||
log.info(f"Using Stash's GraphQl endpoint at {self.url}")
|
||||
|
||||
self.fragments = fragments
|
||||
|
||||
def __resolveFragments(self, query):
|
||||
|
||||
fragmentReferences = list(set(re.findall(r'(?<=\.\.\.)\w+', query)))
|
||||
fragments = []
|
||||
for ref in fragmentReferences:
|
||||
fragments.append({
|
||||
"fragment": ref,
|
||||
"defined": bool(re.search("fragment {}".format(ref), query))
|
||||
})
|
||||
|
||||
if all([f["defined"] for f in fragments]):
|
||||
return query
|
||||
else:
|
||||
for fragment in [f["fragment"] for f in fragments if not f["defined"]]:
|
||||
if fragment not in self.fragments:
|
||||
raise Exception(f'GraphQL error: fragment "{fragment}" not defined')
|
||||
query += self.fragments[fragment]
|
||||
return self.__resolveFragments(query)
|
||||
|
||||
def __callGraphQL(self, query, variables=None):
|
||||
|
||||
query = self.__resolveFragments(query)
|
||||
|
||||
json_request = {'query': query}
|
||||
if variables is not None:
|
||||
json_request['variables'] = variables
|
||||
|
||||
response = requests.post(self.url, json=json_request, headers=self.headers, cookies=self.cookies)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
|
||||
if result.get("errors"):
|
||||
for error in result["errors"]:
|
||||
log.error(f"GraphQL error: {error}")
|
||||
if result.get("error"):
|
||||
for error in result["error"]["errors"]:
|
||||
log.error(f"GraphQL error: {error}")
|
||||
if result.get("data"):
|
||||
return result['data']
|
||||
elif response.status_code == 401:
|
||||
sys.exit("HTTP Error 401, Unauthorized. Cookie authentication most likely failed")
|
||||
else:
|
||||
raise ConnectionError(
|
||||
"GraphQL query failed:{} - {}. Query: {}. Variables: {}".format(
|
||||
response.status_code, response.content, query, variables)
|
||||
)
|
||||
|
||||
def __match_alias_item(self, search, items):
|
||||
item_matches = {}
|
||||
for item in items:
|
||||
if re.match(rf'{search}$', item.name, re.IGNORECASE):
|
||||
log.debug(f'matched "{search}" to "{item.name}" ({item.id}) using primary name')
|
||||
item_matches[item.id] = item
|
||||
if not item.aliases:
|
||||
continue
|
||||
for alias in item.aliases:
|
||||
if re.match(rf'{search}$', alias.strip(), re.IGNORECASE):
|
||||
log.debug(f'matched "{search}" to "{alias}" ({item.id}) using alias')
|
||||
item_matches[item.id] = item
|
||||
return list(item_matches.values())
|
||||
|
||||
def get_stash_config(self):
|
||||
query = """
|
||||
query Configuration {
|
||||
configuration { general { stashes{ path } } }
|
||||
}
|
||||
"""
|
||||
result = self.__callGraphQL(query)
|
||||
return result['configuration']
|
||||
|
||||
def get_tags_with_relations(self):
|
||||
query = """
|
||||
query FindTags($filter: FindFilterType, $tag_filter: TagFilterType) {
|
||||
findTags(filter: $filter, tag_filter: $tag_filter) {
|
||||
count
|
||||
tags {
|
||||
id
|
||||
name
|
||||
parents { id }
|
||||
children { id }
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
variables = {
|
||||
"tag_filter":{
|
||||
"child_count":{"modifier": "GREATER_THAN", "value": 0},
|
||||
"OR": {
|
||||
"parent_count": {"modifier": "GREATER_THAN", "value": 0}}
|
||||
},
|
||||
"filter": {"q":"", "per_page":-1}
|
||||
}
|
||||
result = self.__callGraphQL(query, variables)
|
||||
return result['findTags']['tags']
|
||||
@@ -1,70 +0,0 @@
|
||||
|
||||
import os, sys, json
|
||||
import logging as log
|
||||
|
||||
# local dependencies
|
||||
from stash_interface import StashInterface
|
||||
# external dependencies
|
||||
from pyvis.network import Network
|
||||
|
||||
|
||||
### USER CONFIG ###
|
||||
STASH_SETTINGS = {
|
||||
"Scheme":"http",
|
||||
"Domain": "localhost",
|
||||
"Port": "9999",
|
||||
"Logger": log,
|
||||
# "ApiKey": "YOUR_API_KEY_HERE",
|
||||
}
|
||||
SHOW_OPTIONS = False
|
||||
|
||||
|
||||
def main():
|
||||
global stash
|
||||
|
||||
log.basicConfig(level=log.INFO, format='%(levelname)s: %(message)s')
|
||||
|
||||
stash = StashInterface(STASH_SETTINGS)
|
||||
|
||||
log.info("getting tags from stash...")
|
||||
tags = stash.get_tags_with_relations()
|
||||
|
||||
log.info("generating graph...")
|
||||
|
||||
|
||||
if SHOW_OPTIONS:
|
||||
G = Network(directed=True, height="100%", width="66%", bgcolor="#202b33", font_color="white")
|
||||
G.show_buttons()
|
||||
else:
|
||||
G = Network(directed=True, height="100%", width="100%", bgcolor="#202b33", font_color="white")
|
||||
|
||||
node_theme = {
|
||||
"border": "#adb5bd",
|
||||
"background":"#394b59",
|
||||
"highlight":{
|
||||
"border": "#137cbd",
|
||||
"background":"#FFFFFF"
|
||||
}
|
||||
}
|
||||
edge_theme = {
|
||||
"color": "#FFFFFF",
|
||||
"highlight":"#137cbd"
|
||||
}
|
||||
|
||||
# create all nodes
|
||||
for tag in tags:
|
||||
G.add_node(tag["id"], label=tag["name"], color=node_theme )
|
||||
# create all edges
|
||||
for tag in tags:
|
||||
for child in tag["children"]:
|
||||
G.add_edge( tag["id"], child["id"], color=edge_theme )
|
||||
|
||||
|
||||
current_abs_path = os.path.dirname(os.path.abspath(__file__))
|
||||
save_path = os.path.join(current_abs_path, "tag_graph.html")
|
||||
|
||||
G.save_graph(save_path)
|
||||
log.info(f'saved graph to "{save_path}"')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user