From 2d45ea271ed635f9bc950541d36e19150100140a Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 2 Oct 2025 12:48:11 -0600 Subject: [PATCH] Refactor object genai to be a post-processor (#20331) * Refactor object genai to be a post-processor * Include function correctly --- .../post/object_descriptions.py | 349 +++++++++++++++++ frigate/data_processing/types.py | 4 + frigate/embeddings/maintainer.py | 360 +++--------------- frigate/genai/__init__.py | 2 +- frigate/stats/util.py | 8 + frigate/util/image.py | 21 +- 6 files changed, 440 insertions(+), 304 deletions(-) create mode 100644 frigate/data_processing/post/object_descriptions.py diff --git a/frigate/data_processing/post/object_descriptions.py b/frigate/data_processing/post/object_descriptions.py new file mode 100644 index 000000000..23af43548 --- /dev/null +++ b/frigate/data_processing/post/object_descriptions.py @@ -0,0 +1,349 @@ +"""Post processor for object descriptions using GenAI.""" + +import datetime +import logging +import os +import threading +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import cv2 +import numpy as np +from peewee import DoesNotExist + +from frigate.comms.inter_process import InterProcessRequestor +from frigate.config import CameraConfig, FrigateConfig +from frigate.const import CLIPS_DIR, UPDATE_EVENT_DESCRIPTION +from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor +from frigate.data_processing.types import PostProcessDataEnum +from frigate.genai import GenAIClient +from frigate.models import Event +from frigate.types import TrackedObjectUpdateTypesEnum +from frigate.util.builtin import EventsPerSecond, InferenceSpeed +from frigate.util.image import create_thumbnail, ensure_jpeg_bytes +from frigate.util.path import get_event_thumbnail_bytes + +if TYPE_CHECKING: + from frigate.embeddings import Embeddings + +from ..post.api import PostProcessorApi +from ..types import DataProcessorMetrics + +logger = logging.getLogger(__name__) + +MAX_THUMBNAILS = 10 + + +class ObjectDescriptionProcessor(PostProcessorApi): + def __init__( + self, + config: FrigateConfig, + embeddings: "Embeddings", + requestor: InterProcessRequestor, + metrics: DataProcessorMetrics, + client: GenAIClient, + semantic_trigger_processor: SemanticTriggerProcessor | None, + ): + super().__init__(config, metrics, None) + self.config = config + self.embeddings = embeddings + self.requestor = requestor + self.metrics = metrics + self.genai_client = client + self.semantic_trigger_processor = semantic_trigger_processor + self.tracked_events: dict[str, list[Any]] = {} + self.early_request_sent: dict[str, bool] = {} + self.object_desc_speed = InferenceSpeed(self.metrics.object_desc_speed) + self.object_desc_dps = EventsPerSecond() + self.object_desc_dps.start() + + def __handle_frame_update( + self, camera: str, data: dict, yuv_frame: np.ndarray + ) -> None: + """Handle an update to a frame for an object.""" + camera_config = self.config.cameras[camera] + + # no need to save our own thumbnails if genai is not enabled + # or if the object has become stationary + if not data["stationary"]: + if data["id"] not in self.tracked_events: + self.tracked_events[data["id"]] = [] + + data["thumbnail"] = create_thumbnail(yuv_frame, data["box"]) + + # Limit the number of thumbnails saved + if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS: + # Always keep the first thumbnail for the event + self.tracked_events[data["id"]].pop(1) + + self.tracked_events[data["id"]].append(data) + + # check if we're 
configured to send an early request after a minimum number of updates received + if camera_config.objects.genai.send_triggers.after_significant_updates: + if ( + len(self.tracked_events.get(data["id"], [])) + >= camera_config.objects.genai.send_triggers.after_significant_updates + and data["id"] not in self.early_request_sent + ): + if data["has_clip"] and data["has_snapshot"]: + event: Event = Event.get(Event.id == data["id"]) + + if ( + not camera_config.objects.genai.objects + or event.label in camera_config.objects.genai.objects + ) and ( + not camera_config.objects.genai.required_zones + or set(data["entered_zones"]) + & set(camera_config.objects.genai.required_zones) + ): + logger.debug(f"{camera} sending early request to GenAI") + + self.early_request_sent[data["id"]] = True + threading.Thread( + target=self._genai_embed_description, + name=f"_genai_embed_description_{event.id}", + daemon=True, + args=( + event, + [ + data["thumbnail"] + for data in self.tracked_events[data["id"]] + ], + ), + ).start() + + def __handle_frame_finalize( + self, camera: str, event: Event, thumbnail: bytes + ) -> None: + """Handle the finalization of a frame.""" + camera_config = self.config.cameras[camera] + + if ( + camera_config.objects.genai.enabled + and camera_config.objects.genai.send_triggers.tracked_object_end + and ( + not camera_config.objects.genai.objects + or event.label in camera_config.objects.genai.objects + ) + and ( + not camera_config.objects.genai.required_zones + or set(event.zones) & set(camera_config.objects.genai.required_zones) + ) + ): + self._process_genai_description(event, camera_config, thumbnail) + + def __regenerate_description(self, event_id: str, source: str, force: bool) -> None: + """Regenerate the description for an event.""" + try: + event: Event = Event.get(Event.id == event_id) + except DoesNotExist: + logger.error(f"Event {event_id} not found for description regeneration") + return + + if self.genai_client is None: + logger.error("GenAI not enabled") + return + + camera_config = self.config.cameras[event.camera] + if not camera_config.objects.genai.enabled and not force: + logger.error(f"GenAI not enabled for camera {event.camera}") + return + + thumbnail = get_event_thumbnail_bytes(event) + + # ensure we have a jpeg to pass to the model + thumbnail = ensure_jpeg_bytes(thumbnail) + + logger.debug( + f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}" + ) + + if event.has_snapshot and source == "snapshot": + snapshot_image = self._read_and_crop_snapshot(event) + if not snapshot_image: + return + + embed_image = ( + [snapshot_image] + if event.has_snapshot and source == "snapshot" + else ( + [data["thumbnail"] for data in self.tracked_events[event_id]] + if len(self.tracked_events.get(event_id, [])) > 0 + else [thumbnail] + ) + ) + + self._genai_embed_description(event, embed_image) + + def process_data(self, frame_data: dict, data_type: PostProcessDataEnum) -> None: + """Process a frame update.""" + self.metrics.object_desc_dps.value = self.object_desc_dps.eps() + + if data_type != PostProcessDataEnum.tracked_object: + return + + state: str | None = frame_data.get("state", None) + + if state is not None: + logger.debug(f"Processing {state} for {frame_data['camera']}") + + if state == "update": + self.__handle_frame_update( + frame_data["camera"], frame_data["data"], frame_data["yuv_frame"] + ) + elif state == "finalize": + self.__handle_frame_finalize( + frame_data["camera"], frame_data["event"], frame_data["thumbnail"] + ) + + def 
handle_request(self, topic: str, data: dict[str, Any]) -> str | None: + """Handle a request.""" + if topic == "regenerate_description": + self.__regenerate_description( + data["event_id"], data["source"], data["force"] + ) + return None + + def _read_and_crop_snapshot(self, event: Event) -> bytes | None: + """Read, decode, and crop the snapshot image.""" + + snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg") + + if not os.path.isfile(snapshot_file): + logger.error( + f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}" + ) + return None + + try: + with open(snapshot_file, "rb") as image_file: + snapshot_image = image_file.read() + + img = cv2.imdecode( + np.frombuffer(snapshot_image, dtype=np.int8), + cv2.IMREAD_COLOR, + ) + + # Crop snapshot based on region + # provide full image if region doesn't exist (manual events) + height, width = img.shape[:2] + x1_rel, y1_rel, width_rel, height_rel = event.data.get( + "region", [0, 0, 1, 1] + ) + x1, y1 = int(x1_rel * width), int(y1_rel * height) + + cropped_image = img[ + y1 : y1 + int(height_rel * height), + x1 : x1 + int(width_rel * width), + ] + + _, buffer = cv2.imencode(".jpg", cropped_image) + + return buffer.tobytes() + except Exception: + return None + + def _process_genai_description( + self, event: Event, camera_config: CameraConfig, thumbnail + ) -> None: + if event.has_snapshot and camera_config.objects.genai.use_snapshot: + snapshot_image = self._read_and_crop_snapshot(event) + if not snapshot_image: + return + + num_thumbnails = len(self.tracked_events.get(event.id, [])) + + # ensure we have a jpeg to pass to the model + thumbnail = ensure_jpeg_bytes(thumbnail) + + embed_image = ( + [snapshot_image] + if event.has_snapshot and camera_config.objects.genai.use_snapshot + else ( + [data["thumbnail"] for data in self.tracked_events[event.id]] + if num_thumbnails > 0 + else [thumbnail] + ) + ) + + if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0: + logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}") + + Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir( + parents=True, exist_ok=True + ) + + for idx, data in enumerate(self.tracked_events[event.id], 1): + jpg_bytes: bytes | None = data["thumbnail"] + + if jpg_bytes is None: + logger.warning(f"Unable to save thumbnail {idx} for {event.id}.") + else: + with open( + os.path.join( + CLIPS_DIR, + f"genai-requests/{event.id}/{idx}.jpg", + ), + "wb", + ) as j: + j.write(jpg_bytes) + + # Generate the description. Call happens in a thread since it is network bound. 
+ threading.Thread( + target=self._genai_embed_description, + name=f"_genai_embed_description_{event.id}", + daemon=True, + args=( + event, + embed_image, + ), + ).start() + + # Delete tracked events based on the event_id + if event.id in self.tracked_events: + del self.tracked_events[event.id] + + def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None: + """Embed the description for an event.""" + start = datetime.datetime.now().timestamp() + camera_config = self.config.cameras[event.camera] + description = self.genai_client.generate_object_description( + camera_config, thumbnails, event + ) + + if not description: + logger.debug("Failed to generate description for %s", event.id) + return + + # fire and forget description update + self.requestor.send_data( + UPDATE_EVENT_DESCRIPTION, + { + "type": TrackedObjectUpdateTypesEnum.description, + "id": event.id, + "description": description, + "camera": event.camera, + }, + ) + + # Embed the description + if self.config.semantic_search.enabled: + self.embeddings.embed_description(event.id, description) + + # Check semantic trigger for this description + if self.semantic_trigger_processor is not None: + self.semantic_trigger_processor.process_data( + {"event_id": event.id, "camera": event.camera, "type": "text"}, + PostProcessDataEnum.tracked_object, + ) + + # Update inference timing metrics + self.object_desc_speed.update(datetime.datetime.now().timestamp() - start) + self.object_desc_dps.update() + + logger.debug( + "Generated description for %s (%d images): %s", + event.id, + len(thumbnails), + description, + ) diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py index c77880535..263a8b987 100644 --- a/frigate/data_processing/types.py +++ b/frigate/data_processing/types.py @@ -22,6 +22,8 @@ class DataProcessorMetrics: yolov9_lpr_pps: Synchronized review_desc_speed: Synchronized review_desc_dps: Synchronized + object_desc_speed: Synchronized + object_desc_dps: Synchronized classification_speeds: dict[str, Synchronized] classification_cps: dict[str, Synchronized] @@ -38,6 +40,8 @@ class DataProcessorMetrics: self.yolov9_lpr_pps = manager.Value("d", 0.0) self.review_desc_speed = manager.Value("d", 0.0) self.review_desc_dps = manager.Value("d", 0.0) + self.object_desc_speed = manager.Value("d", 0.0) + self.object_desc_dps = manager.Value("d", 0.0) self.classification_speeds = manager.dict() self.classification_cps = manager.dict() diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index d06d3c481..ca2fa9534 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -3,14 +3,10 @@ import base64 import datetime import logging -import os import threading from multiprocessing.synchronize import Event as MpEvent -from pathlib import Path -from typing import Any, Optional +from typing import Any -import cv2 -import numpy as np from peewee import DoesNotExist from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum @@ -30,16 +26,12 @@ from frigate.comms.recordings_updater import ( RecordingsDataTypeEnum, ) from frigate.comms.review_updater import ReviewDataSubscriber -from frigate.config import CameraConfig, FrigateConfig +from frigate.config import FrigateConfig from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.updater import ( CameraConfigUpdateEnum, CameraConfigUpdateSubscriber, ) -from frigate.const import ( - CLIPS_DIR, - UPDATE_EVENT_DESCRIPTION, -) from 
frigate.data_processing.common.license_plate.model import ( LicensePlateModelRunner, ) @@ -50,6 +42,7 @@ from frigate.data_processing.post.audio_transcription import ( from frigate.data_processing.post.license_plate import ( LicensePlatePostProcessor, ) +from frigate.data_processing.post.object_descriptions import ObjectDescriptionProcessor from frigate.data_processing.post.review_descriptions import ReviewDescriptionProcessor from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor from frigate.data_processing.real_time.api import RealTimeProcessorApi @@ -67,13 +60,8 @@ from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.genai import get_genai_client from frigate.models import Event, Recordings, ReviewSegment, Trigger -from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize -from frigate.util.image import ( - SharedMemoryFrameManager, - calculate_region, - ensure_jpeg_bytes, -) +from frigate.util.image import SharedMemoryFrameManager from frigate.util.path import get_event_thumbnail_bytes from .embeddings import Embeddings @@ -235,20 +223,30 @@ class EmbeddingMaintainer(threading.Thread): AudioTranscriptionPostProcessor(self.config, self.requestor, metrics) ) + semantic_trigger_processor: SemanticTriggerProcessor | None = None if self.config.semantic_search.enabled: + semantic_trigger_processor = SemanticTriggerProcessor( + db, + self.config, + self.requestor, + metrics, + self.embeddings, + ) + self.post_processors.append(semantic_trigger_processor) + + if any(c.objects.genai.enabled_in_config for c in self.config.cameras.values()): self.post_processors.append( - SemanticTriggerProcessor( - db, + ObjectDescriptionProcessor( self.config, - self.requestor, - metrics, self.embeddings, + self.requestor, + self.metrics, + self.genai_client, + semantic_trigger_processor, ) ) self.stop_event = stop_event - self.tracked_events: dict[str, list[Any]] = {} - self.early_request_sent: dict[str, bool] = {} # recordings data self.recordings_available_through: dict[str, float] = {} @@ -337,11 +335,8 @@ class EmbeddingMaintainer(threading.Thread): camera_config = self.config.cameras[camera] - # no need to process updated objects if face recognition, lpr, genai are disabled - if ( - not camera_config.objects.genai.enabled - and len(self.realtime_processors) == 0 - ): + # no need to process updated objects if no processors are active + if len(self.realtime_processors) == 0 and len(self.post_processors) == 0: return # Create our own thumbnail based on the bounding box and the frame time @@ -361,57 +356,17 @@ class EmbeddingMaintainer(threading.Thread): for processor in self.realtime_processors: processor.process_frame(data, yuv_frame) - # no need to save our own thumbnails if genai is not enabled - # or if the object has become stationary - if self.genai_client is not None and not data["stationary"]: - if data["id"] not in self.tracked_events: - self.tracked_events[data["id"]] = [] - - data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) - - # Limit the number of thumbnails saved - if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS: - # Always keep the first thumbnail for the event - self.tracked_events[data["id"]].pop(1) - - self.tracked_events[data["id"]].append(data) - - # check if we're configured to send an early request after a minimum number of updates received - if ( - self.genai_client is not None - and 
camera_config.objects.genai.send_triggers.after_significant_updates - ): - if ( - len(self.tracked_events.get(data["id"], [])) - >= camera_config.objects.genai.send_triggers.after_significant_updates - and data["id"] not in self.early_request_sent - ): - if data["has_clip"] and data["has_snapshot"]: - event: Event = Event.get(Event.id == data["id"]) - - if ( - not camera_config.objects.genai.objects - or event.label in camera_config.objects.genai.objects - ) and ( - not camera_config.objects.genai.required_zones - or set(data["entered_zones"]) - & set(camera_config.objects.genai.required_zones) - ): - logger.debug(f"{camera} sending early request to GenAI") - - self.early_request_sent[data["id"]] = True - threading.Thread( - target=self._genai_embed_description, - name=f"_genai_embed_description_{event.id}", - daemon=True, - args=( - event, - [ - data["thumbnail"] - for data in self.tracked_events[data["id"]] - ], - ), - ).start() + for processor in self.post_processors: + if isinstance(processor, ObjectDescriptionProcessor): + processor.process_data( + { + "camera": camera, + "data": data, + "state": "update", + "yuv_frame": yuv_frame, + }, + PostProcessDataEnum.tracked_object, + ) self.frame_manager.close(frame_name) @@ -424,12 +379,13 @@ class EmbeddingMaintainer(threading.Thread): break event_id, camera, updated_db = ended - camera_config = self.config.cameras[camera] # expire in realtime processors for processor in self.realtime_processors: processor.expire_object(event_id, camera) + thumbnail: bytes | None = None + if updated_db: try: event: Event = Event.get(Event.id == event_id) @@ -446,23 +402,6 @@ class EmbeddingMaintainer(threading.Thread): # Embed the thumbnail self._embed_thumbnail(event_id, thumbnail) - # Run GenAI - if ( - camera_config.objects.genai.enabled - and camera_config.objects.genai.send_triggers.tracked_object_end - and self.genai_client is not None - and ( - not camera_config.objects.genai.objects - or event.label in camera_config.objects.genai.objects - ) - and ( - not camera_config.objects.genai.required_zones - or set(event.zones) - & set(camera_config.objects.genai.required_zones) - ) - ): - self._process_genai_description(event, camera_config, thumbnail) - # call any defined post processors for processor in self.post_processors: if isinstance(processor, LicensePlatePostProcessor): @@ -492,16 +431,25 @@ class EmbeddingMaintainer(threading.Thread): {"event_id": event_id, "camera": camera, "type": "image"}, PostProcessDataEnum.tracked_object, ) + elif isinstance(processor, ObjectDescriptionProcessor): + if not updated_db: + continue + + processor.process_data( + { + "event": event, + "camera": camera, + "state": "finalize", + "thumbnail": thumbnail, + }, + PostProcessDataEnum.tracked_object, + ) else: processor.process_data( {"event_id": event_id, "camera": camera}, PostProcessDataEnum.tracked_object, ) - # Delete tracked events based on the event_id - if event_id in self.tracked_events: - del self.tracked_events[event_id] - def _expire_dedicated_lpr(self) -> None: """Remove plates not seen for longer than expiration timeout for dedicated lpr cameras.""" now = datetime.datetime.now().timestamp() @@ -570,9 +518,16 @@ class EmbeddingMaintainer(threading.Thread): event_id, source, force = payload if event_id: - self.handle_regenerate_description( - event_id, RegenerateDescriptionEnum(source), force - ) + for processor in self.post_processors: + if isinstance(processor, ObjectDescriptionProcessor): + processor.handle_request( + "regenerate_description", + { + 
"event_id": event_id, + "source": RegenerateDescriptionEnum(source), + "force": force, + }, + ) def _process_frame_updates(self) -> None: """Process event updates""" @@ -622,208 +577,9 @@ class EmbeddingMaintainer(threading.Thread): self.frame_manager.close(frame_name) - def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: - """Return jpg thumbnail of a region of the frame.""" - frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) - region = calculate_region( - frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4 - ) - frame = frame[region[1] : region[3], region[0] : region[2]] - width = int(height * frame.shape[1] / frame.shape[0]) - frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA) - ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]) - - if ret: - return jpg.tobytes() - - return None - def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None: """Embed the thumbnail for an event.""" if not self.config.semantic_search.enabled: return self.embeddings.embed_thumbnail(event_id, thumbnail) - - def _process_genai_description( - self, event: Event, camera_config: CameraConfig, thumbnail - ) -> None: - if event.has_snapshot and camera_config.objects.genai.use_snapshot: - snapshot_image = self._read_and_crop_snapshot(event, camera_config) - if not snapshot_image: - return - - num_thumbnails = len(self.tracked_events.get(event.id, [])) - - # ensure we have a jpeg to pass to the model - thumbnail = ensure_jpeg_bytes(thumbnail) - - embed_image = ( - [snapshot_image] - if event.has_snapshot and camera_config.objects.genai.use_snapshot - else ( - [data["thumbnail"] for data in self.tracked_events[event.id]] - if num_thumbnails > 0 - else [thumbnail] - ) - ) - - if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0: - logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}") - - Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir( - parents=True, exist_ok=True - ) - - for idx, data in enumerate(self.tracked_events[event.id], 1): - jpg_bytes: bytes = data["thumbnail"] - - if jpg_bytes is None: - logger.warning(f"Unable to save thumbnail {idx} for {event.id}.") - else: - with open( - os.path.join( - CLIPS_DIR, - f"genai-requests/{event.id}/{idx}.jpg", - ), - "wb", - ) as j: - j.write(jpg_bytes) - - # Generate the description. Call happens in a thread since it is network bound. 
- threading.Thread( - target=self._genai_embed_description, - name=f"_genai_embed_description_{event.id}", - daemon=True, - args=( - event, - embed_image, - ), - ).start() - - def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None: - """Embed the description for an event.""" - camera_config = self.config.cameras[event.camera] - - description = self.genai_client.generate_object_description( - camera_config, thumbnails, event - ) - - if not description: - logger.debug("Failed to generate description for %s", event.id) - return - - # fire and forget description update - self.requestor.send_data( - UPDATE_EVENT_DESCRIPTION, - { - "type": TrackedObjectUpdateTypesEnum.description, - "id": event.id, - "description": description, - "camera": event.camera, - }, - ) - - # Embed the description - if self.config.semantic_search.enabled: - self.embeddings.embed_description(event.id, description) - - # Check semantic trigger for this description - for processor in self.post_processors: - if isinstance(processor, SemanticTriggerProcessor): - processor.process_data( - {"event_id": event.id, "camera": event.camera, "type": "text"}, - PostProcessDataEnum.tracked_object, - ) - else: - continue - - logger.debug( - "Generated description for %s (%d images): %s", - event.id, - len(thumbnails), - description, - ) - - def _read_and_crop_snapshot(self, event: Event, camera_config) -> bytes | None: - """Read, decode, and crop the snapshot image.""" - - snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg") - - if not os.path.isfile(snapshot_file): - logger.error( - f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}" - ) - return None - - try: - with open(snapshot_file, "rb") as image_file: - snapshot_image = image_file.read() - - img = cv2.imdecode( - np.frombuffer(snapshot_image, dtype=np.int8), - cv2.IMREAD_COLOR, - ) - - # Crop snapshot based on region - # provide full image if region doesn't exist (manual events) - height, width = img.shape[:2] - x1_rel, y1_rel, width_rel, height_rel = event.data.get( - "region", [0, 0, 1, 1] - ) - x1, y1 = int(x1_rel * width), int(y1_rel * height) - - cropped_image = img[ - y1 : y1 + int(height_rel * height), - x1 : x1 + int(width_rel * width), - ] - - _, buffer = cv2.imencode(".jpg", cropped_image) - - return buffer.tobytes() - except Exception: - return None - - def handle_regenerate_description( - self, event_id: str, source: str, force: bool - ) -> None: - try: - event: Event = Event.get(Event.id == event_id) - except DoesNotExist: - logger.error(f"Event {event_id} not found for description regeneration") - return - - if self.genai_client is None: - logger.error("GenAI not enabled") - return - - camera_config = self.config.cameras[event.camera] - if not camera_config.objects.genai.enabled and not force: - logger.error(f"GenAI not enabled for camera {event.camera}") - return - - thumbnail = get_event_thumbnail_bytes(event) - - # ensure we have a jpeg to pass to the model - thumbnail = ensure_jpeg_bytes(thumbnail) - - logger.debug( - f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}" - ) - - if event.has_snapshot and source == "snapshot": - snapshot_image = self._read_and_crop_snapshot(event, camera_config) - if not snapshot_image: - return - - embed_image = ( - [snapshot_image] - if event.has_snapshot and source == "snapshot" - else ( - [data["thumbnail"] for data in self.tracked_events[event_id]] - if len(self.tracked_events.get(event_id, [])) > 0 - else [thumbnail] - ) - ) - - 
self._genai_embed_description(event, embed_image) diff --git a/frigate/genai/__init__.py b/frigate/genai/__init__.py index ecb0244e2..f3d77e9ee 100644 --- a/frigate/genai/__init__.py +++ b/frigate/genai/__init__.py @@ -32,7 +32,7 @@ def register_genai_provider(key: GenAIProviderEnum): class GenAIClient: """Generative AI client for Frigate.""" - def __init__(self, genai_config: GenAIConfig, timeout: int = 60) -> None: + def __init__(self, genai_config: GenAIConfig, timeout: int = 120) -> None: self.genai_config: GenAIConfig = genai_config self.timeout = timeout self.provider = self._init_provider() diff --git a/frigate/stats/util.py b/frigate/stats/util.py index 6d20f8f9a..1f92ffbe0 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -361,6 +361,14 @@ def stats_snapshot( embeddings_metrics.review_desc_dps.value, 2 ) + if embeddings_metrics.object_desc_speed.value > 0.0: + stats["embeddings"]["object_description_speed"] = round( + embeddings_metrics.object_desc_speed.value * 1000, 2 + ) + stats["embeddings"]["object_descriptions"] = round( + embeddings_metrics.object_desc_dps.value, 2 + ) + for key in embeddings_metrics.classification_speeds.keys(): stats["embeddings"][f"{key}_classification_speed"] = round( embeddings_metrics.classification_speeds[key].value * 1000, 2 diff --git a/frigate/util/image.py b/frigate/util/image.py index 3406c092f..0ebd2f1a1 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -995,7 +995,26 @@ def get_histogram(image, x_min, y_min, x_max, y_max): return cv2.normalize(hist, hist).flatten() -def ensure_jpeg_bytes(image_data): +def create_thumbnail( + yuv_frame: np.ndarray, box: tuple[int, int, int, int], height=500 +) -> Optional[bytes]: + """Return jpg thumbnail of a region of the frame.""" + frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) + region = calculate_region( + frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4 + ) + frame = frame[region[1] : region[3], region[0] : region[2]] + width = int(height * frame.shape[1] / frame.shape[0]) + frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA) + ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]) + + if ret: + return jpg.tobytes() + + return None + + +def ensure_jpeg_bytes(image_data: bytes) -> bytes: """Ensure image data is jpeg bytes for genai""" try: img_array = np.frombuffer(image_data, dtype=np.uint8)
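
For illustration only: below is a minimal, runnable Python sketch of the `process_data` dispatch contract this patch introduces, where the embeddings maintainer forwards per-frame "update" payloads and a single "finalize" payload to the new ObjectDescriptionProcessor. The class and payload values here are simplified stand-ins (plain dicts instead of the real peewee Event model and YUV frame arrays); only the `state` key and the `PostProcessDataEnum.tracked_object` gate mirror the patch.

    from enum import Enum
    from typing import Any


    class PostProcessDataEnum(Enum):
        # simplified stand-in for frigate.data_processing.types.PostProcessDataEnum;
        # the real enum's members/values may differ
        tracked_object = "tracked_object"


    class ObjectDescriptionSketch:
        """Toy processor showing the state-based dispatch used by this patch.

        The real ObjectDescriptionProcessor receives the same payload shapes
        but saves thumbnails, spawns GenAI description threads, and updates
        inference metrics.
        """

        def process_data(
            self, frame_data: dict[str, Any], data_type: PostProcessDataEnum
        ) -> None:
            if data_type != PostProcessDataEnum.tracked_object:
                return

            state = frame_data.get("state")

            if state == "update":
                # sent on every object update: camera name, tracked-object
                # data, and the decoded frame for thumbnail creation
                print(f"update: {frame_data['camera']} -> {frame_data['data']['id']}")
            elif state == "finalize":
                # sent once when the tracked object ends: the stored event
                # and its final thumbnail bytes
                print(f"finalize: {frame_data['camera']} -> {frame_data['event']['id']}")


    if __name__ == "__main__":
        processor = ObjectDescriptionSketch()

        # shape of the payload the maintainer sends from _process_updates
        processor.process_data(
            {"camera": "front", "state": "update",
             "data": {"id": "abc123"}, "yuv_frame": None},
            PostProcessDataEnum.tracked_object,
        )

        # shape of the payload sent when the tracked object has ended
        processor.process_data(
            {"camera": "front", "state": "finalize",
             "event": {"id": "abc123"}, "thumbnail": b""},
            PostProcessDataEnum.tracked_object,
        )

This keeps the maintainer generic: it iterates its post-processors and hands each one the same tracked-object payload, so the GenAI-specific gating (enabled cameras, required zones, early-request triggers) lives entirely inside the processor rather than inline in the maintainer loop.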