2026-02-07 13:13:02 +01:00

1220 lines
47 KiB
Python

"""
Watch domain model for change detection monitoring.
ARCHITECTURE NOTE: Configuration Override Hierarchy
===================================================
This module implements Watch objects that inherit from dict (technical debt).
The dream architecture would use Pydantic for:
1. CHAIN RESOLUTION (Watch → Tag → Global Settings)
- Current: Manual resolution scattered across codebase
- Future: @computed_field properties with automatic resolution
- Examples: resolved_fetch_backend, resolved_restock_settings, etc.
2. DATABASE BACKEND ABSTRACTION
- Current: Domain model tightly coupled to file-based JSON storage
- Future: Domain model (Pydantic) separate from persistence layer
- Enables: Easy migration to PostgreSQL, MongoDB, etc.
3. TYPE SAFETY & VALIDATION
- Current: Dict access with no compile-time checks
- Future: Type hints, IDE autocomplete, validation at boundaries
See class model docstring for detailed explanation and examples.
See: processors/restock_diff/processor.py:184-192 for manual resolution example
"""
from blinker import signal
from changedetectionio.validate_url import is_safe_valid_url
from changedetectionio.strtobool import strtobool
from changedetectionio.jinja2_custom import render as jinja_render
from . import watch_base
from .persistence import EntityPersistenceMixin
import os
import re
from pathlib import Path
from loguru import logger
from .. import jinja2_custom as safe_jinja
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
# Favicon files younger than this many seconds (one day) are not considered expired.
FAVICON_RESAVE_THRESHOLD_SECONDS = 24 * 60 * 60

# Text snapshots longer than this (default 20 KiB, overridable through the
# environment) are stored brotli-compressed unless compression is disabled.
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 20 * 1024))

# NOTE(review): read from the environment here but not used in the visible part of this file.
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))

# Seconds contained in one of each 'time_between_check' unit.
mtable = {
    'seconds': 1,
    'minutes': 60,
    'hours': 60 * 60,
    'days': 24 * 60 * 60,
    'weeks': 7 * 24 * 60 * 60,
}
def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
    """
    Save data brotli-compressed, streaming it through the compressor in chunks.

    After compressing, the compressor is dropped, the Python GC is run and
    glibc's malloc_trim() is attempted (ignored where unavailable).

    Args:
        contents: data to compress (str or bytes); str is encoded as UTF-8
        filepath: destination file path (str)
        mode: brotli compression mode (e.g., brotli.MODE_TEXT); MODE_GENERIC when None
        fallback_uncompressed: if True, save uncompressed on failure; if False, raise

    Returns:
        str: actual filepath saved (differs from the input when the fallback was used
        and the input ended with '.br')

    Raises:
        Exception: if compression fails and fallback_uncompressed is False
    """
    import brotli
    import gc
    import ctypes

    # Ensure contents are bytes
    if isinstance(contents, str):
        contents = contents.encode('utf-8')

    try:
        original_size = len(contents)
        logger.debug(f"Starting brotli streaming compression of {original_size} bytes.")

        compressor = brotli.Compressor(quality=6, mode=mode if mode is not None else brotli.MODE_GENERIC)

        chunk_size = 65536  # 64KB chunks
        total_compressed_size = 0

        with open(filepath, 'wb') as f:
            for offset in range(0, original_size, chunk_size):
                compressed_chunk = compressor.process(contents[offset:offset + chunk_size])
                if compressed_chunk:
                    f.write(compressed_chunk)
                    total_compressed_size += len(compressed_chunk)

            # finish() flushes whatever the compressor still buffers
            final_chunk = compressor.finish()
            if final_chunk:
                f.write(final_chunk)
                total_compressed_size += len(final_chunk)

        logger.debug(f"Finished brotli compression - From {original_size} to {total_compressed_size} bytes.")

        # Cleanup: delete compressor, force Python GC, then ask libc to release memory
        del compressor
        gc.collect()
        try:
            ctypes.CDLL('libc.so.6').malloc_trim(0)
        except Exception:
            pass  # malloc_trim not available on all systems (e.g., macOS)

        return filepath

    except Exception as e:
        logger.error(f"Brotli compression error: {e}")

        # A partially written .br file must not survive: readers prefer the .br
        # variant when it exists and would fail to decompress it.
        try:
            os.unlink(filepath)
        except OSError:
            pass

        if fallback_uncompressed:
            logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
            # Strip only the trailing extension; '.br' may legitimately occur elsewhere in the path
            fallback_path = filepath.removesuffix('.br')
            with open(fallback_path, 'wb') as f:
                f.write(contents)
            return fallback_path

        raise Exception(f"Brotli compression failed for {filepath}: {e}") from e
class model(EntityPersistenceMixin, watch_base):
"""
Watch domain model for monitoring URL changes.
Inherits from watch_base (which inherits dict) - see watch_base docstring for field documentation.
## Configuration Override Hierarchy (Chain Resolution)
The dream architecture uses a 3-level resolution chain:
Watch settings → Tag/Group settings → Global settings
Current implementation is MANUAL (see processor.py:184-192 for example):
- Processors manually check watch.get('field')
- Then loop through watch.tags to find first tag with overrides_watch=True
- Finally fall back to datastore['settings']['application']['field']
FUTURE: Pydantic-based chain resolution would enable:
```python
# Instead of manual resolution in every processor:
restock_settings = watch.get('restock_settings', {})
for tag_uuid in watch.get('tags'):
tag = datastore['settings']['application']['tags'][tag_uuid]
if tag.get('overrides_watch'):
restock_settings = tag.get('restock_settings', {})
break
# Clean computed properties with automatic resolution:
@computed_field
def resolved_restock_settings(self) -> dict:
if self.restock_settings:
return self.restock_settings
for tag_uuid in self.tags:
tag = self._datastore.get_tag(tag_uuid)
if tag.overrides_watch and tag.restock_settings:
return tag.restock_settings
return self._datastore.settings.restock_settings or {}
# Usage: watch.resolved_restock_settings (automatic, type-safe, tested once)
```
Benefits of Pydantic migration:
1. Single source of truth for resolution logic (not scattered across processors)
2. Type safety + IDE autocomplete (watch.resolved_fetch_backend vs dict navigation)
3. Database backend abstraction (domain model separate from persistence)
4. Automatic validation at boundaries
5. Self-documenting via type hints
6. Easy to test resolution independently
Resolution chain examples that would benefit:
- fetch_backend: watch → tag → global (see get_fetch_backend property)
- notification_urls: watch → tag → global
- time_between_check: watch → global (see threshold_seconds)
- restock_settings: watch → tag (see processors/restock_diff/processor.py:184-192)
- history_snapshot_max_length: watch → global (see save_history_blob:550-556)
- All processor_config_* settings could use tag overrides
## Database Backend Abstraction with Pydantic
Current: Watch inherits dict, tightly coupled to file-based JSON storage
Future: Domain model (Watch) separate from persistence layer
```python
# Domain model (database-agnostic)
class Watch(BaseModel):
uuid: str
url: str
# ... validation, business logic
# Pluggable backends
class DataStoreBackend(ABC):
def save_watch(self, watch: Watch): ...
def load_watch(self, uuid: str) -> Watch: ...
# Implementations: FileBackend, MongoBackend, PostgresBackend, etc.
```
This would enable:
- Easy migration between storage backends (file → postgres → mongodb)
- Pydantic handles serialization/deserialization automatically
- Domain logic stays clean (no storage concerns in Watch methods)
## Migration Path
Given existing codebase, incremental migration recommended:
1. Create Pydantic models alongside existing dict-based models
2. Add .to_pydantic() / .from_pydantic() bridge methods
3. Gradually migrate code to use Pydantic models
4. Remove dict inheritance once migration complete
See: watch_base docstring for technical debt discussion
See: processors/restock_diff/processor.py:184-192 for manual resolution example
See: Watch.py:550-556 for nested dict navigation that would become watch.resolved_*
"""
# Key (timestamp) of the last entry in the history index; refreshed every time
# the `history` property is read and when a snapshot is saved.
__newest_history_key = None
# Number of entries found in the history index the last time it was read.
__history_n = 0
# NOTE(review): not referenced in the visible part of this file - presumably set
# and consumed by scheduling code elsewhere; confirm.
jitter_seconds = 0
def __init__(self, *arg, **kw):
    """Create a Watch.

    A truthy ``__datastore`` keyword argument is mandatory. An optional
    ``default`` keyword argument is merged into the watch after the parent
    constructor has run.

    Raises:
        ValueError: when ``__datastore`` is missing or falsy.
    """
    # Fail fast, before the parent constructor gets a chance to run
    if not kw.get('__datastore'):
        raise ValueError("Watch object requires '__datastore' reference - cannot access global settings without it")

    # Parent class (watch_base) handles __datastore and __datastore_path
    super(model, self).__init__(*arg, **kw)

    defaults = kw.get('default')
    if defaults:
        self.update(defaults)
        kw.pop('default')

    # 'default' is constructor input only, never keep it as a stored key
    if self.get('default'):
        del self['default']

    # Reading the property primes the cached newest key / entry count
    _ = self.history
# Note: __deepcopy__, __getstate__, and __setstate__ are inherited from watch_base
# This prevents memory leaks by sharing __datastore reference instead of copying it
@property
def viewed(self):
# Don't return viewed when last_viewed is 0 and newest_key is 0
if int(self['last_viewed']) and int(self['last_viewed']) >= int(self.newest_history_key) :
return True
return False
@property
def has_unviewed(self):
return int(self.newest_history_key) > int(self['last_viewed']) and self.__history_n >= 2
@property
def link(self):
    """Return the usable URL of this watch.

    The stored URL may contain Jinja2 markup, which is rendered first, and a
    leading 'source:' marker, which is removed.

    Returns:
        str: the resolved URL; 'DISABLED' when the URL fails the safety check
        (before or after rendering); '' when the Jinja2 template is invalid
        (an error is then flashed to the UI).
    """
    url = self.get('url', '')
    if not is_safe_valid_url(url):
        return 'DISABLED'

    ready_url = url
    if '{%' in url or '{{' in url:
        # Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
        try:
            ready_url = jinja_render(template_str=url)
        except Exception as e:
            logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
            from flask import flash, url_for
            from markupsafe import Markup
            message = Markup('<a href="{}#general">The URL {} is invalid and cannot be used, click to edit</a>'.format(
                url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', '')))
            flash(message, 'error')
            return ''

    if ready_url.startswith('source:'):
        # Drop the leading marker only; 'source:' inside the query string or
        # path is part of the real URL and must survive.
        ready_url = ready_url[len('source:'):]

    # Also double check it after any Jinja2 formatting just in case
    if not is_safe_valid_url(ready_url):
        return 'DISABLED'
    return ready_url
@property
def domain_only_from_link(self):
    """Hostname component of `link` as parsed by urllib (None when there is none)."""
    from urllib.parse import urlparse
    return urlparse(self.link).hostname
@property
def history_index_filename(self):
# So that you dont try to view different histories in different 'diff' setups, can confuse cdio.
processor = self.get('processor')
if not processor or self.get('processor') == 'text_json_diff':
return 'history.txt'
else:
return f'history-{processor}.txt'
def clear_watch(self):
    """Delete the watch's stored files and reset its check state.

    Every entry matching "*.*" directly inside the data directory is removed,
    except "<processor name>.json" files, which are configuration rather than
    history. Afterwards the status fields are reset and the
    'watch_check_update' signal is sent.
    """
    import pathlib
    from changedetectionio.processors import find_processors

    # Processor config files are kept
    preserved_names = {f"{name}.json" for _cls, name in find_processors()}

    # glob (not rglob) on purpose: never descend into sub-directories
    for entry in pathlib.Path(str(self.data_dir)).glob("*.*"):
        if entry.name not in preserved_names:
            os.unlink(entry)

    # Re-read the (now empty) history so the cached counters are recalculated
    _ = self.history

    # Done last because last_checked == 0 will trigger a recheck
    reset_values = {
        'browser_steps_last_error_step': None,
        'check_count': 0,
        'fetch_time': 0.0,
        'has_ldjson_price_data': None,
        'last_checked': 0,
        'last_error': False,
        'last_notification_error': False,
        'last_viewed': 0,
        'previous_md5': False,
        'previous_md5_before_filters': False,
        'remote_server_reply': None,
        'track_ldjson_price_data': None
    }
    self.update(reset_values)

    update_signal = signal('watch_check_update')
    if update_signal:
        update_signal.send(watch_uuid=self.get('uuid'))
@property
def is_source_type_url(self):
return self.get('url', '').startswith('source:')
@property
def get_fetch_backend(self):
"""
Get the fetch backend for this watch with special case handling.
CHAIN RESOLUTION OPPORTUNITY:
Currently returns watch.fetch_backend directly, but doesn't implement
Watch → Tag → Global resolution chain. With Pydantic:
@computed_field
def resolved_fetch_backend(self) -> str:
# Special case: PDFs always use html_requests
if self.is_pdf:
return 'html_requests'
# Watch override
if self.fetch_backend and self.fetch_backend != 'system':
return self.fetch_backend
# Tag override (first tag with overrides_watch=True wins)
for tag_uuid in self.tags:
tag = self._datastore.get_tag(tag_uuid)
if tag.overrides_watch and tag.fetch_backend:
return tag.fetch_backend
# Global default
return self._datastore.settings.fetch_backend
"""
# Maybe also if is_image etc?
# This is because chrome/playwright wont render the PDF in the browser and we will just fetch it and use pdf2html to see the text.
if self.is_pdf:
return 'html_requests'
return self.get('fetch_backend')
@property
def is_pdf(self):
# content_type field is set in the future
# https://github.com/dgtlmoon/changedetection.io/issues/1392
# Not sure the best logic here
return self.get('url', '').lower().endswith('.pdf') or 'pdf' in self.get('content_type', '').lower()
@property
def label(self):
# Used for sorting, display, etc
return self.get('title') or self.get('page_title') or self.link
@property
def last_changed(self):
# last_changed will be the newest snapshot, but when we have just one snapshot, it should be 0
if self.__history_n <= 1:
return 0
if self.__newest_history_key:
return int(self.__newest_history_key)
return 0
@property
def history_n(self):
return self.__history_n
@property
def history(self):
    """Read the history index file and return it as {timestamp: snapshot path}.

    The index is a text file ({watch-uuid}/history.txt, or the per-processor
    variant) holding one entry per line:
        {epoch-time},{filename}\n

    Side effect: refreshes the cached newest key and entry count.

    Returns:
        dict: timestamp (str) -> full snapshot path, in file order.
        NOTE(review): an empty *list* is returned when the watch has no data
        directory, so callers must not assume dict methods in that case.
    """
    tmp_history = {}

    # In the case we are only using the watch for processing without history
    if not self.data_dir:
        return []

    # Read the history file as a dict
    fname = os.path.join(self.data_dir, self.history_index_filename)
    if os.path.isfile(fname):
        logger.debug(f"Reading watch history index for {self.get('uuid')}")
        with open(fname, "r", encoding='utf-8') as f:
            for line in f:
                if ',' not in line:
                    continue
                # Split on the first comma only: everything after it is the
                # filename/path, which may itself contain commas.
                k, v = line.strip().split(',', 1)

                # The index history could contain a relative path, so we need to make the fullpath
                # so that python can read it
                # Cross-platform: check for any path separator (works on Windows and Unix)
                if os.sep not in v and '/' not in v and '\\' not in v:
                    # Relative filename only, no path separators
                    v = os.path.join(self.data_dir, v)
                else:
                    # It's possible that they moved the datadir on older versions
                    # So the snapshot exists but is in a different path
                    snapshot_fname = os.path.basename(v)
                    proposed_new_path = os.path.join(self.data_dir, snapshot_fname)
                    if not os.path.exists(v) and os.path.exists(proposed_new_path):
                        v = proposed_new_path

                tmp_history[k] = v

    # The newest key is the last one in file order
    self.__newest_history_key = list(tmp_history)[-1] if tmp_history else None
    self.__history_n = len(tmp_history)

    return tmp_history
@property
def has_history(self):
    """True when the history index file exists in the watch data directory."""
    index_path = os.path.join(self.data_dir, self.history_index_filename)
    exists = os.path.isfile(index_path)
    return exists
@property
def has_browser_steps(self):
has_browser_steps = self.get('browser_steps') and list(filter(
lambda s: (s['operation'] and len(s['operation']) and s['operation'] != 'Choose one' and s['operation'] != 'Goto site'),
self.get('browser_steps')))
return has_browser_steps
@property
def has_restock_info(self):
if self.get('restock') and self['restock'].get('in_stock') != None:
return True
return False
# Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0.
@property
def newest_history_key(self):
if self.__newest_history_key is not None:
return self.__newest_history_key
if len(self.history) <= 1:
return 0
bump = self.history
return self.__newest_history_key
# Given an arbitrary timestamp, find the best history key for the [diff] button so it can preset a smarter from_version
@property
def get_from_version_based_on_last_viewed(self):
    """Pick the history key to use as the "from" side of a diff, based on 'last_viewed'.

    Unfortunately for now timestamp is stored as string key.

    Returns:
        None when there is no history; the only key when there is one;
        the second newest key when 'last_viewed' is at or after the newest
        snapshot; the older neighbour when 'last_viewed' falls between two
        snapshots; otherwise the oldest key.
    """
    # list() works for the dict form and for the empty list that `history`
    # returns when the watch has no data directory
    keys = list(self.history)
    if not keys:
        return None
    if len(keys) == 1:
        return keys[0]

    last_viewed = int(self.get('last_viewed'))
    newest_first = sorted(keys, key=int)
    newest_first.reverse()

    # When the 'last viewed' timestamp is greater than or equal the newest snapshot, return second newest
    if last_viewed >= int(newest_first[0]):
        return newest_first[1]

    # When the 'last viewed' timestamp is between snapshots, return the older snapshot
    for newer, older in zip(newest_first, newest_first[1:]):
        if int(older) <= last_viewed < int(newer):
            return older

    # When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
    return newest_first[-1]
def get_history_snapshot(self, timestamp=None, filepath=None):
    """
    Load the content of one history snapshot.

    Accepts either timestamp or filepath; `filepath` wins when both are given.

    :param timestamp: history key, looked up in `self.history` when no filepath is passed
    :param filepath: path of the snapshot file
    :return: str for text snapshots (plain or brotli-compressed), raw bytes for
             files with a known binary extension
    """
    if not filepath:
        filepath = self.history[timestamp]

    # Binary files (image, PDF, etc.) are NEVER saved with .br compression, only text files are
    binary_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.pdf', '.bin', '.jfif')
    is_binary = filepath.endswith(binary_extensions)

    # Only look for .br versions for text files
    if not is_binary:
        # See if a brotli version exists and switch to that
        if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
            filepath = f"{filepath}.br"

        # OR in the backup case that the .br does not exist, but the plain one does
        if filepath.endswith('.br') and not os.path.isfile(filepath):
            # Strip the trailing extension only; '.br' elsewhere in the path must stay
            plain_path = filepath[:-len('.br')]
            if os.path.isfile(plain_path):
                filepath = plain_path

    # Handle .br compressed text files
    if filepath.endswith('.br'):
        # Brotli doesnt have a fileheader to detect it, so we rely on filename
        # https://www.rfc-editor.org/rfc/rfc7932
        # Imported here so plain/binary snapshots can be read without loading brotli
        import brotli
        with open(filepath, 'rb') as f:
            return brotli.decompress(f.read()).decode('utf-8')

    # Binary file - return raw bytes
    if is_binary:
        with open(filepath, 'rb') as f:
            return f.read()

    # Text file - decode to string, silently dropping undecodable bytes
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        return f.read()
def _write_atomic(self, dest, data, mode='wb'):
    """Write data to dest via a temp file in the data directory followed by os.replace().

    Args:
        dest: final file path
        data: bytes for binary modes, str for text modes
        mode: open mode passed to the temporary file (default 'wb')

    Raises:
        Whatever the write, fsync or rename raises; the temporary file is
        removed before the exception propagates.
    """
    import tempfile

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(mode, delete=False, dir=self.data_dir) as tmp:
            tmp_path = tmp.name
            tmp.write(data)
            tmp.flush()
            # Make sure the bytes are on disk before the rename publishes them
            os.fsync(tmp.fileno())
        os.replace(tmp_path, dest)
    except BaseException:
        # delete=False means nobody else cleans up a half-written temp file
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise
def history_trim(self, newest_n_items):
    """Keep only the newest `newest_n_items` snapshots; delete older files and rewrite the index.

    Failures while deleting a snapshot or rewriting the index are logged and
    do not raise. Success messages are only logged when the step succeeded.

    Args:
        newest_n_items: number of most recent history entries to keep
    """
    from pathlib import Path
    import gc

    # Sort by timestamp (key)
    sorted_items = sorted(self.history.items(), key=lambda x: int(x[0]))
    keep_part = dict(sorted_items[-newest_n_items:])
    delete_part = dict(sorted_items[:-newest_n_items])

    logger.info( f"[{self.get('uuid')}] Trimming history to most recent {newest_n_items} items, keeping {len(keep_part)} items deleting {len(delete_part)} items.")

    if not delete_part:
        return

    for snapshot_path in delete_part.values():
        try:
            Path(snapshot_path).unlink(missing_ok=True)
        except Exception as e:
            logger.critical(f"{str(e)}")
        else:
            logger.debug(f"[{self.get('uuid')}] Deleted {snapshot_path} history snapshot")

    dest = os.path.join(self.data_dir, self.history_index_filename)
    try:
        # The index stores bare file names, one "timestamp,name" entry per line
        output = "\r\n".join(
            f"{k},{Path(v).name}"
            for k, v in keep_part.items()
        )+"\r\n"
        self._write_atomic(dest=dest, data=output, mode='w')
    except Exception as e:
        logger.critical(f"{str(e)}")
    else:
        logger.debug(f"[{self.get('uuid')}] Updated history index {dest}")

    # Re-read the index so the cached newest key / count match the trimmed file
    _ = self.history
    gc.collect()
# Save some text file to the appropriate path and bump the history
# result_obj from fetch_site_status.run()
def save_history_blob(self, contents, timestamp, snapshot_id):
    """Store one snapshot on disk and append it to the history index.

    Args:
        contents: bytes are stored as-is under a detected file extension;
            anything else is treated as text and stored as "<id>.txt" or,
            when large enough and not disabled, as "<id>.txt.br"
        timestamp: key written to the history index for this snapshot
        snapshot_id: base name (without extension) of the snapshot file

    Returns:
        str: file name (not full path) of the snapshot recorded in the index
    """
    logger.trace(f"{self.get('uuid')} - Updating {self.history_index_filename} with timestamp {timestamp}")
    self.ensure_data_dir_exists()
    # Environment switch to turn text compression off entirely
    skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
    # Binary data - detect file type and save without compression
    if isinstance(contents, bytes):
        try:
            import puremagic
            # Only the first 2048 bytes are handed to the detector
            detections = puremagic.magic_string(contents[:2048])
            ext = detections[0].extension if detections else 'bin'
            # Strip leading dot if present (puremagic returns extensions like '.jfif')
            ext = ext.lstrip('.')
            if detections:
                logger.trace(f"Detected file type: {detections[0].mime_type} -> extension: {ext}")
        except Exception as e:
            logger.warning(f"puremagic detection failed: {e}, using 'bin' extension")
            ext = 'bin'
        snapshot_fname = f"{snapshot_id}.{ext}"
        dest = os.path.join(self.data_dir, snapshot_fname)
        self._write_atomic(dest, contents)
        logger.trace(f"Saved binary snapshot as {snapshot_fname} ({len(contents)} bytes)")
    # Text data - use brotli compression if enabled and above threshold
    else:
        if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
            # Compressed text
            import brotli
            snapshot_fname = f"{snapshot_id}.txt.br"
            dest = os.path.join(self.data_dir, snapshot_fname)
            # An already existing compressed file is left untouched, but the
            # index entry below is still appended
            if not os.path.exists(dest):
                try:
                    actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
                    # _brotli_save may have fallen back to an uncompressed file
                    if actual_dest != dest:
                        snapshot_fname = os.path.basename(actual_dest)
                except Exception as e:
                    logger.error(f"{self.get('uuid')} - Brotli compression failed: {e}")
                    # Fallback to uncompressed
                    snapshot_fname = f"{snapshot_id}.txt"
                    dest = os.path.join(self.data_dir, snapshot_fname)
                    self._write_atomic(dest, contents.encode('utf-8'))
        else:
            # Plain text
            snapshot_fname = f"{snapshot_id}.txt"
            dest = os.path.join(self.data_dir, snapshot_fname)
            self._write_atomic(dest, contents.encode('utf-8'))
    # Append the new entry to the history index and fsync it to disk
    index_fname = os.path.join(self.data_dir, self.history_index_filename)
    index_line = f"{timestamp},{snapshot_fname}\n"
    with open(index_fname, 'a', encoding='utf-8') as f:
        f.write(index_line)
        f.flush()
        os.fsync(f.fileno())
    # Update internal state (cached newest key and entry count) without re-reading the index
    self.__newest_history_key = timestamp
    self.__history_n += 1
    # MANUAL CHAIN RESOLUTION: Watch → Global
    # With Pydantic, this would become: maxlen = watch.resolved_history_snapshot_max_length
    # @computed_field def resolved_history_snapshot_max_length(self) -> Optional[int]:
    # if self.history_snapshot_max_length: return self.history_snapshot_max_length
    # if tag := self._get_override_tag(): return tag.history_snapshot_max_length
    # return self._datastore.settings.history_snapshot_max_length
    maxlen = self.get('history_snapshot_max_length') or self.get_global_setting('application', 'history_snapshot_max_length')
    # Trim only when a limit is configured and the cached count exceeds it
    if maxlen and self.__history_n and self.__history_n > maxlen:
        self.history_trim(newest_n_items=maxlen)
    # @todo bump static cache of the last timestamp so we dont need to examine the file to set a proper ''viewed'' status
    return snapshot_fname
@property
def has_empty_checktime(self):
# using all() + dictionary comprehension
# Check if all values are 0 in dictionary
res = all(x == None or x == False or x==0 for x in self.get('time_between_check', {}).values())
return res
def threshold_seconds(self):
    """Total of the watch's 'time_between_check' units converted to seconds (0 when nothing is set)."""
    intervals = self.get('time_between_check', {})
    # Units that are missing, None or 0 contribute nothing
    return sum(
        intervals[unit] * factor
        for unit, factor in mtable.items()
        if intervals.get(unit, None)
    )
# Iterate over all history texts and see if something new exists
# Always applying .strip() to start/end but optionally replace any other whitespace
def lines_contain_something_unique_compared_to_history(self, lines: list, ignore_whitespace=False):
    """Tell whether any of `lines` does not occur in any stored history snapshot.

    Lines are compared case-insensitively. With `ignore_whitespace` they are
    passed through TRANSLATE_WHITESPACE_TABLE, otherwise they are stripped.

    Args:
        lines: list of str or bytes (the type of the first element decides how
            all of them are handled; bytes are decoded as UTF-8)
        ignore_whitespace: use the whitespace translation table instead of strip()

    Returns:
        bool: True when at least one normalised line is absent from the history
    """
    def _normalise(text):
        if ignore_whitespace:
            return text.translate(TRANSLATE_WHITESPACE_TABLE).lower()
        return text.strip().lower()

    candidate_lines = set()
    if lines:
        # Can be either str or bytes depending on what was on the disk
        if isinstance(lines[0], str):
            candidate_lines = {_normalise(entry) for entry in lines}
        else:
            candidate_lines = {_normalise(entry.decode('utf-8')) for entry in lines}

    # Collect every normalised line of every snapshot
    seen_lines = set()
    for _timestamp, snapshot_path in self.history.items():
        snapshot_text = self.get_history_snapshot(filepath=snapshot_path)
        seen_lines.update(_normalise(row) for row in snapshot_text.splitlines())

    # Anything in the candidates that history has never contained is "new"
    return not candidate_lines.issubset(seen_lines)
def get_screenshot(self):
    """Path of "last-screenshot.png" in the data directory, or None when it does not exist."""
    screenshot_path = os.path.join(self.data_dir, "last-screenshot.png")
    # None (not False) on purpose: False is not an option for AppRise
    return screenshot_path if os.path.isfile(screenshot_path) else None
def favicon_is_expired(self):
    """Tell whether the stored favicon is missing or older than FAVICON_RESAVE_THRESHOLD_SECONDS.

    Returns:
        bool: False only when a favicon file exists and is younger than the
        threshold; True otherwise, including when checking fails.
    """
    favicon_fname = self.get_favicon_filename()
    import glob
    import time

    if not favicon_fname:
        return True
    try:
        fname = next(iter(glob.glob(os.path.join(self.data_dir, "favicon.*"))), None)
        logger.trace(f"Favicon file maybe found at {fname}")
        # fname is None when the (possibly cached) favicon has vanished from
        # disk; that is simply "expired", not an error worth a critical log
        if fname and os.path.isfile(fname):
            file_age = int(time.time() - os.path.getmtime(fname))
            logger.trace(f"Favicon file age is {file_age}s")
            if file_age < FAVICON_RESAVE_THRESHOLD_SECONDS:
                return False
    except Exception as e:
        logger.critical(f"Exception checking Favicon age {str(e)}")
        return True

    # Also in the case that the file didnt exist
    return True
def bump_favicon(self, url, favicon_base_64: str) -> None:
    """Decode a base64 favicon and store it as "favicon.<ext>" in the data directory.

    On success the favicon filename cache is invalidated and the
    'watch_favicon_bump' signal is sent. Every failure is logged and swallowed.

    Args:
        url: favicon URL used to derive the file extension; 'ico' is assumed
            when no URL is given
        favicon_base_64: base64 encoded image data
    """
    from urllib.parse import urlparse
    import base64
    import binascii

    # Assume favicon.ico unless the URL says otherwise
    extension = "ico"
    if url:
        try:
            filename = os.path.basename(urlparse(url).path)
            _base, extension = filename.lower().strip().rsplit('.', 1)
        except ValueError:
            logger.error(f"UUID: {self.get('uuid')} Cant work out file extension from '{url}'")
            return None

    fname = os.path.join(self.data_dir, f"favicon.{extension}")

    try:
        # validate=True makes sure the string only contains valid base64 chars
        decoded = base64.b64decode(favicon_base_64, validate=True)
    except (binascii.Error, ValueError, TypeError) as e:
        # TypeError covers non-string input such as None
        logger.warning(f"UUID: {self.get('uuid')} FavIcon save data (Base64) corrupt? {str(e)}")
        return None

    if not decoded:
        return None

    try:
        with open(fname, 'wb') as f:
            f.write(decoded)

        # Invalidate favicon filename cache
        if hasattr(self, '_favicon_filename_cache'):
            delattr(self, '_favicon_filename_cache')

        # A signal that could trigger the socket server to update the browser also
        favicon_signal = signal('watch_favicon_bump')
        if favicon_signal:
            favicon_signal.send(watch_uuid=self.get('uuid'))

    except Exception as e:
        logger.warning(f"UUID: {self.get('uuid')} error saving FavIcon to {fname} - {str(e)}")
        return None

    # @todo - Store some checksum and only write when its different
    # Logged only after the file was really written
    logger.debug(f"UUID: {self.get('uuid')} updated favicon to at {fname}")
    return None
def get_favicon_filename(self) -> str | None:
    """
    Return the file name of the most recently modified "favicon.*" file in
    the watch data directory.

    MEMORY LEAK FIX: the answer is cached on the instance so glob.glob() is not
    repeated for every watch on page load (it causes millions of fnmatch
    allocations).

    Returns:
        str: Basename of the newest favicon file, or None if not found.
    """
    # Serve from the per-instance cache when present (None is a valid cached answer)
    try:
        return self._favicon_filename_cache
    except AttributeError:
        pass

    import glob

    candidates = glob.glob(os.path.join(self.data_dir, "favicon.*"))
    newest_name = None
    if candidates:
        # Newest by modification time
        newest_name = os.path.basename(max(candidates, key=os.path.getmtime))

    self._favicon_filename_cache = newest_name
    return newest_name
def get_screenshot_as_thumbnail(self, max_age=3200):
    """Return the path of a square JPEG thumbnail of the most recent screenshot.

    The thumbnail is cut from the top part of the screenshot, saved as
    "thumbnail.jpeg" (350x350 pixels) in the data directory and reused while
    it is fresh.

    Args:
        max_age: Maximum age in seconds before recreating thumbnail

    Returns:
        Path to thumbnail, or None if no screenshot exists or creation failed
    """
    import os
    import time

    thumb_file = os.path.join(self.data_dir, "thumbnail.jpeg")
    top_trim = 500  # Pixels from top of screenshot to use

    source_file = self.get_screenshot()
    if not source_file:
        return None

    # Reuse the thumbnail when it is at least as new as the screenshot and young enough
    if os.path.isfile(thumb_file):
        thumb_mtime = os.path.getmtime(thumb_file)
        source_mtime = os.path.getmtime(source_file)
        still_current = source_mtime <= thumb_mtime
        if still_current and time.time() - thumb_mtime < max_age:
            return thumb_file

    try:
        from PIL import Image

        with Image.open(source_file) as img:
            # Full width, at most top_trim pixels high
            img = img.crop((0, 0, img.width, min(top_trim, img.height)))

            # Shrink to a smaller intermediate image first (to reduce memory usage)
            aspect = img.width / img.height
            interim_width = min(top_trim, img.width)
            if aspect > 0:
                interim_height = int(interim_width / aspect)
            else:
                interim_height = top_trim
            img = img.resize((interim_width, interim_height), Image.NEAREST)

            # JPEG needs RGB
            if img.mode != 'RGB':
                img = img.convert('RGB')

            # Square cut, horizontally centred, anchored at the top
            side = min(img.width, img.height)
            left_edge = (img.width - side) // 2
            img = img.crop((left_edge, 0, left_edge + side, side))

            # Final resize with a better filter
            img = img.resize((350, 350), Image.BILINEAR)

            img.save(thumb_file, "JPEG", quality=75, optimize=True)
            return thumb_file

    except Exception as e:
        logger.error(f"Error creating thumbnail for {self.get('uuid')}: {str(e)}")
        return None
def __get_file_ctime(self, filename):
    """Modification time (whole seconds) of `filename` in the data directory, or False when absent.

    Despite the name, the value comes from os.path.getmtime().
    """
    full_path = os.path.join(self.data_dir, filename)
    if not os.path.isfile(full_path):
        return False
    return int(os.path.getmtime(full_path))
@property
def error_text_ctime(self):
    """Timestamp of 'last-error.txt' in the data directory, or False when the file is absent."""
    stamp = self.__get_file_ctime('last-error.txt')
    return stamp
@property
def snapshot_text_ctime(self):
if self.history_n==0:
return False
timestamp = list(self.history.keys())[-1]
return int(timestamp)
@property
def snapshot_screenshot_ctime(self):
    """Timestamp of 'last-screenshot.png' in the data directory, or False when the file is absent."""
    stamp = self.__get_file_ctime('last-screenshot.png')
    return stamp
@property
def snapshot_error_screenshot_ctime(self):
    """Timestamp of 'last-error-screenshot.png' in the data directory, or False when the file is absent."""
    stamp = self.__get_file_ctime('last-error-screenshot.png')
    return stamp
def get_error_text(self):
    """Return the text saved from a previous request that resulted in a non-200 error, or False when none is stored."""
    error_file = os.path.join(self.data_dir, "last-error.txt")
    if not os.path.isfile(error_file):
        return False
    with open(error_file, 'r', encoding='utf-8') as handle:
        return handle.read()
def get_error_snapshot(self):
    """Return path to the screenshot that resulted in a non-200 error, or False when none is stored."""
    error_shot = os.path.join(self.data_dir, "last-error-screenshot.png")
    return error_shot if os.path.isfile(error_shot) else False
def pause(self):
    """Set the 'paused' flag."""
    self['paused'] = True

def unpause(self):
    """Clear the 'paused' flag."""
    self['paused'] = False

def toggle_pause(self):
    """Flip the 'paused' flag in place (the key must already hold a value that supports ^)."""
    self['paused'] ^= True
def mute(self):
    """Set the 'notification_muted' flag."""
    self['notification_muted'] = True

def unmute(self):
    """Clear the 'notification_muted' flag."""
    self['notification_muted'] = False

def toggle_mute(self):
    """Flip the 'notification_muted' flag in place (the key must already hold a value that supports ^)."""
    self['notification_muted'] ^= True
def _get_commit_data(self):
    """
    Build the dict that gets persisted for this watch.

    - Keys starting with 'processor_config_' are left out (stored in separate files).
    - Values are deep-copied from a shallow snapshot taken under the
      datastore lock when the datastore offers one.
    - 'browser_steps' is normalised to an empty list when there are no
      meaningful steps.
    """
    import copy

    store = self._datastore
    lock = store.lock if store and hasattr(store, 'lock') else None

    # Take the shallow snapshot while holding the lock, if there is one
    if lock:
        with lock:
            snapshot = dict(self)
    else:
        snapshot = dict(self)

    watch_dict = {}
    for key, value in snapshot.items():
        if key.startswith('processor_config_'):
            continue
        watch_dict[key] = copy.deepcopy(value)

    if not self.has_browser_steps:
        watch_dict['browser_steps'] = []

    return watch_dict
# _save_to_disk() method provided by EntityPersistenceMixin
# commit() method inherited from watch_base
def extra_notification_token_values(self):
    """Extra notification tokens offered by this watch type; none by default.

    Example of what an override could return: {'widget': 555}
    """
    return dict()
def extra_notification_token_placeholder_info(self):
    """Descriptions of the extra notification tokens; none by default.

    Example of what an override could return: [('widget', "Get widget amounts")]
    """
    return list()
def extract_regex_from_all_history(self, regex):
    """Run `regex` (re.MULTILINE) over every existing history snapshot and write the matches to a CSV report.

    The report "report-<uuid>.csv" is created in the data directory on the
    first match. Each row holds the snapshot key, its formatted date and the
    match (one column per group when the regex has several groups).

    Args:
        regex: pattern passed to re.findall()

    Returns:
        str: file name of the report, or False when nothing matched
    """
    import csv
    import re
    import datetime

    csv_output_filename = False
    csv_writer = None
    f = None

    try:
        # self.history will be keyed with the full path
        for k, fname in self.history.items():
            if not os.path.isfile(fname):
                continue

            # Pass the path directly: looking it up by timestamp would re-parse
            # the whole history index for every snapshot
            contents = self.get_history_snapshot(filepath=fname)
            res = re.findall(regex, contents, re.MULTILINE)
            if not res:
                continue

            if csv_writer is None:
                # A file on the disk can be transferred much faster via flask than a string reply
                csv_output_filename = f"report-{self.get('uuid')}.csv"
                # newline='' is what the csv module expects; UTF-8 so any snapshot text can be written
                f = open(os.path.join(self.data_dir, csv_output_filename), 'w', newline='', encoding='utf-8')
                # @todo some headers in the future
                csv_writer = csv.writer(f,
                                        delimiter=',',
                                        quotechar='"',
                                        quoting=csv.QUOTE_MINIMAL,
                                        )
                csv_writer.writerow(['Epoch seconds', 'Date'])

            date_str = datetime.datetime.fromtimestamp(int(k)).strftime('%Y-%m-%d %H:%M:%S')
            for r in res:
                row = [k, date_str]
                if isinstance(r, str):
                    row.append(r)
                else:
                    # Several groups: findall yields a tuple per match
                    row += r
                csv_writer.writerow(row)
    finally:
        # Close the report even when reading or matching raised
        if f:
            f.close()

    return csv_output_filename
def has_special_diff_filter_options_set(self):
    """
    Tell whether some, but not all, of the added/replaced/removed filters are switched off.

    A missing option counts as enabled. Returns False when every option is
    enabled, and also when every option is disabled (nothing would be done,
    so act like it's not processable).
    """
    option_keys = ('filter_text_added', 'filter_text_replaced', 'filter_text_removed')
    switched_off = [key for key in option_keys if not self.get(key, True)]
    # Only the in-between case counts: at least one disabled, but not all of them
    return 0 < len(switched_off) < len(option_keys)
def save_error_text(self, contents):
    """Write `contents` as UTF-8 text to 'last-error.txt' in this watch's data directory."""
    self.ensure_data_dir_exists()
    error_file = os.path.join(self.data_dir, "last-error.txt")
    with open(error_file, mode='w', encoding='utf-8') as handle:
        handle.write(contents)
def save_xpath_data(self, data, as_error=False):
    """
    Store element data zlib-compressed in this watch's data directory.

    :param data: a string is stored as-is, anything else is serialized with json.dumps() first
    :param as_error: write to 'elements-error.deflate' instead of 'elements.deflate'
    :raises TypeError: when `data` cannot be serialized to JSON; the existing file is left untouched
    """
    import json
    import zlib

    filename = "elements-error.deflate" if as_error else "elements.deflate"
    target_path = os.path.join(str(self.data_dir), filename)

    # Serialize and compress BEFORE opening the file: opening with 'wb' truncates it,
    # so a serialization failure would otherwise wipe the previously stored data
    text = data if isinstance(data, str) else json.dumps(data)
    compressed = zlib.compress(text.encode())

    self.ensure_data_dir_exists()
    with open(target_path, 'wb') as f:
        f.write(compressed)
# Save as PNG, PNG is larger but better for doing visual diff in the future
def save_screenshot(self, screenshot: bytes, as_error=False):
    """
    Write the raw screenshot bytes into this watch's data directory.

    :param screenshot: bytes written to the file unchanged
    :param as_error: write to 'last-error-screenshot.png' instead of 'last-screenshot.png'
    """
    png_name = "last-error-screenshot.png" if as_error else "last-screenshot.png"
    target_path = os.path.join(self.data_dir, png_name)

    self.ensure_data_dir_exists()
    with open(target_path, 'wb') as handle:
        handle.write(screenshot)
def get_last_fetched_text_before_filters(self):
    """
    Return the text stored in 'last-fetched.br', decompressed and decoded as UTF-8.

    When that file is missing or empty, the snapshot of the last entry in
    self.history is returned instead, or '' when there is no history.
    """
    import brotli

    cache_path = os.path.join(self.data_dir, 'last-fetched.br')
    if os.path.isfile(cache_path) and os.path.getsize(cache_path) != 0:
        with open(cache_path, 'rb') as handle:
            return brotli.decompress(handle.read()).decode('utf-8')

    # If a previous attempt doesn't exist yet, just use the previous snapshot instead
    known_timestamps = list(self.history.keys())
    if not known_timestamps:
        return ''
    return self.get_history_snapshot(timestamp=known_timestamps[-1])
def save_last_text_fetched_before_filters(self, contents):
    """
    Store `contents` brotli-compressed as 'last-fetched.br' in this watch's data directory.

    :param contents: data handed to _brotli_save() (text mode, no uncompressed fallback)
    """
    import brotli

    # Same preparation step the other save_* methods run before writing into data_dir
    self.ensure_data_dir_exists()
    filepath = os.path.join(self.data_dir, 'last-fetched.br')
    _brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
def save_last_fetched_html(self, timestamp, contents):
    """
    Store fetched HTML as '<timestamp>.html.br' in this watch's data directory.

    The write goes through _brotli_save() with the uncompressed fallback enabled,
    afterwards the older HTML snapshots are pruned.
    """
    self.ensure_data_dir_exists()

    html_path = os.path.join(self.data_dir, f"{timestamp}.html.br")
    _brotli_save(contents, html_path, mode=None, fallback_uncompressed=True)

    # A new snapshot was written, so drop the ones that are no longer kept
    self._prune_last_fetched_html_snapshots()
def get_fetched_html(self, timestamp):
    """
    Load the HTML stored as '<timestamp>.html.br' for this watch.

    :return: the brotli-decompressed content decoded as UTF-8, or False when the file does not exist
    """
    import brotli

    html_path = os.path.join(self.data_dir, f"{timestamp}.html.br")
    if not os.path.isfile(html_path):
        return False

    with open(html_path, 'rb') as handle:
        compressed = handle.read()
    return brotli.decompress(compressed).decode('utf-8')
def _prune_last_fetched_html_snapshots(self):
    """
    Delete stored '<timestamp>.html.br' files except for the last two history entries.

    The keys of self.history are walked in reverse order; the first two
    positions are kept, the file of every later position is removed if it exists.
    """
    timestamps = list(self.history.keys())
    for position, timestamp in enumerate(reversed(timestamps)):
        candidate = os.path.join(self.data_dir, f"{timestamp}.html.br")
        # Keep only the first 2
        if position < 2:
            continue
        if os.path.isfile(candidate):
            os.remove(candidate)
@property
def get_browsersteps_available_screenshots(self):
    """
    For knowing which screenshots are available to show the user in BrowserSteps UI.

    Returns the step numbers (as strings) taken from the names of the
    'step_before-<n>.jpeg' files found in this watch's data directory.
    """
    step_matches = (
        re.search(r'step_before-(\d+)', candidate.name)
        for candidate in Path(self.data_dir).glob('step_before-*.jpeg')
    )
    # Files that match the glob but carry no number are ignored
    return [found.group(1) for found in step_matches if found]
def compile_error_texts(self, has_proxies=None):
    """Compile error texts for this watch.

    Accepts has_proxies parameter to ensure it works even outside app context.

    With an app context the texts are HTML: an error containing '403' gets a
    link to the settings page, a notification error gets a link to the
    notification logs. Without one, only the escaped texts are returned.

    :param has_proxies: truthy when proxies are available; only changes the wording of the 403 hint
    :return: the escaped error texts joined by newlines, '' when there is nothing to report
    """
    from flask import url_for
    from markupsafe import Markup

    output = []  # Initialize as list since we're using append
    last_error = self.get('last_error', '')
    notification_error = self.get('last_notification_error')

    # url_for() raises when there is no app/request context - probe it once up front
    try:
        url_for('settings.settings_page')
    except Exception:
        has_app_context = False
    else:
        has_app_context = True

    if has_app_context:
        # has app+request context, we can use url_for()
        if last_error:
            last_error = safe_jinja.render_fully_escaped(last_error)
            if '403' in last_error:
                settings_url = url_for('settings.settings_page', uuid=self.get('uuid'))
                if has_proxies:
                    hint = "Try other proxies/location"
                else:
                    hint = "Try adding external proxies/locations"
                # No trailing quote character after &nbsp; - it used to leak into the rendered text
                output.append(str(Markup(f"{last_error} - <a href=\"{settings_url}\">{hint}</a>&nbsp;")))
            else:
                output.append(str(Markup(last_error)))

        if notification_error:
            txt = safe_jinja.render_fully_escaped(notification_error)
            logs_url = url_for("settings.notification_logs")
            output.append(f'<div class="notification-error"><a href="{logs_url}">{txt}</a></div>')
    else:
        # Lo_Fi version - no app context, cant rely on Jinja2 Markup
        if last_error:
            output.append(safe_jinja.render_fully_escaped(last_error))
        if notification_error:
            output.append(safe_jinja.render_fully_escaped(notification_error))

    return "\n".join(output)