import json
import re
import sys
import difflib
import requests
from datetime import datetime
from html import unescape
from typing import Any, Callable
from urllib.parse import urlparse

import py_common.log as log
from py_common.util import dig, guess_nationality, scraper_args
from py_common.config import get_config
from py_common.types import (
    ScrapedGallery,
    ScrapedMovie,
    ScrapedPerformer,
    ScrapedScene,
    ScrapedStudio,
    ScrapedTag,
)
import AyloAPI.domains as domains
from AyloAPI.slugger import slugify

config = get_config(
    default="""
# User Agent to use for the requests
user_agent = Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:79.0) Gecko/20100101 Firefox/79.0

# Scrape markers when using 'Scrape with...'
scrape_markers = False

# Minimum similarity ratio to consider a match when searching
minimum_similarity = 0.75

# Debug mode will save the latest API response to disk
debug = False
"""
)


def default_postprocess(obj: Any, _) -> Any:
    return obj


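# A minimal sketch of the postprocess contract: a site-specific wrapper scraper
# can pass a callable like this to any of the functions below to adjust the
# result before it is returned (the domain swap here is purely illustrative):
#
#   def rewrite_url(scene: ScrapedScene, raw_result: dict) -> ScrapedScene:
#       # raw_result is the unmodified API response, for any extra digging
#       scene["url"] = scene["url"].replace("realitykings", "welivetogether")
#       return scene

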
## Temporary function to add markers to scenes, remove when/if Stash gets native support
def add_markers(scene_id: str, markers: list[dict]):
    from itertools import tee, filterfalse

    def partition(pred, iterable):
        t1, t2 = tee(iterable)
        return list(filter(pred, t2)), list(filterfalse(pred, t1))

    from py_common.graphql import callGraphQL

    def format_time(seconds: int) -> str:
        if seconds >= 3600:
            return f"{seconds // 3600}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
        return f"{(seconds // 60) % 60}:{seconds % 60:02}"

    raw_tags = callGraphQL("query allTags { allTags { name id aliases } }")
    if not raw_tags:
        log.error("Failed to get tags from Stash")
        return

    # Markers need an existing tag as their primary tag, so build a lookup
    # of tag names and aliases to tag IDs; markers without a match are skipped
    tags = {tag["name"].lower(): tag["id"] for tag in raw_tags["allTags"]}
    tags |= {
        alias.lower(): tag["id"]
        for tag in raw_tags["allTags"]
        for alias in tag["aliases"]
    }
    existing_markers = callGraphQL(
        "query FindScene($id: ID!){ findScene(id: $id) { scene_markers { title seconds } } }",
        {"id": scene_id},
    )
    if not existing_markers:
        log.error("Failed to get existing markers from Stash")
        return

    existing_markers = existing_markers["findScene"]["scene_markers"]

    valid, invalid = partition(lambda m: m["name"].lower() in tags, markers)
    if invalid:
        invalid_tags = ", ".join({m["name"] for m in invalid})
        log.debug(f"Skipping {len(invalid)} markers, tags do not exist: {invalid_tags}")

    log.debug(f"Adding {len(valid)} out of {len(markers)} markers to scene {scene_id}")
    create_query = "mutation SceneMarkerCreate($input: SceneMarkerCreateInput!) { sceneMarkerCreate(input: $input) {id}}"
    for marker in sorted(valid, key=lambda m: m["seconds"]):
        name = marker["name"]
        seconds = marker["seconds"]
        if any(m["seconds"] == marker["seconds"] for m in existing_markers):
            log.debug(
                f"Skipping marker '{name}' at {format_time(seconds)} because it already exists"
            )
            continue
        variables = {
            "input": {
                "title": name,
                "primary_tag_id": tags[name.lower()],
                "seconds": int(seconds),
                "scene_id": scene_id,
                "tag_ids": [],
            }
        }
        callGraphQL(create_query, variables)
        log.debug(f"Added marker '{name}' at {format_time(seconds)}")


# network stuff
def __raw_request(url, headers) -> requests.Response:
    log.trace(f"Sending GET request to {url}")
    response = requests.get(url, headers=headers, timeout=10)

    if response.status_code == 429:
        log.error(
            "[REQUEST] 429 Too Many Requests: "
            "you have sent too many requests in a given amount of time."
        )
        sys.exit(1)

    # Even a 404 will contain an instance token
    return response


def __api_request(url: str, headers: dict) -> dict | None:
    result = __raw_request(url, headers)
    api_response = result.json()
    # Error responses come back as a list of error objects rather than a dict
    if isinstance(api_response, list):
        api_search_errors = "\n- ".join(
            json.dumps(res, indent=None) for res in api_response
        )
        log.error(f"Errors from API:\n{api_search_errors}")
        return None

    if config.debug:
        with open("api_response.json", "w", encoding="utf-8") as f:
            json.dump(api_response, f, indent=2)

    return api_response["result"]


def _create_headers_for(domain: str) -> dict[str, str]:
    # If we haven't stored a token we must provide a function to get one
    def get_instance_token(url: str) -> str | None:
        r = __raw_request(url, {"User-Agent": config.user_agent})
        if r and (token := r.cookies.get("instance_token")):
            return token
        log.error(
            f"Failed to get instance_token from '{url}': "
            "are you sure this site is in the Aylo network?"
        )

    api_token = domains.get_token_for(domain, fallback=get_instance_token)
    if api_token is None:
        log.error(f"Unable to get an API token for '{domain}'")
        return {}

    api_headers = {
        "Instance": api_token,
        "User-Agent": config.user_agent,
        "Origin": f"https://{domain}",
        "Referer": f"https://{domain}",
    }
    return api_headers


def _construct_url(api_result: dict) -> str:
    """
    Tries to construct a valid public URL for an API result

    This will often result in scene links that point to the parent network site,
    so we might want to add wrapper scrapers that can add the correct URL as well

    For example, a scene from We Live Together will have a URL for realitykings.com,
    but that scene is also on welivetogether.com, which might be considered more canonical
    """

    brand = api_result["brand"]
    type_ = api_result["type"]
    id_ = api_result["id"]
    slug = slugify(api_result["title"])
    return f"https://www.{brand}.com/{type_}/{id_}/{slug}"
def _construct_performer_url(api_result: dict, site: str) -> str:
    id_ = api_result["id"]
    slug = slugify(api_result["name"])
    return f"https://www.{site}.com/model/{id_}/{slug}"
## Helper functions for the objects returned from Aylo's API
def get_studio(api_object: dict) -> ScrapedStudio | None:
    studio_name = dig(api_object, "collections", 0, "name")
    parent_name = dig(api_object, "brandMeta", ("displayName", "name", "shortName"))
    if studio_name:
        if parent_name and parent_name.lower() != studio_name.lower():
            return {
                "name": studio_name,
                "parent": {"name": parent_name},
            }
        return {"name": studio_name}
    elif parent_name:
        return {"name": parent_name}

    log.error(f"No studio for {api_object['type']} with id {api_object['id']}")
    return None


# As documented by AdultSun, these tag IDs appear to be neutral but
# are actually gendered, so we map them to their gender-specific counterparts
tags_map = {
    90: "Athletic Woman",
    107: "White Woman",
    112: "Black Woman",
    113: "European Woman",
    121: "Latina Woman",
    125: "Black Hair (Female)",
    126: "Blond Hair (Female)",
    127: "Brown Hair (Female)",
    128: "Red Hair (Female)",
    215: "Rimming Him",
    274: "Rimming Her",
    374: "Black Man",
    376: "European Man",
    377: "Latino Man",
    378: "White Man",
    379: "Black Hair (Male)",
    380: "Blond Hair (Male)",
    381: "Brown Hair (Male)",
    383: "Red Hair (Male)",
    385: "Shaved Head",
    386: "Short Hair (Male)",
}


def to_tag(api_object: dict) -> ScrapedTag:
    # Prefer the gender-specific mapping; fall back to the tag's own name
    # (tags with a mapped ID may lack a "name" field entirely)
    if mapped := tags_map.get(api_object.get("id")):
        return {"name": mapped}
    return {"name": api_object["name"].strip()}


def to_tags(api_object: dict) -> list[ScrapedTag]:
    tags = api_object.get("tags", [])
    return [to_tag(x) for x in tags if "name" in x or x.get("id") in tags_map]


def to_marker(api_object: dict) -> dict:
    return {
        **to_tag(api_object),
        "seconds": api_object["startTime"],
    }
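

# e.g. a timeTags entry like {"id": 274, "name": "Rimming", "startTime": 312}
# becomes {"name": "Rimming Her", "seconds": 312}
# (input values illustrative; the name comes from to_tag's gendered mapping)

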
## Helper functions to convert from Aylo's API to Stash's scraper return types
def to_scraped_performer(
    performer_from_api: dict, site: str | None = None
) -> ScrapedPerformer:
    if (type_ := dig(performer_from_api, "brand")) and type_ not in (
        "actorsandtags",
        # Older sites use this type
        "phpactors",
    ):
        wrong_type = performer_from_api.get("type", "mystery")
        wrong_id = performer_from_api.get("id", "unknown")
        log.error(
            f"Attempted to scrape a '{wrong_type}' (ID: {wrong_id}) as a performer."
        )
        raise ValueError("Invalid performer from API")
    # This is all we get when scraped as part of a scene or movie
    performer: ScrapedPerformer = {
        "name": performer_from_api["name"],
        "gender": performer_from_api["gender"],
    }

    if aliases := ", ".join(
        alias
        for alias in performer_from_api.get("aliases", [])
        if alias.lower() != performer["name"].lower()
    ):
        performer["aliases"] = aliases

    if details := performer_from_api.get("bio"):
        performer["details"] = details

    # All remaining fields are only available when scraped directly
    if height := performer_from_api.get("height"):
        # Convert inches to cm
        performer["height"] = str(round(height * 2.54))

    if weight := performer_from_api.get("weight"):
        # Convert lbs to kg
        performer["weight"] = str(round(weight / 2.205))

    if birthdate := performer_from_api.get("birthday"):
        performer["birthdate"] = datetime.strptime(
            birthdate, "%Y-%m-%dT%H:%M:%S%z"
        ).strftime("%Y-%m-%d")

    if birthplace := performer_from_api.get("birthPlace"):
        performer["country"] = guess_nationality(birthplace)

    if measurements := performer_from_api.get("measurements"):
        performer["measurements"] = measurements

    images = dig(performer_from_api, "images", "master_profile") or {}
    # Performers can have multiple images, try to get the biggest versions
    if images := [
        img
        for alt in images.values()
        if (img := dig(alt, ("xx", "xl", "lg", "md", "sm"), "url"))
    ]:
        performer["images"] = images

    if tags := to_tags(performer_from_api):
        performer["tags"] = tags

    if site:
        performer["url"] = _construct_performer_url(performer_from_api, site)

    return performer


def to_scraped_movie(movie_from_api: dict) -> ScrapedMovie:
    if movie_from_api["type"] not in ("movie", "serie"):
        wrong_type = movie_from_api["type"]
        wrong_id = movie_from_api["id"]
        log.error(f"Attempted to scrape a '{wrong_type}' (ID: {wrong_id}) as a movie.")
        raise ValueError("Invalid movie from API")

    movie: ScrapedMovie = {
        "name": movie_from_api["title"],
        "synopsis": dig(movie_from_api, "description"),
        "url": _construct_url(movie_from_api),
    }

    if front_image := dig(movie_from_api, "images", "cover", "0", "xx", "url"):
        movie["front_image"] = re.sub(r"/m=[^/]+", "", front_image)
    elif poster := dig(movie_from_api, "images", "poster", "0", "xx", "url"):
        movie["front_image"] = re.sub(r"/m=[^/]+", "", poster)

    if date := dig(movie_from_api, "dateReleased"):
        movie["date"] = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z").strftime(
            "%Y-%m-%d"
        )

    if studio := get_studio(movie_from_api):
        movie["studio"] = studio

    return movie


def to_scraped_scene(scene_from_api: dict) -> ScrapedScene:
    if scene_from_api["type"] != "scene":
        wrong_type = scene_from_api["type"]
        wrong_id = scene_from_api["id"]
        log.error(f"Attempted to scrape a '{wrong_type}' (ID: {wrong_id}) as a scene.")
        raise ValueError("Invalid scene from API")

    details = dig(scene_from_api, "description") or dig(
        scene_from_api, "parent", "description"
    )
    if details:
        # Unescape HTML entities and collapse runs of spaces within each line
        details = unescape(details)
        details = "\n".join(
            " ".join(word for word in line.split(" ") if word)
            for line in details.split("\n")
        )

    scene: ScrapedScene = {
        "title": scene_from_api["title"],
        "code": str(scene_from_api["id"]),
        "details": details,
        "date": datetime.strptime(
            scene_from_api["dateReleased"], "%Y-%m-%dT%H:%M:%S%z"
        ).strftime("%Y-%m-%d"),
        "url": _construct_url(scene_from_api),
        "performers": [
            to_scraped_performer(p, dig(scene_from_api, "brand"))
            for p in scene_from_api["actors"]
        ],
        "tags": to_tags(scene_from_api),
    }

    if image := dig(
        scene_from_api,
        "images",
        ("poster", "poster_fallback"),
        "0",
        ("xx", "xl", "lg", "md", "sm", "xs"),
        "url",
    ):
        # Strip the "/m=..." resize segment from the image URL
        scene["image"] = re.sub(r"/m=[^/]+", "", image)

    if dig(scene_from_api, "parent", "type") in ("movie", "serie"):
        scene["movies"] = [to_scraped_movie(scene_from_api["parent"])]

    if studio := get_studio(scene_from_api):
        scene["studio"] = studio

    if config.scrape_markers and (markers := scene_from_api.get("timeTags")):
        scene["markers"] = [to_marker(m) for m in markers]  # type: ignore

    return scene


## Primary functions used to scrape from Aylo's API
def scene_from_url(
    url, postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess
) -> ScrapedScene | None:
    """
    Scrapes a scene from a URL, running an optional postprocess function on the result
    """

    if not (match := re.search(r"/(\d+)/", url)):
        log.error(
            "Can't get the ID of the scene. "
            "Are you sure that URL is from a site in the Aylo network?"
        )
        return None
    scene_id = match.group(1)

    log.debug(f"Scene ID: {scene_id}")

    # Extract the domain from the URL
    domain = domains.site_name(url)

    api_URL = f"https://site-api.project1service.com/v2/releases/{scene_id}"
    api_headers = _create_headers_for(domain)
    if not api_headers:
        return None
    api_scene_json = __api_request(api_URL, api_headers)

    if not api_scene_json:
        return None

    # If you scrape a trailer we can still get the correct scene data
    if (
        dig(api_scene_json, "type") != "scene"
        and dig(api_scene_json, "parent", "type") == "scene"
    ):
        log.debug("Result is a trailer, getting scene data from parent")
        api_scene_json = api_scene_json["parent"]

    return postprocess(to_scraped_scene(api_scene_json), api_scene_json)
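

# Typical direct usage (URL illustrative; any Aylo-network scene URL with a
# numeric ID in its path should work):
#   scene = scene_from_url("https://www.brazzers.com/scene/4291029/some-title")

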
def gallery_from_url(
    url,
    postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
) -> ScrapedGallery | None:
    """
    Scrapes a gallery from a URL, running an optional postprocess function on the result

    Note that most Aylo sites do not have public links to galleries, so this will treat scenes as galleries
    """

    scene = scene_from_url(url, postprocess=postprocess)
    if not scene:
        return None

    temp = {
        "title": scene.get("title"),
        "details": scene.get("details"),
        "url": scene.get("url"),
        "date": scene.get("date"),
        "studio": scene.get("studio"),
        "tags": scene.get("tags"),
        "performers": scene.get("performers"),
        "code": scene.get("code"),
    }

    gallery: ScrapedGallery = {k: v for k, v in temp.items() if v is not None}  # type: ignore

    return gallery


def performer_from_url(
    url,
    postprocess: Callable[
        [ScrapedPerformer, dict], ScrapedPerformer
    ] = default_postprocess,
) -> ScrapedPerformer | None:
    """
    Scrapes a performer from a URL, running an optional postprocess function on the result
    """

    if not (match := re.search(r"/(\d+)/", url)):
        log.error(
            "Can't get the ID of the performer. "
            "Are you sure that URL is from a site in the Aylo network?"
        )
        return None
    performer_id = match.group(1)

    log.debug(f"Performer ID: {performer_id}")

    # Extract the domain from the URL
    domain = urlparse(url).netloc.split(".")[-2]

    api_URL = f"https://site-api.project1service.com/v1/actors/{performer_id}"
    api_headers = _create_headers_for(domain)
    if not api_headers:
        return None
    api_performer_json = __api_request(api_URL, api_headers)
    if not api_performer_json:
        return None

    return postprocess(to_scraped_performer(api_performer_json), api_performer_json)


def movie_from_url(
    url, postprocess: Callable[[ScrapedMovie, dict], ScrapedMovie] = default_postprocess
) -> ScrapedMovie | None:
    """
    Scrapes a movie from a URL, running an optional postprocess function on the result
    """

    if not (match := re.search(r"/(\d+)/", url)):
        log.error(
            "Can't get the ID of the movie. "
            "Are you sure that URL is from a site in the Aylo network?"
        )
        return None
    movie_id = match.group(1)

    log.debug(f"Movie ID: {movie_id}")

    # Extract the domain from the URL
    domain = urlparse(url).netloc.split(".")[-2]

    api_URL = f"https://site-api.project1service.com/v2/releases/{movie_id}"
    api_headers = _create_headers_for(domain)
    if not api_headers:
        return None
    api_movie_json = __api_request(api_URL, api_headers)
    if not api_movie_json:
        return None

    if dig(api_movie_json, "type") in ("movie", "serie"):
        return postprocess(to_scraped_movie(api_movie_json), api_movie_json)

    # If you scrape a scene or trailer, we can still get the correct movie data
    if dig(api_movie_json, "parent", "type") in ("movie", "serie"):
        log.debug("Result is a scene or trailer, getting movie data from parent")
        return postprocess(
            to_scraped_movie(api_movie_json["parent"]), api_movie_json["parent"]
        )

    log.error(f"Unable to find a movie at '{url}'")
    return None


# Since the "Scrape with..." function in Stash expects a single result, we provide
|
|
# this function to return the first result that exceeds the threshold so
|
|
# that users don't need to use scene_search directly and THEN take the first result
|
|
def find_scene(
|
|
query: str,
|
|
search_domains: list[str] | None = None,
|
|
min_ratio: float = 0.9,
|
|
postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
|
|
) -> ScrapedScene | None:
|
|
"""
|
|
Searches the Aylo API for scenes matching the given query and returns the
|
|
first match that exceeds `min_ratio` similarity: a float between 0 and 1.
|
|
|
|
Differs from `scene_from_query` in that it only returns the first match,
|
|
returning early as soon as it finds a match that exceeds the threshold.
|
|
|
|
If search_domains is provided it will only search those domains,
|
|
otherwise it will search all (this could be very slow!)
|
|
|
|
Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.
|
|
|
|
If postprocess is provided it will be called on the result before returning
|
|
"""
|
|
if not query:
|
|
log.error("No query provided")
|
|
return None
|
|
|
|
if not search_domains:
|
|
log.warning("Searcing all known domains, this could be very slow!")
|
|
search_domains = domains.all_domains()
|
|
|
|
log.debug(f"Matching '{query}' against {len(search_domains)} sites")
|
|
|
|
def matcher(candidate_title: str):
|
|
return round(
|
|
difflib.SequenceMatcher(
|
|
None, query.lower(), candidate_title.lower()
|
|
).ratio(),
|
|
3,
|
|
)
|
|
|
|
for domain in search_domains:
|
|
log.debug(f"Searching '{domain}'")
|
|
|
|
api_headers = _create_headers_for(domain)
|
|
search_url = f"https://site-api.project1service.com/v2/releases?search={query}&type=scene"
|
|
api_response = __api_request(search_url, api_headers)
|
|
|
|
if api_response is None:
|
|
log.error(f"Failed to search '{domain}'")
|
|
continue
|
|
if not api_response:
|
|
log.debug(f"No results from '{domain}'")
|
|
continue
|
|
|
|
best_match = max(api_response, key=lambda x: matcher(x["title"]))
|
|
ratio = matcher(best_match["title"])
|
|
if ratio >= min_ratio:
|
|
log.info(
|
|
f"Found scene '{best_match['title']}' with {ratio:.2%} similarity "
|
|
f"to '{query}' (exceeds {min_ratio:.2%} threshold) "
|
|
f"on '{domain}'"
|
|
)
|
|
return postprocess(to_scraped_scene(best_match), best_match)
|
|
else:
|
|
log.info(
|
|
f"Giving up on '{domain}': best result '{best_match['title']}' "
|
|
f"with {ratio:.2%} similarity"
|
|
)
|
|
|
|
log.error(f"No scenes found for '{query}'")
|
|
return None
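

# A usage sketch (title and domain list illustrative):
#   scene = find_scene(
#       "Some Scene Title",
#       search_domains=["brazzers", "realitykings"],
#       min_ratio=config.minimum_similarity,
#   )

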
# Since the "Scrape with..." function in Stash expects a single result, we provide
|
|
# this function to return the first result that exceeds the threshold so
|
|
# that users don't need to use performer_search directly and THEN take the first result
|
|
def find_performer(
|
|
query: str,
|
|
search_domains: list[str] | None = None,
|
|
min_ratio: float = 0.9,
|
|
postprocess: Callable[
|
|
[ScrapedPerformer, dict], ScrapedPerformer
|
|
] = default_postprocess,
|
|
) -> ScrapedPerformer | None:
|
|
"""
|
|
Searches the Aylo API for performers matching the given query and returns the
|
|
first match that exceeds `min_ratio` similarity: a float between 0 and 1.
|
|
|
|
Differs from `search_performer` in that it only returns the first match,
|
|
returning early as soon as it finds a match that exceeds the threshold.
|
|
|
|
If search_domains is provided it will only search those domains,
|
|
otherwise it will search all (this could be very slow!)
|
|
|
|
Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.
|
|
|
|
If postprocess is provided it will be called on the result before returning
|
|
"""
|
|
if not query:
|
|
log.error("No query provided")
|
|
return None
|
|
|
|
if not search_domains:
|
|
log.warning("Searcing all known domains, this could be very slow!")
|
|
search_domains = domains.all_domains()
|
|
|
|
log.debug(f"Matching '{query}' against {len(search_domains)} sites")
|
|
|
|
def matcher(candidate_name: str):
|
|
return round(
|
|
difflib.SequenceMatcher(
|
|
None, query.lower(), candidate_name.lower()
|
|
).ratio(),
|
|
3,
|
|
)
|
|
|
|
for domain in search_domains:
|
|
log.debug(f"Searching {domain}")
|
|
|
|
api_headers = _create_headers_for(domain)
|
|
search_url = f"https://site-api.project1service.com/v1/actors?search={query}"
|
|
api_response = __api_request(search_url, api_headers)
|
|
|
|
if api_response is None:
|
|
log.error(f"Failed to search {domain}")
|
|
continue
|
|
if not api_response:
|
|
log.debug(f"No results from {domain}")
|
|
continue
|
|
|
|
best_match = max(api_response, key=lambda x: matcher(x["name"]))
|
|
ratio = matcher(best_match["name"])
|
|
if ratio >= min_ratio:
|
|
log.info(
|
|
f"Found performer '{best_match['name']}' with {ratio:.2%} similarity "
|
|
f"to '{query}' (exceeds {min_ratio:.2%} threshold) "
|
|
f"on '{domain}'"
|
|
)
|
|
return postprocess(to_scraped_performer(best_match, domain), best_match)
|
|
else:
|
|
log.info(
|
|
f"Giving up on '{domain}': best result '{best_match['name']}' "
|
|
f"with {ratio:.2%} similarity"
|
|
)
|
|
|
|
log.error(f"No performers found for '{query}'")
|
|
return None
|
|
|
|
|
|
def scene_search(
    query: str,
    search_domains: list[str] | None = None,
    postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
) -> list[ScrapedScene]:
    """
    Searches the Aylo API for the given query and returns a list of ScrapedScene

    If search_domains is provided it will only search those domains,
    otherwise it will search all known domains (this could be very slow!)

    Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.

    If postprocess is provided it will be called on each result before returning
    """
    if not query:
        log.error("No query provided")
        return []

    if not search_domains:
        log.warning("Searching all known domains, this could be very slow!")
        search_domains = domains.all_domains()

    log.debug(f"Searching for '{query}' on {len(search_domains)} sites")

    # The source of the results will be based on the token used (Brazzers, Reality Kings, etc.)
    search_url = f"https://site-api.project1service.com/v2/releases?search={query}&type=scene&limit=10"
    search_results = []
    already_seen = set()

    def matcher(candidate: ScrapedScene):
        return round(
            difflib.SequenceMatcher(
                None,
                query.lower(),
                candidate["title"].lower(),  # type: ignore (title is always set)
            ).ratio(),
            3,
        )

    for domain in search_domains:
        log.debug(f"Searching {domain}")

        api_headers = _create_headers_for(domain)
        api_response = __api_request(search_url, api_headers)
        if api_response is None:
            log.error(f"Failed to search {domain}")
            continue
        if not api_response:
            log.debug(f"No results from {domain}")
            continue

        # Scenes are often syndicated across sites in the network,
        # so dedupe by scene code (the stringified API ID)
        candidates = [
            postprocess(to_scraped_scene(result), result)
            for result in api_response
            if str(result["id"]) not in already_seen
        ]
        search_results.extend(
            c
            for c in candidates
            if matcher(c) > 0.5 and c.get("code") not in already_seen
        )
        already_seen.update(c.get("code") for c in candidates)

        # Try to avoid more than 10ish results or this will take forever
        if len(search_results) >= 10:
            log.warning("Found more than 10 results, stopping search")
            break

    log.info(f"Search finished, found {len(search_results)} candidates")

    return sorted(search_results, key=matcher, reverse=True)


def performer_search(
    query: str,
    search_domains: list[str] | None = None,
    postprocess: Callable[
        [ScrapedPerformer, dict], ScrapedPerformer
    ] = default_postprocess,
) -> list[ScrapedPerformer]:
    """
    Searches the Aylo API for the given query and returns a list of ScrapedPerformer

    If search_domains is provided it will only search those domains,
    otherwise it will search all known domains (this could be very slow!)

    Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.

    If postprocess is provided it will be called on each result before returning
    """
    if not query:
        log.error("No query provided")
        return []

    if not search_domains:
        log.warning("Searching all known domains, this could be very slow!")
        search_domains = domains.all_domains()

    log.debug(f"Searching for '{query}' on {len(search_domains)} sites")

    # The source of the results will be based on the token used (Brazzers, Reality Kings, etc.)
    search_url = (
        f"https://site-api.project1service.com/v1/actors?search={query}&limit=10"
    )
    search_results = []
    already_seen = set()

    def matcher(candidate: ScrapedPerformer):
        return round(
            difflib.SequenceMatcher(
                None,
                query.lower(),
                candidate["name"].lower(),  # type: ignore (name is always set)
            ).ratio(),
            3,
        )

    for domain in search_domains:
        log.debug(f"Searching {domain}")

        api_headers = _create_headers_for(domain)
        api_response = __api_request(search_url, api_headers)
        if api_response is None:
            log.error(f"Failed to search {domain}")
            continue
        if not api_response:
            log.debug(f"No results from {domain}")
            continue

        candidates = [
            postprocess(to_scraped_performer(result, domain), result)
            for result in api_response
        ]

        search_results.extend(
            c
            for c in candidates
            if matcher(c) > 0.5 and c.get("name") not in already_seen
        )
        already_seen.update(c.get("name") for c in candidates)

        # Try to avoid more than 10ish results or this will take forever
        if len(search_results) >= 10:
            log.warning("Found more than 10 results, stopping search")
            break

    log.debug(f"Search finished, found {len(search_results)} candidates")

    return sorted(search_results, key=matcher, reverse=True)


def scene_from_fragment(
    fragment: dict,
    search_domains: list[str] | None = None,
    min_ratio=config.minimum_similarity,
    postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
) -> ScrapedScene | None:
    """
    Scrapes a scene from a fragment, which must contain at least one of the following:
    - url: the URL of the scene
    - title: the title of the scene

    If search_domains is provided it will only search those domains,
    otherwise it will search all known domains (this could be very slow!)

    If min_ratio is provided _AND_ the fragment contains a title but no URL,
    the search will only return a scene if a match with at least that ratio is found

    If postprocess is provided it will be called on the result before returning
    """
    log.debug(f"Fragment scraping scene {fragment['id']}")
    if url := fragment.get("url"):
        log.debug(f"Using scene URL: '{url}'")
        if scene := scene_from_url(url, postprocess=postprocess):
            if markers := scene.pop("markers", []):  # type: ignore
                if fragment["id"] and config.scrape_markers:
                    add_markers(fragment["id"], markers)
                else:
                    log.debug(
                        f"This scene has {len(markers)} markers,"
                        " you can enable scraping them in config.ini"
                    )
            return scene
        log.debug("Failed to scrape scene from URL")
    if title := fragment.get("title"):
        log.debug(f"Searching for '{title}'")
        if scene := find_scene(
            title, search_domains, min_ratio, postprocess=postprocess
        ):
            return scene
        log.debug("Failed to find scene by title")

    log.warning("Cannot scrape from this fragment: need to have title or url set")
def performer_from_fragment(
    fragment: dict,
    search_domains: list[str] | None = None,
    min_ratio=0.9,
    postprocess: Callable[
        [ScrapedPerformer, dict], ScrapedPerformer
    ] = default_postprocess,
) -> ScrapedPerformer | None:
    """
    Scrapes a performer from a fragment, which must contain one of the following:
    - url: the URL of the performer page (anywhere in the Aylo network)
    - name: the name of the performer

    If search_domains is provided it will only search those domains,
    otherwise it will search all known domains (this could be very slow!)

    If min_ratio is provided _AND_ the fragment contains a name but no URL,
    the search will only return a performer if a match with at least that ratio is found

    If postprocess is provided it will be called on the result before returning
    """
    log.debug("Fragment scraping performer...")
    if url := fragment.get("url"):
        log.debug(f"Using performer URL: '{url}'")
        return performer_from_url(url, postprocess=postprocess)
    elif name := fragment.get("name"):
        log.debug(f"Searching for '{name}'")
        return find_performer(name, search_domains, min_ratio, postprocess=postprocess)

    log.warning("Cannot scrape from this fragment: need to have url or name set")
def main_scraper():
    """
    Takes arguments from stdin or from the command line and dumps output as JSON to stdout
    """
    op, args = scraper_args()
    result = None
    match op, args:
        case "gallery-by-url" | "gallery-by-fragment", {"url": url} if url:
            result = gallery_from_url(url)
        case "scene-by-url", {"url": url} if url:
            result = scene_from_url(url)
        case "scene-by-name", {"name": name, "extra": _domains} if name:
            result = scene_search(name, search_domains=_domains)
        case "scene-by-fragment" | "scene-by-query-fragment", args:
            _domains = args.get("extra", None)
            result = scene_from_fragment(args, search_domains=_domains)
        case "performer-by-url", {"url": url}:
            result = performer_from_url(url)
        case "performer-by-fragment", args:
            _domains = args.get("extra", None)
            result = performer_from_fragment(args, search_domains=_domains)
        case "performer-by-name", {"name": name, "extra": _domains} if name:
            result = performer_search(name, search_domains=_domains)
        case "movie-by-url", {"url": url} if url:
            result = movie_from_url(url)
        case _:
            log.error(f"Invalid operation: {op}, arguments: {json.dumps(args)}")
            sys.exit(1)

    print(json.dumps(result))


if __name__ == "__main__":
    main_scraper()