Star Identifier plugin (#72)

Co-authored-by: DogmaDragon <103123951+DogmaDragon@users.noreply.github.com>
axxeman23 2025-09-29 14:39:47 -04:00 committed by GitHub
parent 4d6f9fdc59
commit ae9a034201
7 changed files with 811 additions and 0 deletions


@@ -0,0 +1,92 @@
# Star Identifier
https://github.com/axxeman23/star_identifier
## Intro
Star Identifier uses [facial recognition](https://github.com/ageitgey/face_recognition) to automatically identify who is in images or scene screenshots from the performers already in your [Stash](https://github.com/stashapp/stash) library.
## Requirements
### Python3
__Version: 3.10+__
#### Installing Python
1. Download Python [here](https://www.python.org/downloads/)
2. Install & add to your PATH
3. If necessary, configure Stash to use Python (this can be set in the `System` tab of your `Settings` page)
### Libs & Dependencies
#### CMake
For Windows:
- Install Microsoft Visual Studio 2015 (or newer) with the C/C++ compiler tools. [Link here](https://visualstudio.microsoft.com/downloads/)
- Install and add CMake to your PATH. [Link](https://cmake.org/download/)
- For more details, see [this issue](https://github.com/ageitgey/face_recognition/issues/175)
For Mac & Linux (using [Homebrew](https://brew.sh)):
`brew install cmake`
On most Linux distributions you can also install CMake through the system package manager, e.g. `sudo apt install cmake` on Debian/Ubuntu.
#### Python Libraries
1. numpy
2. dlib
3. face_recognition
`pip install numpy dlib face_recognition`
For more details, see the [Face Recognition installation instructions](https://github.com/ageitgey/face_recognition#installation).
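If you want to sanity-check the installation before wiring it into Stash, a quick test along these lines should run cleanly (the photo filename is a placeholder for any local image containing a face):
```
import numpy
import dlib
import face_recognition

print("numpy", numpy.__version__, "| dlib", dlib.__version__)

# load any local photo and count the faces face_recognition can find in it
image = face_recognition.load_image_file("test.jpg")
print(len(face_recognition.face_encodings(image)), "face(s) found")
```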
### Plugin Files
You'll need the following files from this repo in your Stash `plugins` folder. Copy `star_identifier.yml` into the `plugins` folder itself, and the rest of the files into a folder called `py_plugins` inside the `plugins` folder. If you already have `log.py` in `py_plugins`, you can skip copying that one (it should be identical):
```
star_identifier.yml
py_plugins:
| log.py
| star_identifier_config.py
| star_identifier_interface.py
| star_identifier.py
```
## Config
### Paths
Running the plugin creates an encodings folder. By default this is created inside your `plugins` folder, but you can change the location in the config.
Face encodings are saved to that folder; the encodings file is roughly 1 MB per 1,000 performers.
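By default, the resulting layout looks like this (the folder and file names come from `star_identifier_config.py`, included later in this commit):
```
plugins:
| star_identifier.yml
| py_plugins:
| | (plugin scripts)
| star-identifier-encodings:
| | star-identifier-encodings.npz
| | errors.json
```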
### Stash Settings
Star Identifier uses a tag to find images or scenes you would like identified. By default, that tag is `star identifier`.
Since the recognition is based on a single performer image, that image needs to have a pretty clear front-facing view of the performer's face. If face_recognition fails to find a performer's face, Star Identifier will tag that performer with `star identifier performer error` by default.
### Star Identifier Settings
You can adjust the tolerance used for identification here. `0.6` is the default and a typical value, but I've found `0.5` to work well. Lower is more strict.
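Under the hood, the tolerance is passed straight to `face_recognition.compare_faces`, which treats any face distance below it as a match. A minimal sketch (the image filenames are placeholders):
```
import face_recognition

known = face_recognition.face_encodings(face_recognition.load_image_file("performer.jpg"))[0]
unknown = face_recognition.face_encodings(face_recognition.load_image_file("screenshot.jpg"))[0]

# distances below the tolerance count as a match, so 0.5 admits fewer matches than 0.6
print(face_recognition.face_distance([known], unknown))
print(face_recognition.compare_faces([known], unknown, tolerance=0.5))
```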
## Running
### Export Performers
This is the first step. Star Identifier loads each performer's image, encodes their facial features into a numpy array, and saves those arrays. The clearer the face of the performer, the better identification results will be. Performers whose faces are not recognized by face_recognition will be tagged for you to update as desired.
This only needs to be run once, and then again whenever new performers are added or performer images are updated.
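Roughly speaking, the export step boils down to the following (a simplified sketch of the plugin's `encode_performer_from_url` and `export_known`; the performer data below is made up):
```
import urllib.request
import face_recognition
import numpy as np

performer = {'id': '42', 'name': 'Example', 'image_path': 'http://localhost:9999/performer/42/image'}

image = face_recognition.load_image_file(urllib.request.urlopen(performer['image_path']))
encoding = face_recognition.face_encodings(image)[0]  # IndexError here means no face was found

# the .npz file stores one named array per performer id
np.savez('star-identifier-encodings.npz', **{performer['id']: encoding})
```
The saved `.npz` file is what the identify tasks load later.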
### Identify Images
This loads all images in the stash database tagged with `star identifier` (by default), compares the recognized faces to the exported face database, and then adds all potential matches to those images as performers.
### Identify Scene Screenshots
This loads the screenshot for every scene in the stash database tagged with `star identifier` (by default), compares the recognized faces to the exported face database, and then adds all potential matches to those scenes as performers.
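Both identify tasks share the same matching core: load the exported encodings, encode every face found in the tagged image or screenshot, and collect the performer ids whose stored encodings fall within the tolerance. A condensed sketch (the screenshot filename is a placeholder):
```
import face_recognition
import numpy as np

npz = np.load('star-identifier-encodings.npz')
ids = list(npz)                 # the array names are performer ids
known = [npz[i] for i in ids]

screenshot = face_recognition.load_image_file('screenshot.jpg')
matched = set()
for unknown in face_recognition.face_encodings(screenshot):
    results = face_recognition.compare_faces(known, unknown, tolerance=0.6)
    matched.update(ids[i] for i, hit in enumerate(results) if hit)
print(matched)   # these ids get added to the image/scene as performers
```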
## Upcoming roadmap
See [issues](https://github.com/axxeman23/star_identifier/issues)


@@ -0,0 +1,52 @@
import sys
# Log messages sent from a plugin instance are transmitted via stderr and are
# encoded with a prefix consisting of special character SOH, then the log
# level (one of t, d, i, w, e, or p - corresponding to trace, debug, info,
# warning, error and progress levels respectively), then special character
# STX.
#
# The LogTrace, LogDebug, LogInfo, LogWarning, and LogError functions are intended
# for use by plugin instances to transmit log messages. The LogProgress function
# is intended for sending progress data.
#
def __prefix(level_char):
start_level_char = b'\x01'
end_level_char = b'\x02'
ret = start_level_char + level_char + end_level_char
return ret.decode()
def __log(level_char, s):
if level_char == "":
return
print(__prefix(level_char) + s + "\n", file=sys.stderr, flush=True)
def LogTrace(s):
__log(b't', s)
def LogDebug(s):
__log(b'd', s)
def LogInfo(s):
__log(b'i', s)
def LogWarning(s):
__log(b'w', s)
def LogError(s):
__log(b'e', s)
def LogProgress(p):
progress = min(max(0, p), 1)
__log(b'p', str(progress))
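# Example usage from a plugin script (a sketch; Stash parses these messages from stderr,
# and progress values are clamped to the range 0..1):
#   import log
#   log.LogInfo("starting export")
#   log.LogProgress(0.5)
#   log.LogError("something went wrong")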


@@ -0,0 +1,3 @@
face_recognition
numpy
requests


@@ -0,0 +1,314 @@
# https://github.com/axxeman23/star_identifier
# built-in
import json
import sys
import os
import pathlib
from concurrent.futures import ProcessPoolExecutor
# external
import urllib.request
import face_recognition
import numpy as np
# local
import log
import star_identifier_config as config
from star_identifier_interface import IdentifierStashInterface
#
# constants
#
current_path = str(config.root_path or pathlib.Path(__file__).parent.absolute())
encoding_export_folder = str(pathlib.Path(current_path + f'/../{config.encodings_folder}/').absolute())
encodings_path = os.path.join(encoding_export_folder, config.encodings_filename)
errors_path = os.path.join(encoding_export_folder, config.encodings_error_filename)
#
# main
#
def main():
json_input = read_json_input()
output = {}
try:
run(json_input)
except Exception as error:
log.LogError(str(error))
return
out = json.dumps(output)
print(out + "\n")
def run(json_input):
log.LogInfo('==> running')
mode_arg = json_input['args']['mode']
client = IdentifierStashInterface(json_input["server_connection"])
match mode_arg:
case "export_known":
export_known(client)
case "identify_imgs":
identify_imgs(client, *load_encodings())
case "identify_scene_screenshots":
identify_scene_screenshots(client, *load_encodings())
case "debug":
debug_func(client)
case _:
export_known(client)
#
# utils
#
def read_json_input():
json_input = sys.stdin.read()
return json.loads(json_input)
def json_print(input, path):
os.makedirs(encoding_export_folder, exist_ok=True)
f = open(path, 'w')
json.dump(input, f)
f.close()
def get_scrape_tag(client, tag_name):
tag_id = client.findTagIdWithName(tag_name)
if tag_id is not None:
return tag_id
else:
client.createTagWithName(tag_name)
tag_id = client.findTagIdWithName(tag_name)
return tag_id
def get_scrape_tag_filter(client):
return {
"tags": {
"value": [get_scrape_tag(client, config.tag_name_identify)],
"modifier": "INCLUDES_ALL"
}
}
def load_encodings():
log.LogInfo("Loading exported face encodings...")
e = Exception(f"Encoding database not found at {encodings_path}. Run Export Performers and try again.")
try:
ids = []
known_face_encodings = []
npz = np.load(encodings_path)
if not len(npz):
raise e
for id in npz:
ids.append(id)
known_face_encodings.append(npz[id])
return [ids, known_face_encodings]
except FileNotFoundError:
raise e
#
# debug
#
def debug_print(input):
f = open(os.path.join(current_path, 'debug.txt'), 'a')
f.write(str(input))
f.close()
def debug_func(client):
f = open(os.path.join(current_path, 'debug.txt'), 'w')
f.close()
#
# export function
#
def export_known(client):
log.LogInfo('Getting all performer images...')
performers = client.getPerformerImages()
total = len(performers)
log.LogInfo(f"Found {total} performers")
if total == 0:
log.LogError('No performers found.')
return
os.makedirs(encoding_export_folder, exist_ok=True)
count = 0
outputDict = {}
errorList = []
log.LogInfo('Starting performer image export (this might take a while)')
futures_list = []
with ProcessPoolExecutor(max_workers=10) as executor:
        for performer in performers:
            # keep each future paired with its performer so failures can be attributed correctly
            futures_list.append((executor.submit(encode_performer_from_url, performer), performer))
        for future, performer in futures_list:
            log.LogProgress(count / total)
            try:
                result = future.result()
                outputDict[result['id']] = result['encodings']
            except IndexError:
                # face_encodings()[0] found no face in this performer's image
                log.LogInfo(f"No face found for {performer['name']}")
                errorList.append({ 'id': performer['id'], 'name': performer['name'] })
count += 1
np.savez(encodings_path, **outputDict)
json_print(errorList, errors_path)
log.LogInfo(f'Finished exporting all {total} performer images. Failed recognitions saved to {str(errors_path)}.')
error_tag = get_scrape_tag(client, config.tag_name_encoding_error)
error_ids = list(map(lambda entry: entry['id'], errorList))
log.LogInfo(f"Tagging failed performer exports with {config.tag_name_encoding_error}...")
client.bulkPerformerAddTags(error_ids, [error_tag])
#
# Facial recognition functions
#
# Encoding
def encode_performer_from_url(performer):
image = face_recognition.load_image_file(urllib.request.urlopen(performer['image_path']))
performer['encodings'] = face_recognition.face_encodings(image)[0]
return performer
# Matching
def get_recognized_ids_from_image(image, known_face_encodings, ids):
image['matched_ids'] = get_recognized_ids(face_recognition.load_image_file(image['path']), known_face_encodings, ids)
return image
def get_recognized_ids_from_scene_screenshot(scene, known_face_encodings, ids):
image = urllib.request.urlopen(scene['paths']['screenshot'])
scene['matched_ids'] = get_recognized_ids(face_recognition.load_image_file(image), known_face_encodings, ids)
return scene
def get_recognized_ids(image_file, known_face_encodings, ids):
unknown_face_encodings = face_recognition.face_encodings(image_file)
recognized_ids = np.empty((0,0), int)
for unknown_face in unknown_face_encodings:
results = face_recognition.compare_faces(known_face_encodings, unknown_face, tolerance=config.tolerance)
recognized_ids = np.append(recognized_ids, [ids[i] for i in range(len(results)) if results[i] == True])
return np.unique(recognized_ids).tolist()
# Execution
def execute_identification_list(known_face_encodings, ids, args):
count = 0
futures_list = []
with ProcessPoolExecutor(max_workers=10) as executor:
        for item in args['items']:
            # keep each future paired with its source item so errors can be attributed correctly
            futures_list.append((executor.submit(args['executor_func'], *[item, known_face_encodings, ids]), item))
        for future, item in futures_list:
            log.LogProgress(count / args['total'])
            debug_print(future)
            try:
                result = future.result()
                if not len(result['matched_ids']):
                    log.LogInfo(f"No matching performer found for {args['name']} id {result['id']}. Moving on to next {args['name']}...")
                else:
                    log.LogDebug(f"Updating {args['name']} {result['id']} with performer ids {result['matched_ids']}")
                    args['submit_func'](result['id'], result['matched_ids'])
            except IndexError:
                log.LogError(f"No face found in tagged {args['name']} id {item['id']}. Moving on to next {args['name']}...")
            except Exception as error:
                log.LogError(f"Error ({error}) comparing tagged {args['name']} id {item['id']}. Moving on to next {args['name']}...")
            count += 1
# Imgs
def identify_imgs(client, ids, known_face_encodings):
log.LogInfo(f"Getting images tagged with '{config.tag_name_identify}'...")
images = client.findImages(get_scrape_tag_filter(client))
total = len(images)
if not total:
log.LogError(f"No tagged images found. Tag images with '{config.tag_name_identify}', then try again.")
return
log.LogInfo(f"Found {total} tagged images. Starting identification...")
execution_args = {
'name': 'image',
'items': images,
'total': total,
'executor_func': get_recognized_ids_from_image,
'submit_func': client.addPerformersToImage
}
execute_identification_list(
known_face_encodings,
ids,
execution_args
)
log.LogInfo('Image identification complete!')
# Scenes
def identify_scene_screenshots(client, ids, known_face_encodings):
log.LogInfo(f"Getting scenes tagged with '{config.tag_name_identify}'...")
scenes = client.getScenePaths(get_scrape_tag_filter(client))
total = len(scenes)
if not total:
log.LogError(f"No tagged scenes found. Tag scenes with '{config.tag_name_identify}', then try again.")
return
log.LogInfo(f"Found {total} tagged scenes. Starting identification...")
execution_args = {
'name': 'scene',
'items': scenes,
'total': total,
'executor_func': get_recognized_ids_from_scene_screenshot,
'submit_func': client.addPerformersToScene
}
execute_identification_list(
known_face_encodings,
ids,
execution_args
)
log.LogInfo("Scene screenshot identification complete!")
if __name__ == "__main__":
main()
# https://github.com/ageitgey/face_recognition
# https://github.com/ageitgey/face_recognition/issues/175


@@ -0,0 +1,21 @@
name: Star Identifier
description: Use facial recognition to automatically identify who is in images or scene screenshots from the performers already in your Stash library.
version: 1.0
url: https://github.com/axxeman23/star_identifier
exec:
- python
- "{pluginDir}/py_plugins/star_identifier.py"
interface: raw
tasks:
- name: Export Performers
description: Run this first! Exports current performer images and adds them to an encoding file for recognition.
defaultArgs:
mode: export_known
- name: Identify Images
description: Compares images tagged with 'star identifier' (by default) to exported performers, and adds all possible matches to the images.
defaultArgs:
mode: identify_imgs
- name: Identify Scene Screenshots
description: Compares scene screenshots tagged with 'star identifier' (by default) to exported performers, and adds all possible matches to the scenes.
defaultArgs:
mode: identify_scene_screenshots


@@ -0,0 +1,27 @@
#
# Paths
#
root_path = '' # defaults to plugins folder
encodings_folder = 'star-identifier-encodings'
encodings_filename = 'star-identifier-encodings.npz'
encodings_error_filename = 'errors.json'
#
# Stash Settings
#
# The identifier will run on images / scenes tagged with this
tag_name_identify = 'star identifier'
# If the identifier can't find a face for a performer,
# it will add this tag to that performer
tag_name_encoding_error = 'star identifier performer error'
#
# Star Identifier Settings
#
# Tolerance: How much distance between faces to consider it a match.
# Lower is more strict. 0.6 is typical best performance.
tolerance = 0.6
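# For example, a stricter setup might change just these values (illustrative only):
#   tolerance = 0.5                  # stricter matching than the 0.6 default
#   tag_name_identify = 'needs id'   # any tag name works; the plugin creates the tag if it is missing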


@@ -0,0 +1,302 @@
# most of this copied from https://github.com/niemands/StashPlugins
import requests
import sys
import log
class IdentifierStashInterface:
port = ""
url = ""
headers = {
"Accept-Encoding": "gzip, deflate, br",
"Content-Type": "application/json",
"Accept": "application/json",
"Connection": "keep-alive",
"DNT": "1"
}
cookies = {}
#
# Init
#
def __init__(self, conn):
self.port = conn['Port']
scheme = conn['Scheme']
# Session cookie for authentication
self.cookies = {
'session': conn.get('SessionCookie').get('Value')
}
try:
# If stash does not accept connections from all interfaces use the host specified in the config
            host = conn.get('Host') if '0.0.0.0' not in conn.get('Host') else 'localhost'
except TypeError:
# Pre stable 0.8
host = 'localhost'
# Stash GraphQL endpoint
self.url = scheme + "://" + host + ":" + str(self.port) + "/graphql"
log.LogDebug(f"Using stash GraphQl endpoint at {self.url}")
def __callGraphQL(self, query, variables=None):
json = {'query': query}
if variables is not None:
json['variables'] = variables
response = requests.post(self.url, json=json, headers=self.headers, cookies=self.cookies)
if response.status_code == 200:
result = response.json()
if result.get("error", None):
for error in result["error"]["errors"]:
raise Exception("GraphQL error: {}".format(error))
if result.get("data", None):
return result.get("data")
elif response.status_code == 401:
sys.exit("HTTP Error 401, Unauthorised. Cookie authentication most likely failed")
else:
raise ConnectionError(
"GraphQL query failed:{} - {}. Query: {}. Variables: {}".format(
response.status_code, response.content, query, variables)
)
#
# Queries
#
# Performers
def getPerformerImages(self, performer_filter=None):
return self.__getPerformerImages(performer_filter)
def __getPerformerImages(self, performer_filter=None, page=1):
per_page = 1000
query = """
query($per_page: Int, $page: Int, $performer_filter: PerformerFilterType) {
findPerformers(
performer_filter: $performer_filter
filter: { per_page: $per_page, page: $page }
) {
count
performers {
id
name
image_path
}
}
}
"""
variables = {
'per_page': per_page,
'page': page
}
if performer_filter:
variables['performer_filter'] = performer_filter
result = self.__callGraphQL(query, variables)
performers = result.get('findPerformers').get('performers')
if len(performers) == per_page:
next_page = self.__getPerformerImages(performer_filter, page + 1)
for performer in next_page:
performers.append(performer)
return performers
# Tags
def findTagIdWithName(self, name):
query = """
query($name: String!) {
findTags(
tag_filter: {
name: {value: $name, modifier: EQUALS}
}
){
tags{
id
name
}
}
}
"""
variables = {
'name': name,
}
result = self.__callGraphQL(query, variables)
if result.get('findTags') is not None and result.get('findTags').get('tags') != []:
return result.get('findTags').get('tags')[0].get('id')
return None
# Images
def findImages(self, image_filter=None):
return self.__findImages(image_filter)
def __findImages(self, image_filter=None, page=1):
per_page = 1000
query = """
query($per_page: Int, $page: Int, $image_filter: ImageFilterType) {
findImages(
image_filter: $image_filter,
filter: { per_page: $per_page, page: $page }
) {
count
images {
id
path
performers {
id
}
}
}
}
"""
variables = {
'per_page': per_page,
'page': page
}
if image_filter:
variables['image_filter'] = image_filter
result = self.__callGraphQL(query, variables)
images = result.get('findImages').get('images')
if len(images) == per_page:
next_page = self.__findImages(image_filter, page + 1)
for image in next_page:
images.append(image)
return images
# Scenes
def getScenePaths(self, scene_filter=None):
return self.__getScenePaths(scene_filter)
def __getScenePaths(self, scene_filter=None, page=1):
per_page = 1000
query = """
query($per_page: Int, $page: Int, $scene_filter: SceneFilterType) {
findScenes(
scene_filter: $scene_filter,
filter: { per_page: $per_page, page: $page }
) {
count
scenes {
id
paths {
screenshot
stream
}
}
}
}
"""
variables = {
'per_page': per_page,
'page': page
}
if scene_filter:
variables['scene_filter'] = scene_filter
result = self.__callGraphQL(query, variables)
scenes = result.get('findScenes').get('scenes')
        if len(scenes) == per_page:
next_page = self.__getScenePaths(scene_filter, page + 1)
for scene in next_page:
scenes.append(scene)
return scenes
#
# Mutations
#
def createTagWithName(self, name):
query = """
mutation tagCreate($input:TagCreateInput!) {
tagCreate(input: $input){
id
}
}
"""
variables = {'input': {
'name': name
}}
result = self.__callGraphQL(query, variables)
if result.get('tagCreate'):
log.LogDebug(f"Created tag: {name}")
return result.get('tagCreate').get("id")
else:
log.LogError(f"Could not create tag: {name}")
return None
def updateImage(self, image_data):
query = """
mutation($input: ImageUpdateInput!) {
imageUpdate(input: $input) {
id
}
}
"""
variables = {'input': image_data}
self.__callGraphQL(query, variables)
def addPerformersToImage(self, image_id, performer_ids):
self.updateImage({
'id': image_id,
'performer_ids': performer_ids
})
def bulkPerformerAddTags(self, performer_ids, tag_ids):
query = """
mutation($ids: [ID!], $tag_ids: BulkUpdateIds) {
bulkPerformerUpdate(input: { ids: $ids, tag_ids: $tag_ids }) {
id
}
}
"""
variables = {
"ids": performer_ids,
"tag_ids": {
"ids": tag_ids,
"mode": 'ADD'
}
}
self.__callGraphQL(query, variables)
def addPerformersToScene(self, scene_id, performer_ids):
query = """
mutation BulkSceneUpdate($ids: [ID!], $performer_ids: BulkUpdateIds) {
bulkSceneUpdate(input: { ids: $ids, performer_ids: $performer_ids}) {
id
}
}
"""
variables = {
"ids": [scene_id],
"performer_ids": {
"ids": performer_ids,
"mode": "ADD"
}
}
self.__callGraphQL(query, variables)
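# Rough sketch of using this interface outside of Stash's plugin runner. Normally the
# connection details arrive on stdin as json_input["server_connection"]; the values
# below are placeholders only:
#
#   conn = {
#       'Scheme': 'http',
#       'Host': 'localhost',
#       'Port': 9999,
#       'SessionCookie': {'Value': '<session cookie value>'},
#   }
#   client = IdentifierStashInterface(conn)
#   print(client.findTagIdWithName('star identifier'))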