mirror of
https://github.com/stashapp/CommunityScripts.git
synced 2026-04-12 09:52:34 -05:00
[Haven VLM Connector] release v1.1.0 (#686)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
name: Haven VLM Connector
|
||||
# requires: PythonDepManager
|
||||
description: Tag videos with Vision-Language Models using any OpenAI-compatible VLM endpoint
|
||||
version: 1.0.3
|
||||
version: 1.1.0
|
||||
url: https://discourse.stashapp.cc/t/haven-vlm-connector/5464
|
||||
exec:
|
||||
- python
|
||||
|
||||
@@ -13,6 +13,7 @@ import yaml
|
||||
# VLM Engine Configuration
|
||||
VLM_ENGINE_CONFIG = {
|
||||
"active_ai_models": ["vlm_multiplexer_model"],
|
||||
"trace_logging": True,
|
||||
"pipelines": {
|
||||
"video_pipeline_dynamic": {
|
||||
"inputs": [
|
||||
|
||||
@@ -18,6 +18,7 @@ from datetime import datetime
|
||||
try:
|
||||
from exit_tracker import install_exit_tracker
|
||||
import stashapi.log as log
|
||||
|
||||
install_exit_tracker(log)
|
||||
except ImportError as e:
|
||||
print(f"Warning: exit_tracker not available: {e}")
|
||||
@@ -28,23 +29,23 @@ except ImportError as e:
|
||||
# Use PythonDepManager for dependency management
|
||||
try:
|
||||
from PythonDepManager import ensure_import
|
||||
|
||||
|
||||
# Install and ensure all required dependencies with specific versions
|
||||
ensure_import(
|
||||
"stashapi:stashapp-tools==0.2.58",
|
||||
"aiohttp==3.12.13",
|
||||
"pydantic==2.12.5",
|
||||
"vlm-engine==0.9.4",
|
||||
"pyyaml==6.0.2"
|
||||
"vlm-engine==1.0.0",
|
||||
"pyyaml==6.0.2",
|
||||
)
|
||||
|
||||
|
||||
# Import the dependencies after ensuring they're available
|
||||
import stashapi.log as log
|
||||
from stashapi.stashapp import StashInterface
|
||||
import aiohttp
|
||||
import pydantic
|
||||
import yaml
|
||||
|
||||
|
||||
except ImportError as e:
|
||||
print(f"Failed to import PythonDepManager or required dependencies: {e}")
|
||||
print("Please ensure PythonDepManager is installed and available.")
|
||||
@@ -59,7 +60,9 @@ try:
|
||||
import haven_vlm_config as config
|
||||
except ModuleNotFoundError:
|
||||
log.error("Please provide a haven_vlm_config.py file with the required variables.")
|
||||
raise Exception("Please provide a haven_vlm_config.py file with the required variables.")
|
||||
raise Exception(
|
||||
"Please provide a haven_vlm_config.py file with the required variables."
|
||||
)
|
||||
|
||||
import haven_media_handler as media_handler
|
||||
import haven_vlm_engine as vlm_engine
|
||||
@@ -78,29 +81,36 @@ video_progress: Dict[str, float] = {}
|
||||
|
||||
# ----------------- Main Execution -----------------
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
"""Main entry point for the plugin"""
|
||||
global semaphore
|
||||
|
||||
|
||||
# Semaphore initialization logging for hypothesis A
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_A] Initializing semaphore with limit {config.config.concurrent_task_limit}")
|
||||
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_A] Initializing semaphore with limit {config.config.concurrent_task_limit}"
|
||||
)
|
||||
|
||||
semaphore = asyncio.Semaphore(config.config.concurrent_task_limit)
|
||||
|
||||
|
||||
# Post-semaphore creation logging
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_A] Semaphore created successfully (limit: {config.config.concurrent_task_limit})")
|
||||
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_A] Semaphore created successfully (limit: {config.config.concurrent_task_limit})"
|
||||
)
|
||||
|
||||
json_input = read_json_input()
|
||||
output = {}
|
||||
await run(json_input, output)
|
||||
out = json.dumps(output)
|
||||
print(out + "\n")
|
||||
|
||||
|
||||
def read_json_input() -> Dict[str, Any]:
|
||||
"""Read JSON input from stdin"""
|
||||
json_input = sys.stdin.read()
|
||||
return json.loads(json_input)
|
||||
|
||||
|
||||
async def run(json_input: Dict[str, Any], output: Dict[str, Any]) -> None:
|
||||
"""Main execution logic"""
|
||||
plugin_args = None
|
||||
@@ -113,7 +123,7 @@ async def run(json_input: Dict[str, Any], output: Dict[str, Any]) -> None:
|
||||
raise
|
||||
|
||||
try:
|
||||
plugin_args = json_input['args']["mode"]
|
||||
plugin_args = json_input["args"]["mode"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
@@ -129,46 +139,54 @@ async def run(json_input: Dict[str, Any], output: Dict[str, Any]) -> None:
|
||||
collect_incorrect_markers_and_images()
|
||||
output["output"] = "ok"
|
||||
return
|
||||
|
||||
|
||||
output["output"] = "ok"
|
||||
return
|
||||
|
||||
|
||||
# ----------------- High Level Processing Functions -----------------
|
||||
|
||||
|
||||
async def tag_videos() -> None:
|
||||
"""Tag videos with VLM analysis using improved async orchestration"""
|
||||
global completed_tasks, total_tasks
|
||||
|
||||
scenes = media_handler.get_tagme_scenes()
|
||||
if not scenes:
|
||||
log.info("No videos to tag. Have you tagged any scenes with the VLM_TagMe tag to get processed?")
|
||||
log.info(
|
||||
"No videos to tag. Have you tagged any scenes with the VLM_TagMe tag to get processed?"
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
total_tasks = len(scenes)
|
||||
completed_tasks = 0
|
||||
|
||||
|
||||
video_progress.clear()
|
||||
for scene in scenes:
|
||||
video_progress[scene.get('id', 'unknown')] = 0.0
|
||||
video_progress[scene.get("id", "unknown")] = 0.0
|
||||
log.progress(0.0)
|
||||
|
||||
log.info(f"🚀 Starting video processing for {total_tasks} scenes with semaphore limit of {config.config.concurrent_task_limit}")
|
||||
|
||||
log.info(
|
||||
f"🚀 Starting video processing for {total_tasks} scenes with semaphore limit of {config.config.concurrent_task_limit}"
|
||||
)
|
||||
|
||||
# Create tasks with proper indexing for debugging
|
||||
tasks = []
|
||||
for i, scene in enumerate(scenes):
|
||||
# Pre-task creation logging for hypothesis A (semaphore deadlock) and E (signal termination)
|
||||
scene_id = scene.get('id')
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_A] Creating task {i+1}/{total_tasks} for scene {scene_id}, semaphore limit: {config.config.concurrent_task_limit}")
|
||||
|
||||
scene_id = scene.get("id")
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_A] Creating task {i + 1}/{total_tasks} for scene {scene_id}, semaphore limit: {config.config.concurrent_task_limit}"
|
||||
)
|
||||
|
||||
task = asyncio.create_task(__tag_video_with_timing(scene, i))
|
||||
tasks.append(task)
|
||||
|
||||
|
||||
# Use asyncio.as_completed to process results as they finish (proves concurrency)
|
||||
completed_task_futures = asyncio.as_completed(tasks)
|
||||
|
||||
|
||||
batch_start_time = asyncio.get_event_loop().time()
|
||||
|
||||
|
||||
for completed_task in completed_task_futures:
|
||||
try:
|
||||
await completed_task
|
||||
@@ -178,36 +196,46 @@ async def tag_videos() -> None:
|
||||
completed_tasks += 1
|
||||
# Exception logging for hypothesis E (signal termination)
|
||||
error_type = type(e).__name__
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_E] Task failed with exception: {error_type}: {str(e)} (Task {completed_tasks}/{total_tasks})")
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_E] Task failed with exception: {error_type}: {str(e)} (Task {completed_tasks}/{total_tasks})"
|
||||
)
|
||||
|
||||
log.error(f"❌ Task failed: {e}")
|
||||
|
||||
total_time = asyncio.get_event_loop().time() - batch_start_time
|
||||
|
||||
log.info(f"🎉 All {total_tasks} videos completed in {total_time:.2f}s (avg: {total_time/total_tasks:.2f}s/video)")
|
||||
log.info(
|
||||
f"🎉 All {total_tasks} videos completed in {total_time:.2f}s (avg: {total_time / total_tasks:.2f}s/video)"
|
||||
)
|
||||
log.progress(1.0)
|
||||
|
||||
|
||||
async def find_marker_settings() -> None:
|
||||
"""Find optimal marker settings based on a single tagged video"""
|
||||
scenes = media_handler.get_tagme_scenes()
|
||||
if len(scenes) != 1:
|
||||
log.error("Please tag exactly one scene with the VLM_TagMe tag to get processed.")
|
||||
log.error(
|
||||
"Please tag exactly one scene with the VLM_TagMe tag to get processed."
|
||||
)
|
||||
return
|
||||
scene = scenes[0]
|
||||
await __find_marker_settings(scene)
|
||||
|
||||
|
||||
def collect_incorrect_markers_and_images() -> None:
|
||||
"""Collect data from incorrectly tagged markers and images"""
|
||||
incorrect_images = media_handler.get_incorrect_images()
|
||||
image_paths, image_ids, temp_files = media_handler.get_image_paths_and_ids(incorrect_images)
|
||||
image_paths, image_ids, temp_files = media_handler.get_image_paths_and_ids(
|
||||
incorrect_images
|
||||
)
|
||||
incorrect_markers = media_handler.get_incorrect_markers()
|
||||
|
||||
|
||||
if not (len(incorrect_images) > 0 or len(incorrect_markers) > 0):
|
||||
log.info("No incorrect images or markers to collect.")
|
||||
return
|
||||
|
||||
|
||||
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
|
||||
|
||||
try:
|
||||
# Process images
|
||||
image_folder = os.path.join(config.config.output_data_dir, "images")
|
||||
@@ -235,98 +263,130 @@ def collect_incorrect_markers_and_images() -> None:
|
||||
scene_folder = os.path.join(config.config.output_data_dir, "scenes")
|
||||
os.makedirs(scene_folder, exist_ok=True)
|
||||
tag_folders = {}
|
||||
|
||||
|
||||
for marker in incorrect_markers:
|
||||
scene_path = marker['scene']['files'][0]['path']
|
||||
scene_path = marker["scene"]["files"][0]["path"]
|
||||
if not scene_path:
|
||||
log.error(f"Marker {marker['id']} has no scene path")
|
||||
continue
|
||||
try:
|
||||
tag_name = marker['primary_tag']['name']
|
||||
tag_name = marker["primary_tag"]["name"]
|
||||
if tag_name not in tag_folders:
|
||||
tag_folders[tag_name] = os.path.join(scene_folder, tag_name)
|
||||
os.makedirs(tag_folders[tag_name], exist_ok=True)
|
||||
media_handler.write_scene_marker_to_file(marker, scene_path, tag_folders[tag_name])
|
||||
media_handler.write_scene_marker_to_file(
|
||||
marker, scene_path, tag_folders[tag_name]
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(f"Failed to collect scene: {e}")
|
||||
|
||||
|
||||
# Remove incorrect tags from images
|
||||
image_ids = [image['id'] for image in incorrect_images]
|
||||
image_ids = [image["id"] for image in incorrect_images]
|
||||
media_handler.remove_incorrect_tag_from_images(image_ids)
|
||||
|
||||
|
||||
# ----------------- Low Level Processing Functions -----------------
|
||||
|
||||
|
||||
async def __tag_video_with_timing(scene: Dict[str, Any], scene_index: int) -> None:
|
||||
"""Tag a single video scene with timing diagnostics"""
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
scene_id = scene.get('id', 'unknown')
|
||||
|
||||
scene_id = scene.get("id", "unknown")
|
||||
|
||||
log.info(f"🎬 Starting video {scene_index + 1}: Scene {scene_id}")
|
||||
|
||||
|
||||
try:
|
||||
await __tag_video(scene)
|
||||
end_time = asyncio.get_event_loop().time()
|
||||
duration = end_time - start_time
|
||||
log.info(f"✅ Completed video {scene_index + 1} (Scene {scene_id}) in {duration:.2f}s")
|
||||
|
||||
log.info(
|
||||
f"✅ Completed video {scene_index + 1} (Scene {scene_id}) in {duration:.2f}s"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
end_time = asyncio.get_event_loop().time()
|
||||
duration = end_time - start_time
|
||||
log.error(f"❌ Failed video {scene_index + 1} (Scene {scene_id}) after {duration:.2f}s: {e}")
|
||||
log.error(
|
||||
f"❌ Failed video {scene_index + 1} (Scene {scene_id}) after {duration:.2f}s: {e}"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def __tag_video(scene: Dict[str, Any]) -> None:
|
||||
"""Tag a single video scene with semaphore timing instrumentation"""
|
||||
scene_id = scene.get('id')
|
||||
|
||||
scene_id = scene.get("id")
|
||||
|
||||
# Pre-semaphore acquisition logging for hypothesis A (semaphore deadlock)
|
||||
task_start_time = asyncio.get_event_loop().time()
|
||||
acquisition_start_time = task_start_time
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_A] Task starting for scene {scene_id} at {task_start_time:.3f}s")
|
||||
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_A] Task starting for scene {scene_id} at {task_start_time:.3f}s"
|
||||
)
|
||||
|
||||
async with semaphore:
|
||||
try:
|
||||
# Semaphore acquisition successful logging
|
||||
acquisition_end_time = asyncio.get_event_loop().time()
|
||||
acquisition_time = acquisition_end_time - acquisition_start_time
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_A] Semaphore acquired for scene {scene_id} after {acquisition_time:.3f}s")
|
||||
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_A] Semaphore acquired for scene {scene_id} after {acquisition_time:.3f}s"
|
||||
)
|
||||
|
||||
if scene_id is None:
|
||||
log.error("Scene missing 'id' field")
|
||||
return
|
||||
|
||||
files = scene.get('files', [])
|
||||
|
||||
files = scene.get("files", [])
|
||||
if not files:
|
||||
log.error(f"Scene {scene_id} has no files")
|
||||
return
|
||||
|
||||
scene_file = files[0].get('path')
|
||||
|
||||
scene_file = files[0].get("path")
|
||||
if scene_file is None:
|
||||
log.error(f"Scene {scene_id} file has no path")
|
||||
return
|
||||
|
||||
|
||||
# Check if scene is VR
|
||||
is_vr = media_handler.is_vr_scene(scene.get('tags', []))
|
||||
|
||||
is_vr = media_handler.is_vr_scene(scene.get("tags", []))
|
||||
|
||||
def progress_cb(p: int) -> None:
|
||||
global video_progress, total_tasks
|
||||
video_progress[scene_id] = p / 100.0
|
||||
total_prog = sum(video_progress.values()) / total_tasks
|
||||
|
||||
stats = vlm_engine.vlm_engine.get_performance_stats()
|
||||
total_frames = stats.get("total_frames_processed", 0)
|
||||
elapsed_seconds = stats.get("elapsed_time", 0.0)
|
||||
|
||||
log.info(f"[Throughput] total_frames: {total_frames}")
|
||||
log.info(f"[Throughput] elapsed_seconds: {elapsed_seconds:.2f}")
|
||||
|
||||
if elapsed_seconds > 0:
|
||||
fpm = (total_frames / elapsed_seconds) * 60.0
|
||||
else:
|
||||
fpm = 0.0
|
||||
|
||||
log.info(f"[Throughput] calculated_fpm: {fpm:.1f}")
|
||||
log.info(
|
||||
f"[Throughput] Frame ~{(p / 100) * 100:.0f}: {fpm:.1f} FPM | progress: {p}%"
|
||||
)
|
||||
log.progress(total_prog)
|
||||
|
||||
|
||||
# Process video through VLM Engine with HTTP timing for hypothesis B
|
||||
processing_start_time = asyncio.get_event_loop().time()
|
||||
|
||||
|
||||
# HTTP request lifecycle tracking start
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_B] Starting VLM processing for scene {scene_id}: {scene_file}")
|
||||
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_B] Starting VLM processing for scene {scene_id}: {scene_file}"
|
||||
)
|
||||
|
||||
video_result = await vlm_engine.process_video_async(
|
||||
scene_file,
|
||||
vr_video=is_vr,
|
||||
frame_interval=config.config.video_frame_interval,
|
||||
threshold=config.config.video_threshold,
|
||||
return_confidence=config.config.video_confidence_return,
|
||||
progress_callback=progress_cb
|
||||
progress_callback=progress_cb,
|
||||
)
|
||||
|
||||
# Extract detected tags
|
||||
@@ -337,13 +397,15 @@ async def __tag_video(scene: Dict[str, Any]) -> None:
|
||||
# Post-VLM processing logging
|
||||
processing_end_time = asyncio.get_event_loop().time()
|
||||
processing_duration = processing_end_time - processing_start_time
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_B] VLM processing completed for scene {scene_id} in {processing_duration:.2f}s ({len(detected_tags)} detected tags)")
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_B] VLM processing completed for scene {scene_id} in {processing_duration:.2f}s ({len(detected_tags)} detected tags)"
|
||||
)
|
||||
|
||||
if detected_tags:
|
||||
# Clear all existing tags and markers before adding new ones
|
||||
media_handler.clear_all_tags_from_video(scene)
|
||||
media_handler.clear_all_markers_from_video(scene_id)
|
||||
|
||||
|
||||
# Add tags to scene
|
||||
tag_ids = media_handler.get_tag_ids(list(detected_tags), create=True)
|
||||
media_handler.add_tags_to_video(scene_id, tag_ids)
|
||||
@@ -351,81 +413,91 @@ async def __tag_video(scene: Dict[str, Any]) -> None:
|
||||
|
||||
# Add markers if enabled
|
||||
if config.config.create_markers:
|
||||
media_handler.add_markers_to_video_from_dict(scene_id, video_result.tag_timespans)
|
||||
media_handler.add_markers_to_video_from_dict(
|
||||
scene_id, video_result.tag_timespans
|
||||
)
|
||||
log.info(f"Added markers to scene {scene_id}")
|
||||
|
||||
# Remove VLM_TagMe tag from processed scene
|
||||
media_handler.remove_tagme_tag_from_scene(scene_id)
|
||||
|
||||
|
||||
# Task completion logging
|
||||
task_end_time = asyncio.get_event_loop().time()
|
||||
total_task_time = task_end_time - task_start_time
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_A] Task completed for scene {scene_id} in {total_task_time:.2f}s")
|
||||
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_A] Task completed for scene {scene_id} in {total_task_time:.2f}s"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Exception handling with detailed logging for hypothesis E
|
||||
exception_time = asyncio.get_event_loop().time()
|
||||
error_type = type(e).__name__
|
||||
log.debug(f"[DEBUG_HYPOTHESIS_E] Task exception for scene {scene_id}: {error_type}: {str(e)} at {exception_time:.3f}s")
|
||||
|
||||
scene_id = scene.get('id', 'unknown')
|
||||
log.debug(
|
||||
f"[DEBUG_HYPOTHESIS_E] Task exception for scene {scene_id}: {error_type}: {str(e)} at {exception_time:.3f}s"
|
||||
)
|
||||
|
||||
scene_id = scene.get("id", "unknown")
|
||||
log.error(f"Error processing video scene {scene_id}: {e}")
|
||||
# Add error tag to failed scene if we have a valid ID
|
||||
if scene_id != 'unknown':
|
||||
if scene_id != "unknown":
|
||||
media_handler.add_error_scene(scene_id)
|
||||
|
||||
|
||||
async def __find_marker_settings(scene: Dict[str, Any]) -> None:
|
||||
"""Find optimal marker settings for a scene"""
|
||||
try:
|
||||
scene_id = scene.get('id')
|
||||
scene_id = scene.get("id")
|
||||
if scene_id is None:
|
||||
log.error("Scene missing 'id' field")
|
||||
return
|
||||
|
||||
files = scene.get('files', [])
|
||||
|
||||
files = scene.get("files", [])
|
||||
if not files:
|
||||
log.error(f"Scene {scene_id} has no files")
|
||||
return
|
||||
|
||||
scene_file = files[0].get('path')
|
||||
|
||||
scene_file = files[0].get("path")
|
||||
if scene_file is None:
|
||||
log.error(f"Scene {scene_id} file has no path")
|
||||
return
|
||||
|
||||
|
||||
# Get existing markers for the scene
|
||||
existing_markers = media_handler.get_scene_markers(scene_id)
|
||||
|
||||
|
||||
# Convert markers to desired timespan format
|
||||
desired_timespan_data = {}
|
||||
for marker in existing_markers:
|
||||
tag_name = marker['primary_tag']['name']
|
||||
tag_name = marker["primary_tag"]["name"]
|
||||
desired_timespan_data[tag_name] = TimeFrame(
|
||||
start=marker['seconds'],
|
||||
end=marker.get('end_seconds', marker['seconds'] + 1),
|
||||
total_confidence=1.0
|
||||
start=marker["seconds"],
|
||||
end=marker.get("end_seconds", marker["seconds"] + 1),
|
||||
total_confidence=1.0,
|
||||
)
|
||||
|
||||
# Find optimal settings
|
||||
optimal_settings = await vlm_engine.find_optimal_marker_settings_async(
|
||||
existing_json={}, # No existing JSON data
|
||||
desired_timespan_data=desired_timespan_data
|
||||
desired_timespan_data=desired_timespan_data,
|
||||
)
|
||||
|
||||
# Output results
|
||||
log.info(f"Optimal marker settings found for scene {scene_id}:")
|
||||
log.info(json.dumps(optimal_settings, indent=2))
|
||||
|
||||
|
||||
except Exception as e:
|
||||
scene_id = scene.get('id', 'unknown')
|
||||
scene_id = scene.get("id", "unknown")
|
||||
log.error(f"Error finding marker settings for scene {scene_id}: {e}")
|
||||
|
||||
|
||||
# ----------------- Cleanup -----------------
|
||||
|
||||
|
||||
async def cleanup() -> None:
|
||||
"""Cleanup resources"""
|
||||
if vlm_engine.vlm_engine:
|
||||
await vlm_engine.vlm_engine.shutdown()
|
||||
|
||||
|
||||
# Run main function if script is executed directly
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
|
||||
@@ -13,49 +13,57 @@ import json
|
||||
# Use PythonDepManager for dependency management
|
||||
from vlm_engine import VLMEngine
|
||||
from vlm_engine.config_models import (
|
||||
EngineConfig,
|
||||
PipelineConfig,
|
||||
ModelConfig,
|
||||
PipelineModelConfig
|
||||
EngineConfig,
|
||||
PipelineConfig,
|
||||
ModelConfig,
|
||||
PipelineModelConfig,
|
||||
)
|
||||
|
||||
import haven_vlm_config as config
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.CRITICAL)
|
||||
logging.basicConfig(
|
||||
level=logging.WARNING, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimeFrame:
|
||||
"""Represents a time frame with start and end times"""
|
||||
|
||||
start: float
|
||||
end: float
|
||||
total_confidence: Optional[float] = None
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""Convert to JSON string"""
|
||||
return json.dumps({
|
||||
"start": self.start,
|
||||
"end": self.end,
|
||||
"total_confidence": self.total_confidence
|
||||
})
|
||||
return json.dumps(
|
||||
{
|
||||
"start": self.start,
|
||||
"end": self.end,
|
||||
"total_confidence": self.total_confidence,
|
||||
}
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"TimeFrame(start={self.start}, end={self.end}, confidence={self.total_confidence})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class VideoTagInfo:
|
||||
"""Represents video tagging information"""
|
||||
|
||||
video_duration: float
|
||||
video_tags: Dict[str, Set[str]]
|
||||
tag_totals: Dict[str, Dict[str, float]]
|
||||
tag_timespans: Dict[str, Dict[str, List[TimeFrame]]]
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_data: Dict[str, Any]) -> 'VideoTagInfo':
|
||||
def from_json(cls, json_data: Dict[str, Any]) -> "VideoTagInfo":
|
||||
"""Create VideoTagInfo from JSON data"""
|
||||
logger.debug(f"Creating VideoTagInfo from JSON: {json_data}")
|
||||
|
||||
|
||||
# Convert tag_timespans to TimeFrame objects
|
||||
tag_timespans = {}
|
||||
for category, tags in json_data.get("tag_timespans", {}).items():
|
||||
@@ -65,46 +73,65 @@ class VideoTagInfo:
|
||||
TimeFrame(
|
||||
start=tf["start"],
|
||||
end=tf["end"],
|
||||
total_confidence=tf.get("total_confidence")
|
||||
) for tf in timeframes
|
||||
total_confidence=tf.get("total_confidence"),
|
||||
)
|
||||
for tf in timeframes
|
||||
]
|
||||
|
||||
|
||||
return cls(
|
||||
video_duration=json_data.get("video_duration", 0.0),
|
||||
video_tags=json_data.get("video_tags", {}),
|
||||
tag_totals=json_data.get("tag_totals", {}),
|
||||
tag_timespans=tag_timespans
|
||||
tag_timespans=tag_timespans,
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"VideoTagInfo(duration={self.video_duration}, tags={len(self.video_tags)}, timespans={len(self.tag_timespans)})"
|
||||
|
||||
|
||||
class HavenVLMEngine:
|
||||
"""Main VLM Engine integration class"""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.engine: Optional[VLMEngine] = None
|
||||
self.engine_config: Optional[EngineConfig] = None
|
||||
self._initialized = False
|
||||
|
||||
def _configure_logging(self) -> None:
|
||||
"""Configure logging levels based on plugin config."""
|
||||
vlm_config = config.config.vlm_engine_config
|
||||
trace_enabled = vlm_config.get("trace_logging", False)
|
||||
|
||||
if trace_enabled:
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
)
|
||||
logging.getLogger("logger").setLevel(logging.DEBUG)
|
||||
logging.getLogger("multiplexer_llm").setLevel(logging.DEBUG)
|
||||
logger.debug("Trace logging enabled for vlm-engine and multiplexer-llm")
|
||||
else:
|
||||
logger.setLevel(logging.WARNING)
|
||||
|
||||
async def initialize(self) -> None:
|
||||
"""Initialize the VLM Engine with configuration"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
try:
|
||||
self._configure_logging()
|
||||
logger.info("Initializing Haven VLM Engine...")
|
||||
|
||||
|
||||
# Convert config dict to EngineConfig objects
|
||||
self.engine_config = self._create_engine_config()
|
||||
|
||||
|
||||
# Create and initialize the engine
|
||||
self.engine = VLMEngine(config=self.engine_config)
|
||||
await self.engine.initialize()
|
||||
|
||||
|
||||
self._initialized = True
|
||||
logger.info("Haven VLM Engine initialized successfully")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize VLM Engine: {e}")
|
||||
raise
|
||||
@@ -112,24 +139,23 @@ class HavenVLMEngine:
|
||||
def _create_engine_config(self) -> EngineConfig:
|
||||
"""Create EngineConfig from the configuration"""
|
||||
vlm_config = config.config.vlm_engine_config
|
||||
|
||||
|
||||
# Create pipeline configs
|
||||
pipelines = {}
|
||||
for pipeline_name, pipeline_data in vlm_config["pipelines"].items():
|
||||
models = [
|
||||
PipelineModelConfig(
|
||||
name=model["name"],
|
||||
inputs=model["inputs"],
|
||||
outputs=model["outputs"]
|
||||
) for model in pipeline_data["models"]
|
||||
name=model["name"], inputs=model["inputs"], outputs=model["outputs"]
|
||||
)
|
||||
for model in pipeline_data["models"]
|
||||
]
|
||||
|
||||
|
||||
pipelines[pipeline_name] = PipelineConfig(
|
||||
inputs=pipeline_data["inputs"],
|
||||
output=pipeline_data["output"],
|
||||
short_name=pipeline_data["short_name"],
|
||||
version=pipeline_data["version"],
|
||||
models=models
|
||||
models=models,
|
||||
)
|
||||
|
||||
# Create model configs with new architectural changes
|
||||
@@ -141,17 +167,21 @@ class HavenVLMEngine:
|
||||
for endpoint in model_data.get("multiplexer_endpoints", []):
|
||||
# Validate that max_concurrent is present
|
||||
if "max_concurrent" not in endpoint:
|
||||
raise ValueError(f"Endpoint '{endpoint.get('name', 'unnamed')}' is missing required 'max_concurrent' parameter")
|
||||
|
||||
multiplexer_endpoints.append({
|
||||
"base_url": endpoint["base_url"],
|
||||
"api_key": endpoint.get("api_key", ""),
|
||||
"name": endpoint["name"],
|
||||
"weight": endpoint.get("weight", 5),
|
||||
"is_fallback": endpoint.get("is_fallback", False),
|
||||
"max_concurrent": endpoint["max_concurrent"]
|
||||
})
|
||||
|
||||
raise ValueError(
|
||||
f"Endpoint '{endpoint.get('name', 'unnamed')}' is missing required 'max_concurrent' parameter"
|
||||
)
|
||||
|
||||
multiplexer_endpoints.append(
|
||||
{
|
||||
"base_url": endpoint["base_url"],
|
||||
"api_key": endpoint.get("api_key", ""),
|
||||
"name": endpoint["name"],
|
||||
"weight": endpoint.get("weight", 5),
|
||||
"is_fallback": endpoint.get("is_fallback", False),
|
||||
"max_concurrent": endpoint["max_concurrent"],
|
||||
}
|
||||
)
|
||||
|
||||
models[model_name] = ModelConfig(
|
||||
type=model_data["type"],
|
||||
model_file_name=model_data["model_file_name"],
|
||||
@@ -160,23 +190,26 @@ class HavenVLMEngine:
|
||||
model_identifier=model_data["model_identifier"],
|
||||
model_version=model_data["model_version"],
|
||||
use_multiplexer=model_data.get("use_multiplexer", False),
|
||||
max_concurrent_requests=model_data.get("max_concurrent_requests", 10),
|
||||
instance_count=model_data.get("instance_count",1),
|
||||
max_batch_size=model_data.get("max_batch_size",1),
|
||||
max_concurrent_requests=model_data.get(
|
||||
"max_concurrent_requests", 10
|
||||
),
|
||||
instance_count=model_data.get("instance_count", 1),
|
||||
max_batch_size=model_data.get("max_batch_size", 1),
|
||||
multiplexer_endpoints=multiplexer_endpoints,
|
||||
tag_list=model_data.get("tag_list", [])
|
||||
tag_list=model_data.get("tag_list", []),
|
||||
)
|
||||
else:
|
||||
models[model_name] = ModelConfig(
|
||||
type=model_data["type"],
|
||||
model_file_name=model_data["model_file_name"]
|
||||
model_file_name=model_data["model_file_name"],
|
||||
)
|
||||
|
||||
return EngineConfig(
|
||||
active_ai_models=vlm_config["active_ai_models"],
|
||||
pipelines=pipelines,
|
||||
models=models,
|
||||
category_config=vlm_config["category_config"]
|
||||
category_config=vlm_config["category_config"],
|
||||
loglevel="DEBUG" if vlm_config.get("trace_logging", False) else "WARNING",
|
||||
)
|
||||
|
||||
async def process_video(
|
||||
@@ -187,7 +220,7 @@ class HavenVLMEngine:
|
||||
threshold: Optional[float] = None,
|
||||
return_confidence: Optional[bool] = None,
|
||||
existing_json: Optional[Dict[str, Any]] = None,
|
||||
progress_callback: Optional[Callable[[int], None]] = None
|
||||
progress_callback: Optional[Callable[[int], None]] = None,
|
||||
) -> VideoTagInfo:
|
||||
"""Process a video using the VLM Engine"""
|
||||
if not self._initialized:
|
||||
@@ -195,41 +228,53 @@ class HavenVLMEngine:
|
||||
|
||||
try:
|
||||
logger.info(f"Processing video: {video_path}")
|
||||
|
||||
|
||||
# Use config defaults if not provided
|
||||
frame_interval = frame_interval or config.config.video_frame_interval
|
||||
threshold = threshold or config.config.video_threshold
|
||||
return_confidence = return_confidence if return_confidence is not None else config.config.video_confidence_return
|
||||
return_confidence = (
|
||||
return_confidence
|
||||
if return_confidence is not None
|
||||
else config.config.video_confidence_return
|
||||
)
|
||||
|
||||
# Process video through the engine
|
||||
results = await self.engine.process_video(
|
||||
video_path,
|
||||
frame_interval=frame_interval,
|
||||
progress_callback=progress_callback
|
||||
progress_callback=progress_callback,
|
||||
)
|
||||
|
||||
|
||||
logger.info(f"Video processing completed for: {video_path}")
|
||||
logger.debug(f"Raw results structure: {type(results)}")
|
||||
|
||||
|
||||
# Extract video_tag_info from the nested structure
|
||||
if isinstance(results, dict) and 'video_tag_info' in results:
|
||||
video_tag_data = results['video_tag_info']
|
||||
logger.debug(f"Using video_tag_info from results: {video_tag_data.keys()}")
|
||||
if isinstance(results, dict) and "video_tag_info" in results:
|
||||
video_tag_data = results["video_tag_info"]
|
||||
logger.debug(
|
||||
f"Using video_tag_info from results: {video_tag_data.keys()}"
|
||||
)
|
||||
else:
|
||||
# Fallback: assume results is already in the correct format
|
||||
video_tag_data = results
|
||||
logger.debug(f"Using results directly: {video_tag_data.keys() if isinstance(video_tag_data, dict) else type(video_tag_data)}")
|
||||
|
||||
logger.debug(
|
||||
f"Using results directly: {video_tag_data.keys() if isinstance(video_tag_data, dict) else type(video_tag_data)}"
|
||||
)
|
||||
|
||||
return VideoTagInfo.from_json(video_tag_data)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing video {video_path}: {e}")
|
||||
raise
|
||||
|
||||
def get_performance_stats(self) -> Dict[str, Any]:
|
||||
"""Get performance statistics from the VLM Engine."""
|
||||
if not self._initialized or not self.engine:
|
||||
return {}
|
||||
return self.engine.get_performance_stats()
|
||||
|
||||
async def find_optimal_marker_settings(
|
||||
self,
|
||||
existing_json: Dict[str, Any],
|
||||
desired_timespan_data: Dict[str, TimeFrame]
|
||||
self, existing_json: Dict[str, Any], desired_timespan_data: Dict[str, TimeFrame]
|
||||
) -> Dict[str, Any]:
|
||||
"""Find optimal marker settings based on existing data"""
|
||||
if not self._initialized:
|
||||
@@ -237,25 +282,24 @@ class HavenVLMEngine:
|
||||
|
||||
try:
|
||||
logger.info("Finding optimal marker settings...")
|
||||
|
||||
|
||||
# Convert TimeFrame objects to dict format
|
||||
desired_data = {}
|
||||
for key, timeframe in desired_timespan_data.items():
|
||||
desired_data[key] = {
|
||||
"start": timeframe.start,
|
||||
"end": timeframe.end,
|
||||
"total_confidence": timeframe.total_confidence
|
||||
"total_confidence": timeframe.total_confidence,
|
||||
}
|
||||
|
||||
# Call the engine's optimization method
|
||||
results = await self.engine.optimize_timeframe_settings(
|
||||
existing_json_data=existing_json,
|
||||
desired_timespan_data=desired_data
|
||||
existing_json_data=existing_json, desired_timespan_data=desired_data
|
||||
)
|
||||
|
||||
|
||||
logger.info("Optimal marker settings found")
|
||||
return results
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error finding optimal marker settings: {e}")
|
||||
raise
|
||||
@@ -267,14 +311,16 @@ class HavenVLMEngine:
|
||||
# VLMEngine doesn't have a shutdown method, just perform basic cleanup
|
||||
logger.info("VLM Engine cleanup completed")
|
||||
self._initialized = False
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during VLM Engine cleanup: {e}")
|
||||
self._initialized = False
|
||||
|
||||
|
||||
# Global VLM Engine instance
|
||||
vlm_engine = HavenVLMEngine()
|
||||
|
||||
|
||||
# Convenience functions for backward compatibility
|
||||
async def process_video_async(
|
||||
video_path: str,
|
||||
@@ -283,17 +329,24 @@ async def process_video_async(
|
||||
threshold: Optional[float] = None,
|
||||
return_confidence: Optional[bool] = None,
|
||||
existing_json: Optional[Dict[str, Any]] = None,
|
||||
progress_callback: Optional[Callable[[int], None]] = None
|
||||
progress_callback: Optional[Callable[[int], None]] = None,
|
||||
) -> VideoTagInfo:
|
||||
"""Process video asynchronously"""
|
||||
return await vlm_engine.process_video(
|
||||
video_path, vr_video, frame_interval, threshold, return_confidence, existing_json,
|
||||
progress_callback=progress_callback
|
||||
video_path,
|
||||
vr_video,
|
||||
frame_interval,
|
||||
threshold,
|
||||
return_confidence,
|
||||
existing_json,
|
||||
progress_callback=progress_callback,
|
||||
)
|
||||
|
||||
|
||||
async def find_optimal_marker_settings_async(
|
||||
existing_json: Dict[str, Any],
|
||||
desired_timespan_data: Dict[str, TimeFrame]
|
||||
existing_json: Dict[str, Any], desired_timespan_data: Dict[str, TimeFrame]
|
||||
) -> Dict[str, Any]:
|
||||
"""Find optimal marker settings asynchronously"""
|
||||
return await vlm_engine.find_optimal_marker_settings(existing_json, desired_timespan_data)
|
||||
return await vlm_engine.find_optimal_marker_settings(
|
||||
existing_json, desired_timespan_data
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user