"""
|
|
Watch domain model for change detection monitoring.
|
|
|
|
ARCHITECTURE NOTE: Configuration Override Hierarchy
|
|
===================================================
|
|
|
|
This module implements Watch objects that inherit from dict (technical debt).
|
|
The dream architecture would use Pydantic for:
|
|
|
|
1. CHAIN RESOLUTION (Watch → Tag → Global Settings)
|
|
- Current: Manual resolution scattered across codebase
|
|
- Future: @computed_field properties with automatic resolution
|
|
- Examples: resolved_fetch_backend, resolved_restock_settings, etc.
|
|
|
|
2. DATABASE BACKEND ABSTRACTION
|
|
- Current: Domain model tightly coupled to file-based JSON storage
|
|
- Future: Domain model (Pydantic) separate from persistence layer
|
|
- Enables: Easy migration to PostgreSQL, MongoDB, etc.
|
|
|
|
3. TYPE SAFETY & VALIDATION
|
|
- Current: Dict access with no compile-time checks
|
|
- Future: Type hints, IDE autocomplete, validation at boundaries
|
|
|
|
See class model docstring for detailed explanation and examples.
|
|
See: processors/restock_diff/processor.py:184-192 for manual resolution example
|
|
"""
|
|
|
|
from blinker import signal
|
|
from changedetectionio.validate_url import is_safe_valid_url
|
|
|
|
from changedetectionio.strtobool import strtobool
|
|
from changedetectionio.jinja2_custom import render as jinja_render
|
|
from . import watch_base
|
|
from .persistence import EntityPersistenceMixin
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from loguru import logger
|
|
|
|
from .. import jinja2_custom as safe_jinja
|
|
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
|
|
|
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
|
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024*20))
|
|
|
|
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
|
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
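# For example, threshold_seconds() below multiplies each 'time_between_check' entry by these factors:
# {'weeks': 1, 'days': 2, 'hours': 0, 'minutes': 0, 'seconds': 0} -> 1*604800 + 2*86400 = 777600 seconds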

def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
    """
    Save compressed data using native brotli with streaming compression.
    Uses chunked compression to minimize peak memory usage and malloc_trim()
    to force release of C-level memory back to the OS.

    Args:
        contents: data to compress (str or bytes)
        filepath: destination file path
        mode: brotli compression mode (e.g., brotli.MODE_TEXT)
        fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception

    Returns:
        str: actual filepath saved (may differ from input if fallback used)

    Raises:
        Exception: if compression fails and fallback_uncompressed is False
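
    Example (illustrative, mirroring the call sites later in this module):

        # text snapshot: prefer MODE_TEXT, fall back to an uncompressed file on failure
        actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)

        # strict variant used for last-fetched.br - raises instead of falling back
        _brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)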
    """
    import brotli
    import gc
    import ctypes

    # Ensure contents are bytes
    if isinstance(contents, str):
        contents = contents.encode('utf-8')

    try:
        original_size = len(contents)
        logger.debug(f"Starting brotli streaming compression of {original_size} bytes.")

        # Create streaming compressor
        compressor = brotli.Compressor(quality=6, mode=mode if mode is not None else brotli.MODE_GENERIC)

        # Stream compress in chunks to minimize memory usage
        chunk_size = 65536  # 64KB chunks
        total_compressed_size = 0

        with open(filepath, 'wb') as f:
            # Process data in chunks
            offset = 0
            while offset < len(contents):
                chunk = contents[offset:offset + chunk_size]
                compressed_chunk = compressor.process(chunk)
                if compressed_chunk:
                    f.write(compressed_chunk)
                    total_compressed_size += len(compressed_chunk)
                offset += chunk_size

            # Finalize compression - critical for proper cleanup
            final_chunk = compressor.finish()
            if final_chunk:
                f.write(final_chunk)
                total_compressed_size += len(final_chunk)

        logger.debug(f"Finished brotli compression - From {original_size} to {total_compressed_size} bytes.")

        # Cleanup: Delete compressor, force Python GC, then force C-level memory release
        del compressor
        gc.collect()

        # Force release of C-level memory back to OS (since brotli is a C library)
        try:
            ctypes.CDLL('libc.so.6').malloc_trim(0)
        except Exception:
            pass  # malloc_trim not available on all systems (e.g., macOS)

        return filepath

    except Exception as e:
        logger.error(f"Brotli compression error: {e}")

        # Compression failed
        if fallback_uncompressed:
            logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
            fallback_path = filepath.replace('.br', '')
            with open(fallback_path, 'wb') as f:
                f.write(contents)
            return fallback_path
        else:
            raise Exception(f"Brotli compression failed for {filepath}: {e}")


class model(EntityPersistenceMixin, watch_base):
    """
    Watch domain model for monitoring URL changes.

    Inherits from watch_base (which inherits dict) - see watch_base docstring for field documentation.

    ## Configuration Override Hierarchy (Chain Resolution)

    The dream architecture uses a 3-level resolution chain:
    Watch settings → Tag/Group settings → Global settings

    Current implementation is MANUAL (see processor.py:184-192 for example):
    - Processors manually check watch.get('field')
    - Then loop through watch.tags to find first tag with overrides_watch=True
    - Finally fall back to datastore['settings']['application']['field']

    FUTURE: Pydantic-based chain resolution would enable:

    ```python
    # Instead of manual resolution in every processor:
    restock_settings = watch.get('restock_settings', {})
    for tag_uuid in watch.get('tags'):
        tag = datastore['settings']['application']['tags'][tag_uuid]
        if tag.get('overrides_watch'):
            restock_settings = tag.get('restock_settings', {})
            break

    # Clean computed properties with automatic resolution:
    @computed_field
    def resolved_restock_settings(self) -> dict:
        if self.restock_settings:
            return self.restock_settings
        for tag_uuid in self.tags:
            tag = self._datastore.get_tag(tag_uuid)
            if tag.overrides_watch and tag.restock_settings:
                return tag.restock_settings
        return self._datastore.settings.restock_settings or {}

    # Usage: watch.resolved_restock_settings (automatic, type-safe, tested once)
    ```

    Benefits of Pydantic migration:
    1. Single source of truth for resolution logic (not scattered across processors)
    2. Type safety + IDE autocomplete (watch.resolved_fetch_backend vs dict navigation)
    3. Database backend abstraction (domain model separate from persistence)
    4. Automatic validation at boundaries
    5. Self-documenting via type hints
    6. Easy to test resolution independently

    Resolution chain examples that would benefit:
    - fetch_backend: watch → tag → global (see get_fetch_backend property)
    - notification_urls: watch → tag → global
    - time_between_check: watch → global (see threshold_seconds)
    - restock_settings: watch → tag (see processors/restock_diff/processor.py:184-192)
    - history_snapshot_max_length: watch → global (see save_history_blob:550-556)
    - All processor_config_* settings could use tag overrides

    ## Database Backend Abstraction with Pydantic

    Current: Watch inherits dict, tightly coupled to file-based JSON storage
    Future: Domain model (Watch) separate from persistence layer

    ```python
    # Domain model (database-agnostic)
    class Watch(BaseModel):
        uuid: str
        url: str
        # ... validation, business logic

    # Pluggable backends
    class DataStoreBackend(ABC):
        def save_watch(self, watch: Watch): ...
        def load_watch(self, uuid: str) -> Watch: ...

    # Implementations: FileBackend, MongoBackend, PostgresBackend, etc.
    ```

    This would enable:
    - Easy migration between storage backends (file → postgres → mongodb)
    - Pydantic handles serialization/deserialization automatically
    - Domain logic stays clean (no storage concerns in Watch methods)

    ## Migration Path

    Given the existing codebase, an incremental migration is recommended:
    1. Create Pydantic models alongside existing dict-based models
    2. Add .to_pydantic() / .from_pydantic() bridge methods (sketched below)
    3. Gradually migrate code to use Pydantic models
    4. Remove dict inheritance once migration complete
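
    A minimal sketch of step 2, assuming Pydantic v2 (WatchModel and the bridge helpers
    are hypothetical names, shown here as free functions for brevity - they don't exist yet):

    ```python
    from pydantic import BaseModel

    class WatchModel(BaseModel):
        uuid: str
        url: str
        tags: list[str] = []

    def watch_to_pydantic(watch) -> WatchModel:
        # Watch is a dict subclass, so plain key access is enough for a bridge
        return WatchModel(**{k: watch.get(k) for k in WatchModel.model_fields})

    def watch_from_pydantic(m: WatchModel, datastore) -> 'model':
        # __datastore is required by Watch.__init__ (see below)
        return model(__datastore=datastore, default=m.model_dump())
    ```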

    See: watch_base docstring for technical debt discussion
    See: processors/restock_diff/processor.py:184-192 for manual resolution example
    See: Watch.py:550-556 for nested dict navigation that would become watch.resolved_*
    """
    __newest_history_key = None
    __history_n = 0
    jitter_seconds = 0

    def __init__(self, *arg, **kw):
        # Validate __datastore before calling parent (Watch requires it)
        if not kw.get('__datastore'):
            raise ValueError("Watch object requires '__datastore' reference - cannot access global settings without it")

        # Parent class (watch_base) handles __datastore and __datastore_path
        super(model, self).__init__(*arg, **kw)

        if kw.get('default'):
            self.update(kw['default'])
            del kw['default']

        if self.get('default'):
            del self['default']

        # Be sure the cached timestamp is ready
        bump = self.history

    # Note: __deepcopy__, __getstate__, and __setstate__ are inherited from watch_base
    # This prevents memory leaks by sharing __datastore reference instead of copying it

    @property
    def viewed(self):
        # Don't return viewed when last_viewed is 0 and newest_key is 0
        if int(self['last_viewed']) and int(self['last_viewed']) >= int(self.newest_history_key):
            return True

        return False

    @property
    def has_unviewed(self):
        return int(self.newest_history_key) > int(self['last_viewed']) and self.__history_n >= 2

    @property
    def link(self):

        url = self.get('url', '')
        if not is_safe_valid_url(url):
            return 'DISABLED'

        ready_url = url
        if '{%' in url or '{{' in url:
            # Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
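            # e.g. (illustrative) a URL template such as
            #   https://example.com/report-{% now 'utc', '%Y-%m-%d' %}.html
            # is rendered to a concrete dated URL before it is fetched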
            try:
                ready_url = jinja_render(template_str=url)
            except Exception as e:
                logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
                from flask import flash, url_for
                from markupsafe import Markup
                message = Markup('<a href="{}#general">The URL {} is invalid and cannot be used, click to edit</a>'.format(
                    url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', '')))
                flash(message, 'error')
                return ''

        if ready_url.startswith('source:'):
            ready_url=ready_url.replace('source:', '')

        # Also double check it after any Jinja2 formatting just in case
        if not is_safe_valid_url(ready_url):
            return 'DISABLED'
        return ready_url

    @property
    def domain_only_from_link(self):
        from urllib.parse import urlparse
        parsed = urlparse(self.link)
        domain = parsed.hostname
        return domain

    @property
    def history_index_filename(self):
        # So that you don't try to view different histories in different 'diff' setups, which can confuse cdio.
        processor = self.get('processor')
        if not processor or self.get('processor') == 'text_json_diff':
            return 'history.txt'
        else:
            return f'history-{processor}.txt'

    def clear_watch(self):
        import pathlib

        # Get list of processor config files to preserve
        from changedetectionio.processors import find_processors
        processor_names = [name for cls, name in find_processors()]
        processor_config_files = {f"{name}.json" for name in processor_names}

        # JSON Data, Screenshots, Textfiles (history index and snapshots), HTML in the future etc
        # But preserve processor config files (they're configuration, not history data)
        # Use glob not rglob here for safety.
        for item in pathlib.Path(str(self.data_dir)).glob("*.*"):
            # Skip processor config files
            if item.name in processor_config_files:
                continue
            os.unlink(item)

        # Force the attr to recalculate
        bump = self.history

        # Do this last because it will trigger a recheck due to last_checked being zero
        self.update({
            'browser_steps_last_error_step': None,
            'check_count': 0,
            'fetch_time': 0.0,
            'has_ldjson_price_data': None,
            'last_checked': 0,
            'last_error': False,
            'last_notification_error': False,
            'last_viewed': 0,
            'previous_md5': False,
            'previous_md5_before_filters': False,
            'remote_server_reply': None,
            'track_ldjson_price_data': None
        })
        watch_check_update = signal('watch_check_update')
        if watch_check_update:
            watch_check_update.send(watch_uuid=self.get('uuid'))

        return

    @property
    def is_source_type_url(self):
        return self.get('url', '').startswith('source:')

    @property
    def get_fetch_backend(self):
        """
        Get the fetch backend for this watch with special case handling.

        CHAIN RESOLUTION OPPORTUNITY:
        Currently returns watch.fetch_backend directly, but doesn't implement
        Watch → Tag → Global resolution chain. With Pydantic:

        @computed_field
        def resolved_fetch_backend(self) -> str:
            # Special case: PDFs always use html_requests
            if self.is_pdf:
                return 'html_requests'

            # Watch override
            if self.fetch_backend and self.fetch_backend != 'system':
                return self.fetch_backend

            # Tag override (first tag with overrides_watch=True wins)
            for tag_uuid in self.tags:
                tag = self._datastore.get_tag(tag_uuid)
                if tag.overrides_watch and tag.fetch_backend:
                    return tag.fetch_backend

            # Global default
            return self._datastore.settings.fetch_backend
        """
        # Maybe also if is_image etc?
        # This is because chrome/playwright won't render the PDF in the browser and we will just fetch it and use pdf2html to see the text.
        if self.is_pdf:
            return 'html_requests'

        return self.get('fetch_backend')

    @property
    def is_pdf(self):
        # content_type field is set in the future
        # https://github.com/dgtlmoon/changedetection.io/issues/1392
        # Not sure the best logic here
        return self.get('url', '').lower().endswith('.pdf') or 'pdf' in self.get('content_type', '').lower()

    @property
    def label(self):
        # Used for sorting, display, etc
        return self.get('title') or self.get('page_title') or self.link

    @property
    def last_changed(self):
        # last_changed will be the newest snapshot, but when we have just one snapshot, it should be 0
        if self.__history_n <= 1:
            return 0
        if self.__newest_history_key:
            return int(self.__newest_history_key)
        return 0

    @property
    def history_n(self):
        return self.__history_n

    @property
    def history(self):
        """History index is just a text file as a list
        {watch-uuid}/history.txt

        contains a list like

        {epoch-time},{filename}\n

        We read in this list as the history information
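
        The filename is normally a bare basename relative to this watch's data directory
        (absolute paths from older installs are handled below) and may end in .txt,
        .txt.br, or a binary extension, depending on how save_history_blob() stored it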

        """
        tmp_history = {}

        # In the case we are only using the watch for processing without history
        if not self.data_dir:
            return []

        # Read the history file as a dict
        fname = os.path.join(self.data_dir, self.history_index_filename)
        if os.path.isfile(fname):
            logger.debug(f"Reading watch history index for {self.get('uuid')}")
            with open(fname, "r", encoding='utf-8') as f:
                for i in f.readlines():
                    if ',' in i:
                        k, v = i.strip().split(',', 2)

                        # The index history could contain a relative path, so we need to make the fullpath
                        # so that python can read it
                        # Cross-platform: check for any path separator (works on Windows and Unix)
                        if os.sep not in v and '/' not in v and '\\' not in v:
                            # Relative filename only, no path separators
                            v = os.path.join(self.data_dir, v)
                        else:
                            # It's possible that they moved the datadir on older versions
                            # So the snapshot exists but is in a different path
                            # Cross-platform: use os.path.basename instead of split('/')
                            snapshot_fname = os.path.basename(v)
                            proposed_new_path = os.path.join(self.data_dir, snapshot_fname)
                            if not os.path.exists(v) and os.path.exists(proposed_new_path):
                                v = proposed_new_path

                        tmp_history[k] = v

        if len(tmp_history):
            self.__newest_history_key = list(tmp_history.keys())[-1]
        else:
            self.__newest_history_key = None

        self.__history_n = len(tmp_history)

        return tmp_history

    @property
    def has_history(self):
        fname = os.path.join(self.data_dir, self.history_index_filename)
        return os.path.isfile(fname)

    @property
    def has_browser_steps(self):
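        # Only count steps that have a real operation - placeholder entries such as
        # 'Choose one' and the implicit 'Goto site' step are ignored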
        has_browser_steps = self.get('browser_steps') and list(filter(
            lambda s: (s['operation'] and len(s['operation']) and s['operation'] != 'Choose one' and s['operation'] != 'Goto site'),
            self.get('browser_steps')))

        return has_browser_steps

    @property
    def has_restock_info(self):
        if self.get('restock') and self['restock'].get('in_stock') != None:
            return True

        return False

    # Returns the newest key, but if there's only 1 record, then it's counted as not being new, so return 0.
    @property
    def newest_history_key(self):
        if self.__newest_history_key is not None:
            return self.__newest_history_key

        if len(self.history) <= 1:
            return 0


        bump = self.history
        return self.__newest_history_key

    # Given an arbitrary timestamp, find the best history key for the [diff] button so it can preset a smarter from_version
    @property
    def get_from_version_based_on_last_viewed(self):

        """Unfortunately for now timestamp is stored as string key"""
        keys = list(self.history.keys())
        if not keys:
            return None
        if len(keys) == 1:
            return keys[0]

        last_viewed = int(self.get('last_viewed'))
        sorted_keys = sorted(keys, key=lambda x: int(x))
        sorted_keys.reverse()

        # When the 'last viewed' timestamp is greater than or equal to the newest snapshot, return second newest
        if last_viewed >= int(sorted_keys[0]):
            return sorted_keys[1]

        # When the 'last viewed' timestamp is between snapshots, return the older snapshot
        for newer, older in list(zip(sorted_keys[0:], sorted_keys[1:])):
            if last_viewed < int(newer) and last_viewed >= int(older):
                return older

        # When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
        return sorted_keys[-1]

    def get_history_snapshot(self, timestamp=None, filepath=None):
        """
        Accepts either timestamp or filepath
        :param timestamp:
        :param filepath:
        :return:
        """
        import brotli

        if not filepath:
            filepath = self.history[timestamp]

        # Check if binary file (image, PDF, etc.)
        # Binary files are NEVER saved with .br compression, only text files are
        binary_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.pdf', '.bin', '.jfif')
        is_binary = any(filepath.endswith(ext) for ext in binary_extensions)

        # Only look for .br versions for text files
        if not is_binary:
            # See if a brotli version exists and switch to that (text files only)
            if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
                filepath = f"{filepath}.br"

            # OR in the backup case that the .br does not exist, but the plain one does
            if filepath.endswith('.br') and not os.path.isfile(filepath):
                if os.path.isfile(filepath.replace('.br', '')):
                    filepath = filepath.replace('.br', '')

        # Handle .br compressed text files
        if filepath.endswith('.br'):
            # Brotli doesn't have a file header to detect it, so we rely on the filename
            # https://www.rfc-editor.org/rfc/rfc7932
            # Note: .br should ONLY exist for text files, never binary
            with open(filepath, 'rb') as f:
                return brotli.decompress(f.read()).decode('utf-8')

        # Binary file - return raw bytes
        if is_binary:
            with open(filepath, 'rb') as f:
                return f.read()

        # Text file - decode to string
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()

    def _write_atomic(self, dest, data, mode='wb'):
        """Write data atomically to dest using a temp file"""
        import tempfile
        with tempfile.NamedTemporaryFile(mode, delete=False, dir=self.data_dir) as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
            tmp_path = tmp.name
        os.replace(tmp_path, dest)

    def history_trim(self, newest_n_items):
        from pathlib import Path
        import gc
        # Sort by timestamp (key)
        sorted_items = sorted(self.history.items(), key=lambda x: int(x[0]))

        keep_part = dict(sorted_items[-newest_n_items:])
        delete_part = dict(sorted_items[:-newest_n_items])
        logger.info(f"[{self.get('uuid')}] Trimming history to most recent {newest_n_items} items, keeping {len(keep_part)} items, deleting {len(delete_part)} items.")

        if delete_part:
            for item in delete_part.items():
                try:
                    Path(item[1]).unlink(missing_ok=True)
                except Exception as e:
                    logger.critical(f"{str(e)}")
                finally:
                    logger.debug(f"[{self.get('uuid')}] Deleted {item[1]} history snapshot")
            try:
                dest = os.path.join(self.data_dir, self.history_index_filename)
                output = "\r\n".join(
                    f"{k},{Path(v).name}"
                    for k, v in keep_part.items()
                )+"\r\n"
                self._write_atomic(dest=dest, data=output, mode='w')
            except Exception as e:
                logger.critical(f"{str(e)}")
            finally:
                logger.debug(f"[{self.get('uuid')}] Updated history index {dest}")

        # reimport
        bump = self.history
        gc.collect()

    # Save some text file to the appropriate path and bump the history
    # result_obj from fetch_site_status.run()
    def save_history_blob(self, contents, timestamp, snapshot_id):

        logger.trace(f"{self.get('uuid')} - Updating {self.history_index_filename} with timestamp {timestamp}")

        self.ensure_data_dir_exists()
        skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
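        # Routing sketch: bytes payloads are written as-is with a detected extension,
        # text larger than BROTLI_COMPRESS_SIZE_THRESHOLD (default 20 KiB) becomes {snapshot_id}.txt.br,
        # anything else is written as plain {snapshot_id}.txt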

        # Binary data - detect file type and save without compression
        if isinstance(contents, bytes):
            try:
                import puremagic
                detections = puremagic.magic_string(contents[:2048])
                ext = detections[0].extension if detections else 'bin'
                # Strip leading dot if present (puremagic returns extensions like '.jfif')
                ext = ext.lstrip('.')
                if detections:
                    logger.trace(f"Detected file type: {detections[0].mime_type} -> extension: {ext}")
            except Exception as e:
                logger.warning(f"puremagic detection failed: {e}, using 'bin' extension")
                ext = 'bin'

            snapshot_fname = f"{snapshot_id}.{ext}"
            dest = os.path.join(self.data_dir, snapshot_fname)
            self._write_atomic(dest, contents)
            logger.trace(f"Saved binary snapshot as {snapshot_fname} ({len(contents)} bytes)")

        # Text data - use brotli compression if enabled and above threshold
        else:
            if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
                # Compressed text
                import brotli
                snapshot_fname = f"{snapshot_id}.txt.br"
                dest = os.path.join(self.data_dir, snapshot_fname)

                if not os.path.exists(dest):
                    try:
                        actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
                        if actual_dest != dest:
                            snapshot_fname = os.path.basename(actual_dest)
                    except Exception as e:
                        logger.error(f"{self.get('uuid')} - Brotli compression failed: {e}")
                        # Fallback to uncompressed
                        snapshot_fname = f"{snapshot_id}.txt"
                        dest = os.path.join(self.data_dir, snapshot_fname)
                        self._write_atomic(dest, contents.encode('utf-8'))
            else:
                # Plain text
                snapshot_fname = f"{snapshot_id}.txt"
                dest = os.path.join(self.data_dir, snapshot_fname)
                self._write_atomic(dest, contents.encode('utf-8'))

        # Append to history.txt atomically
        index_fname = os.path.join(self.data_dir, self.history_index_filename)
        index_line = f"{timestamp},{snapshot_fname}\n"

        with open(index_fname, 'a', encoding='utf-8') as f:
            f.write(index_line)
            f.flush()
            os.fsync(f.fileno())

        # Update internal state
        self.__newest_history_key = timestamp
        self.__history_n += 1

        # MANUAL CHAIN RESOLUTION: Watch → Global
        # With Pydantic, this would become: maxlen = watch.resolved_history_snapshot_max_length
        # @computed_field def resolved_history_snapshot_max_length(self) -> Optional[int]:
        #     if self.history_snapshot_max_length: return self.history_snapshot_max_length
        #     if tag := self._get_override_tag(): return tag.history_snapshot_max_length
        #     return self._datastore.settings.history_snapshot_max_length
        maxlen = self.get('history_snapshot_max_length') or self.get_global_setting('application', 'history_snapshot_max_length')

        if maxlen and self.__history_n and self.__history_n > maxlen:
            self.history_trim(newest_n_items=maxlen)

        # @todo bump static cache of the last timestamp so we don't need to examine the file to set a proper 'viewed' status
        return snapshot_fname

    @property
    def has_empty_checktime(self):
        # using all() + a generator expression
        # Check if all values are None/False/0 in the time_between_check dict
        res = all(x == None or x == False or x == 0 for x in self.get('time_between_check', {}).values())
        return res

    def threshold_seconds(self):
        seconds = 0
        for m, n in mtable.items():
            x = self.get('time_between_check', {}).get(m, None)
            if x:
                seconds += x * n
        return seconds

    # Iterate over all history texts and see if something new exists
    # Always applying .strip() to start/end but optionally replace any other whitespace
    def lines_contain_something_unique_compared_to_history(self, lines: list, ignore_whitespace=False):
        local_lines = set([])
        if lines:
            if ignore_whitespace:
                if isinstance(lines[0], str):  # Can be either str or bytes depending on what was on the disk
                    local_lines = set([l.translate(TRANSLATE_WHITESPACE_TABLE).lower() for l in lines])
                else:
                    local_lines = set([l.decode('utf-8').translate(TRANSLATE_WHITESPACE_TABLE).lower() for l in lines])
            else:
                if isinstance(lines[0], str):  # Can be either str or bytes depending on what was on the disk
                    local_lines = set([l.strip().lower() for l in lines])
                else:
                    local_lines = set([l.decode('utf-8').strip().lower() for l in lines])


        # Compare each line (set) against each history text file (set) looking for something new..
        existing_history = set({})
        for k, v in self.history.items():
            content = self.get_history_snapshot(filepath=v)

            if ignore_whitespace:
                alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()])
            else:
                alist = set([line.strip().lower() for line in content.splitlines()])

            existing_history = existing_history.union(alist)

        # Check that everything in local_lines (new stuff) already exists in existing_history - it should
        # if not, something new happened
        return not local_lines.issubset(existing_history)

    def get_screenshot(self):
        fname = os.path.join(self.data_dir, "last-screenshot.png")
        if os.path.isfile(fname):
            return fname

        # False is not an option for AppRise, must be type None
        return None

    def favicon_is_expired(self):
        favicon_fname = self.get_favicon_filename()
        import glob
        import time

        if not favicon_fname:
            return True
        try:
            fname = next(iter(glob.glob(os.path.join(self.data_dir, "favicon.*"))), None)
            logger.trace(f"Favicon file maybe found at {fname}")
            if os.path.isfile(fname):
                file_age = int(time.time() - os.path.getmtime(fname))
                logger.trace(f"Favicon file age is {file_age}s")
                if file_age < FAVICON_RESAVE_THRESHOLD_SECONDS:
                    return False
        except Exception as e:
            logger.critical(f"Exception checking Favicon age {str(e)}")
            return True

        # Also in the case that the file didn't exist
        return True

    def bump_favicon(self, url, favicon_base_64: str) -> None:
        from urllib.parse import urlparse
        import base64
        import binascii
        decoded = None

        if url:
            try:
                parsed = urlparse(url)
                filename = os.path.basename(parsed.path)
                (base, extension) = filename.lower().strip().rsplit('.', 1)
            except ValueError:
                logger.error(f"UUID: {self.get('uuid')} Can't work out the file extension from '{url}'")
                return None
        else:
            # Assume favicon.ico
            base = "favicon"
            extension = "ico"

        fname = os.path.join(self.data_dir, f"favicon.{extension}")

        try:
            # validate=True makes sure the string only contains valid base64 chars
            decoded = base64.b64decode(favicon_base_64, validate=True)
        except (binascii.Error, ValueError) as e:
            logger.warning(f"UUID: {self.get('uuid')} FavIcon save data (Base64) corrupt? {str(e)}")
        else:
            if decoded:
                try:
                    with open(fname, 'wb') as f:
                        f.write(decoded)

                    # Invalidate favicon filename cache
                    if hasattr(self, '_favicon_filename_cache'):
                        delattr(self, '_favicon_filename_cache')

                    # A signal that could trigger the socket server to update the browser also
                    watch_check_update = signal('watch_favicon_bump')
                    if watch_check_update:
                        watch_check_update.send(watch_uuid=self.get('uuid'))

                except Exception as e:
                    logger.warning(f"UUID: {self.get('uuid')} error saving FavIcon to {fname} - {str(e)}")

        # @todo - Store some checksum and only write when it's different
        logger.debug(f"UUID: {self.get('uuid')} updated favicon at {fname}")

    def get_favicon_filename(self) -> str | None:
        """
        Find any favicon.* file in this watch's data directory
        and return the basename of the newest one.

        MEMORY LEAK FIX: Cache the result to avoid repeated glob.glob() operations.
        glob.glob() causes millions of fnmatch allocations when called for every watch on page load.

        Returns:
            str: Basename of the newest favicon file, or None if not found.
        """
        # Check cache first (prevents 26M+ allocations from repeated glob operations)
        cache_key = '_favicon_filename_cache'
        if hasattr(self, cache_key):
            return getattr(self, cache_key)

        import glob

        # Search for all favicon.* files
        files = glob.glob(os.path.join(self.data_dir, "favicon.*"))

        if not files:
            result = None
        else:
            # Find the newest by modification time
            newest_file = max(files, key=os.path.getmtime)
            result = os.path.basename(newest_file)

        # Cache the result
        setattr(self, cache_key, result)
        return result

    def get_screenshot_as_thumbnail(self, max_age=3200):
        """Return path to a square thumbnail of the most recent screenshot.

        Creates a 350x350 pixel thumbnail from the top portion of the screenshot.

        Args:
            max_age: Maximum age in seconds before recreating thumbnail

        Returns:
            Path to thumbnail or None if no screenshot exists
        """
        import os
        import time

        thumbnail_path = os.path.join(self.data_dir, "thumbnail.jpeg")
        top_trim = 500  # Pixels from top of screenshot to use

        screenshot_path = self.get_screenshot()
        if not screenshot_path:
            return None

        # Reuse thumbnail if it's fresh and screenshot hasn't changed
        if os.path.isfile(thumbnail_path):
            thumbnail_mtime = os.path.getmtime(thumbnail_path)
            screenshot_mtime = os.path.getmtime(screenshot_path)

            if screenshot_mtime <= thumbnail_mtime and time.time() - thumbnail_mtime < max_age:
                return thumbnail_path

        try:
            from PIL import Image

            with Image.open(screenshot_path) as img:
                # Crop top portion first (full width, top_trim height)
                top_crop_height = min(top_trim, img.height)
                img = img.crop((0, 0, img.width, top_crop_height))

                # Create a smaller intermediate image (to reduce memory usage)
                aspect = img.width / img.height
                interim_width = min(top_trim, img.width)
                interim_height = int(interim_width / aspect) if aspect > 0 else top_trim
                img = img.resize((interim_width, interim_height), Image.NEAREST)

                # Convert to RGB if needed
                if img.mode != 'RGB':
                    img = img.convert('RGB')

                # Crop to square from top center
                square_size = min(img.width, img.height)
                left = (img.width - square_size) // 2
                img = img.crop((left, 0, left + square_size, square_size))

                # Final resize to exact thumbnail size with better filter
                img = img.resize((350, 350), Image.BILINEAR)

                # Save with optimized settings
                img.save(thumbnail_path, "JPEG", quality=75, optimize=True)

                return thumbnail_path

        except Exception as e:
            logger.error(f"Error creating thumbnail for {self.get('uuid')}: {str(e)}")
            return None

    def __get_file_ctime(self, filename):
        fname = os.path.join(self.data_dir, filename)
        if os.path.isfile(fname):
            return int(os.path.getmtime(fname))
        return False

    @property
    def error_text_ctime(self):
        return self.__get_file_ctime('last-error.txt')

    @property
    def snapshot_text_ctime(self):
        if self.history_n==0:
            return False

        timestamp = list(self.history.keys())[-1]
        return int(timestamp)

    @property
    def snapshot_screenshot_ctime(self):
        return self.__get_file_ctime('last-screenshot.png')

    @property
    def snapshot_error_screenshot_ctime(self):
        return self.__get_file_ctime('last-error-screenshot.png')

    def get_error_text(self):
        """Return the text saved from a previous request that resulted in a non-200 error"""
        fname = os.path.join(self.data_dir, "last-error.txt")
        if os.path.isfile(fname):
            with open(fname, 'r', encoding='utf-8') as f:
                return f.read()
        return False

    def get_error_snapshot(self):
        """Return path to the screenshot that resulted in a non-200 error"""
        fname = os.path.join(self.data_dir, "last-error-screenshot.png")
        if os.path.isfile(fname):
            return fname
        return False


    def pause(self):
        self['paused'] = True

    def unpause(self):
        self['paused'] = False

    def toggle_pause(self):
        self['paused'] ^= True

    def mute(self):
        self['notification_muted'] = True

    def unmute(self):
        self['notification_muted'] = False

    def toggle_mute(self):
        self['notification_muted'] ^= True

    def _get_commit_data(self):
        """
        Prepare watch data for commit.

        Excludes processor_config_* keys (stored in separate files).
        Normalizes browser_steps to empty list if no meaningful steps.
        """
        import copy

        # Get base snapshot with lock
        lock = self._datastore.lock if self._datastore and hasattr(self._datastore, 'lock') else None

        if lock:
            with lock:
                snapshot = dict(self)
        else:
            snapshot = dict(self)

        # Exclude processor config keys (stored separately)
        watch_dict = {k: copy.deepcopy(v) for k, v in snapshot.items() if not k.startswith('processor_config_')}

        # Normalize browser_steps: if no meaningful steps, save as empty list
        if not self.has_browser_steps:
            watch_dict['browser_steps'] = []

        return watch_dict

    # _save_to_disk() method provided by EntityPersistenceMixin
    # commit() method inherited from watch_base


    def extra_notification_token_values(self):
        # Used for providing extra tokens
        # return {'widget': 555}
        return {}

    def extra_notification_token_placeholder_info(self):
        # Used for providing extra tokens
        # return [('widget', "Get widget amounts")]
        return []


    def extract_regex_from_all_history(self, regex):
        import csv
        import re
        import datetime
        csv_output_filename = False
        csv_writer = False
        f = None

        # self.history will be keyed with the full path
        for k, fname in self.history.items():
            if os.path.isfile(fname):
                if True:
                    contents = self.get_history_snapshot(timestamp=k)
                    res = re.findall(regex, contents, re.MULTILINE)
                    if res:
                        if not csv_writer:
                            # A file on the disk can be transferred much faster via flask than a string reply
                            csv_output_filename = f"report-{self.get('uuid')}.csv"
                            f = open(os.path.join(self.data_dir, csv_output_filename), 'w')
                            # @todo some headers in the future
                            #fieldnames = ['Epoch seconds', 'Date']
                            csv_writer = csv.writer(f,
                                                    delimiter=',',
                                                    quotechar='"',
                                                    quoting=csv.QUOTE_MINIMAL,
                                                    #fieldnames=fieldnames
                                                    )
                            csv_writer.writerow(['Epoch seconds', 'Date'])
                            # csv_writer.writeheader()

                        date_str = datetime.datetime.fromtimestamp(int(k)).strftime('%Y-%m-%d %H:%M:%S')
                        for r in res:
                            row = [k, date_str]
                            if isinstance(r, str):
                                row.append(r)
                            else:
                                row += r
                            csv_writer.writerow(row)

        if f:
            f.close()

        return csv_output_filename


    def has_special_diff_filter_options_set(self):

        # All False - nothing would be done, so act like it's not processable
        if not self.get('filter_text_added', True) and not self.get('filter_text_replaced', True) and not self.get('filter_text_removed', True):
            return False

        # Or one is set
        if not self.get('filter_text_added', True) or not self.get('filter_text_replaced', True) or not self.get('filter_text_removed', True):
            return True

        # None is set
        return False

    def save_error_text(self, contents):
        self.ensure_data_dir_exists()
        target_path = os.path.join(self.data_dir, "last-error.txt")
        with open(target_path, 'w', encoding='utf-8') as f:
            f.write(contents)

    def save_xpath_data(self, data, as_error=False):
        import json
        import zlib

        if as_error:
            target_path = os.path.join(str(self.data_dir), "elements-error.deflate")
        else:
            target_path = os.path.join(str(self.data_dir), "elements.deflate")

        self.ensure_data_dir_exists()

        with open(target_path, 'wb') as f:
            if not isinstance(data, str):
                f.write(zlib.compress(json.dumps(data).encode()))
            else:
                f.write(zlib.compress(data.encode()))
            f.close()

    # Save as PNG, PNG is larger but better for doing visual diff in the future
    def save_screenshot(self, screenshot: bytes, as_error=False):

        if as_error:
            target_path = os.path.join(self.data_dir, "last-error-screenshot.png")
        else:
            target_path = os.path.join(self.data_dir, "last-screenshot.png")

        self.ensure_data_dir_exists()

        with open(target_path, 'wb') as f:
            f.write(screenshot)
            f.close()


    def get_last_fetched_text_before_filters(self):
        import brotli
        filepath = os.path.join(self.data_dir, 'last-fetched.br')

        if not os.path.isfile(filepath) or os.path.getsize(filepath) == 0:
            # If a previous attempt doesn't yet exist, just snarf the previous snapshot instead
            dates = list(self.history.keys())
            if len(dates):
                return self.get_history_snapshot(timestamp=dates[-1])
            else:
                return ''

        with open(filepath, 'rb') as f:
            return(brotli.decompress(f.read()).decode('utf-8'))

    def save_last_text_fetched_before_filters(self, contents):
        import brotli
        filepath = os.path.join(self.data_dir, 'last-fetched.br')
        _brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)

    def save_last_fetched_html(self, timestamp, contents):
        self.ensure_data_dir_exists()
        snapshot_fname = f"{timestamp}.html.br"
        filepath = os.path.join(self.data_dir, snapshot_fname)
        _brotli_save(contents, filepath, mode=None, fallback_uncompressed=True)
        self._prune_last_fetched_html_snapshots()

    def get_fetched_html(self, timestamp):
        import brotli

        snapshot_fname = f"{timestamp}.html.br"
        filepath = os.path.join(self.data_dir, snapshot_fname)
        if os.path.isfile(filepath):
            with open(filepath, 'rb') as f:
                return (brotli.decompress(f.read()).decode('utf-8'))

        return False


    def _prune_last_fetched_html_snapshots(self):

        dates = list(self.history.keys())
        dates.reverse()

        for index, timestamp in enumerate(dates):
            snapshot_fname = f"{timestamp}.html.br"
            filepath = os.path.join(self.data_dir, snapshot_fname)

            # Keep only the first 2
            if index > 1 and os.path.isfile(filepath):
                os.remove(filepath)


    @property
    def get_browsersteps_available_screenshots(self):
        "For knowing which screenshots are available to show the user in BrowserSteps UI"
        available = []
        for f in Path(self.data_dir).glob('step_before-*.jpeg'):
            step_n=re.search(r'step_before-(\d+)', f.name)
            if step_n:
                available.append(step_n.group(1))
        return available

    def compile_error_texts(self, has_proxies=None):
        """Compile error texts for this watch.
        Accepts has_proxies parameter to ensure it works even outside app context"""
        from flask import url_for
        from markupsafe import Markup

        output = []  # Initialize as list since we're using append
        last_error = self.get('last_error','')

        try:
            url_for('settings.settings_page')
        except Exception as e:
            has_app_context = False
        else:
            has_app_context = True

        # has app+request context, we can use url_for()
        if has_app_context:
            if last_error:
                last_error = safe_jinja.render_fully_escaped(last_error)
                if '403' in last_error:
                    if has_proxies:
                        output.append(str(Markup(f"{last_error} - <a href=\"{url_for('settings.settings_page', uuid=self.get('uuid'))}\">Try other proxies/location</a> '")))
                    else:
                        output.append(str(Markup(f"{last_error} - <a href=\"{url_for('settings.settings_page', uuid=self.get('uuid'))}\">Try adding external proxies/locations</a> '")))
                else:
                    output.append(str(Markup(last_error)))

            if self.get('last_notification_error'):
                txt = safe_jinja.render_fully_escaped(self.get('last_notification_error'))
                result = f'<div class="notification-error"><a href="{url_for("settings.notification_logs")}">{txt}</a></div>'
                output.append(result)

        else:
            # Lo_Fi version - no app context, can't rely on Jinja2 Markup
            if last_error:
                output.append(safe_jinja.render_fully_escaped(last_error))
            if self.get('last_notification_error'):
                output.append(safe_jinja.render_fully_escaped(self.get('last_notification_error')))

        res = "\n".join(output)
        return res