mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-15 08:54:02 -06:00
662 lines
24 KiB
Python
662 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for immediate commit-based persistence system.
|
|
|
|
Tests cover:
|
|
- Watch.commit() persistence to disk
|
|
- Concurrent commit safety (race conditions)
|
|
- Processor config separation
|
|
- Data loss prevention (settings, tags, watch modifications)
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
from flask import url_for
|
|
from .util import wait_for_all_checks
|
|
|
|
|
|
# ==============================================================================
|
|
# 2. Commit() Persistence Tests
|
|
# ==============================================================================
|
|
|
|
def test_watch_commit_persists_to_disk(client, live_server):
|
|
"""Test that watch.commit() actually writes to watch.json immediately"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Create a watch
|
|
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Original Title'})
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Modify and commit
|
|
watch['title'] = 'Modified Title'
|
|
watch['paused'] = True
|
|
watch.commit()
|
|
|
|
# Read directly from disk (bypass datastore cache)
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
assert os.path.exists(watch_json_path), "watch.json should exist on disk"
|
|
|
|
with open(watch_json_path, 'r') as f:
|
|
disk_data = json.load(f)
|
|
|
|
assert disk_data['title'] == 'Modified Title', "Title should be persisted to disk"
|
|
assert disk_data['paused'] == True, "Paused state should be persisted to disk"
|
|
assert disk_data['uuid'] == uuid, "UUID should match"
|
|
|
|
|
|
def test_watch_commit_survives_reload(client, live_server):
|
|
"""Test that committed changes survive datastore reload"""
|
|
from changedetectionio.store import ChangeDetectionStore
|
|
|
|
datastore = client.application.config.get('DATASTORE')
|
|
datastore_path = datastore.datastore_path
|
|
|
|
# Create and modify a watch
|
|
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test Watch'})
|
|
watch = datastore.data['watching'][uuid]
|
|
watch['title'] = 'Persisted Title'
|
|
watch['paused'] = True
|
|
watch['tags'] = ['tag-1', 'tag-2']
|
|
watch.commit()
|
|
|
|
# Simulate app restart - create new datastore instance
|
|
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
|
|
datastore2.reload_state(
|
|
datastore_path=datastore_path,
|
|
include_default_watches=False,
|
|
version_tag='test'
|
|
)
|
|
|
|
# Check data survived
|
|
assert uuid in datastore2.data['watching'], "Watch should exist after reload"
|
|
reloaded_watch = datastore2.data['watching'][uuid]
|
|
assert reloaded_watch['title'] == 'Persisted Title', "Title should survive reload"
|
|
assert reloaded_watch['paused'] == True, "Paused state should survive reload"
|
|
assert reloaded_watch['tags'] == ['tag-1', 'tag-2'], "Tags should survive reload"
|
|
|
|
|
|
def test_watch_commit_atomic_on_crash(client, live_server):
|
|
"""Test that atomic writes prevent corruption (temp file pattern)"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Original'})
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# First successful commit
|
|
watch['title'] = 'First Save'
|
|
watch.commit()
|
|
|
|
# Verify watch.json exists and is valid
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f) # Should not raise JSONDecodeError
|
|
assert data['title'] == 'First Save'
|
|
|
|
# Second commit - even if interrupted, original file should be intact
|
|
# (atomic write uses temp file + rename, so original is never corrupted)
|
|
watch['title'] = 'Second Save'
|
|
watch.commit()
|
|
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
assert data['title'] == 'Second Save'
|
|
|
|
|
|
def test_multiple_watches_commit_independently(client, live_server):
|
|
"""Test that committing one watch doesn't affect others"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Create multiple watches
|
|
uuid1 = datastore.add_watch(url='http://example1.com', extras={'title': 'Watch 1'})
|
|
uuid2 = datastore.add_watch(url='http://example2.com', extras={'title': 'Watch 2'})
|
|
uuid3 = datastore.add_watch(url='http://example3.com', extras={'title': 'Watch 3'})
|
|
|
|
watch1 = datastore.data['watching'][uuid1]
|
|
watch2 = datastore.data['watching'][uuid2]
|
|
watch3 = datastore.data['watching'][uuid3]
|
|
|
|
# Modify and commit only watch2
|
|
watch2['title'] = 'Modified Watch 2'
|
|
watch2['paused'] = True
|
|
watch2.commit()
|
|
|
|
# Read all from disk
|
|
def read_watch_json(uuid):
|
|
watch = datastore.data['watching'][uuid]
|
|
path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(path, 'r') as f:
|
|
return json.load(f)
|
|
|
|
data1 = read_watch_json(uuid1)
|
|
data2 = read_watch_json(uuid2)
|
|
data3 = read_watch_json(uuid3)
|
|
|
|
# Only watch2 should have changes
|
|
assert data1['title'] == 'Watch 1', "Watch 1 should be unchanged"
|
|
assert data1['paused'] == False, "Watch 1 should not be paused"
|
|
|
|
assert data2['title'] == 'Modified Watch 2', "Watch 2 should be modified"
|
|
assert data2['paused'] == True, "Watch 2 should be paused"
|
|
|
|
assert data3['title'] == 'Watch 3', "Watch 3 should be unchanged"
|
|
assert data3['paused'] == False, "Watch 3 should not be paused"
|
|
|
|
|
|
# ==============================================================================
|
|
# 3. Concurrency/Race Condition Tests
|
|
# ==============================================================================
|
|
|
|
def test_concurrent_watch_commits_dont_corrupt(client, live_server):
|
|
"""Test that simultaneous commits to same watch don't corrupt JSON"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test'})
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
errors = []
|
|
|
|
def modify_and_commit(field, value):
|
|
try:
|
|
watch[field] = value
|
|
watch.commit()
|
|
except Exception as e:
|
|
errors.append(e)
|
|
|
|
# Run 10 concurrent commits
|
|
threads = []
|
|
for i in range(10):
|
|
t = threading.Thread(target=modify_and_commit, args=('title', f'Title {i}'))
|
|
threads.append(t)
|
|
t.start()
|
|
|
|
for t in threads:
|
|
t.join()
|
|
|
|
# Should not have any errors
|
|
assert len(errors) == 0, f"Expected no errors, got: {errors}"
|
|
|
|
# JSON file should still be valid (not corrupted)
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f) # Should not raise JSONDecodeError
|
|
assert data['uuid'] == uuid, "UUID should still be correct"
|
|
assert 'Title' in data['title'], "Title should contain 'Title'"
|
|
|
|
|
|
def test_concurrent_modifications_during_commit(client, live_server):
|
|
"""Test that modifying watch during commit doesn't cause RuntimeError"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test'})
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
errors = []
|
|
stop_flag = threading.Event()
|
|
|
|
def keep_modifying():
|
|
"""Continuously modify watch"""
|
|
try:
|
|
i = 0
|
|
while not stop_flag.is_set():
|
|
watch['title'] = f'Title {i}'
|
|
watch['paused'] = i % 2 == 0
|
|
i += 1
|
|
time.sleep(0.001)
|
|
except Exception as e:
|
|
errors.append(('modifier', e))
|
|
|
|
def keep_committing():
|
|
"""Continuously commit watch"""
|
|
try:
|
|
for _ in range(20):
|
|
watch.commit()
|
|
time.sleep(0.005)
|
|
except Exception as e:
|
|
errors.append(('committer', e))
|
|
|
|
# Start concurrent modification and commits
|
|
modifier = threading.Thread(target=keep_modifying)
|
|
committer = threading.Thread(target=keep_committing)
|
|
|
|
modifier.start()
|
|
committer.start()
|
|
|
|
committer.join()
|
|
stop_flag.set()
|
|
modifier.join()
|
|
|
|
# Should not have RuntimeError from dict changing during iteration
|
|
runtime_errors = [e for source, e in errors if isinstance(e, RuntimeError)]
|
|
assert len(runtime_errors) == 0, f"Should not have RuntimeError, got: {runtime_errors}"
|
|
|
|
|
|
def test_datastore_lock_protects_commit_snapshot(client, live_server):
|
|
"""Test that datastore.lock prevents race conditions during deepcopy"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test'})
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Add some complex nested data
|
|
watch['browser_steps'] = [
|
|
{'operation': 'click', 'selector': '#foo'},
|
|
{'operation': 'wait', 'seconds': 5}
|
|
]
|
|
|
|
errors = []
|
|
commits_succeeded = [0]
|
|
|
|
def rapid_commits():
|
|
try:
|
|
for i in range(50):
|
|
watch['title'] = f'Title {i}'
|
|
watch.commit()
|
|
commits_succeeded[0] += 1
|
|
time.sleep(0.001)
|
|
except Exception as e:
|
|
errors.append(e)
|
|
|
|
# Multiple threads doing rapid commits
|
|
threads = [threading.Thread(target=rapid_commits) for _ in range(3)]
|
|
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
assert len(errors) == 0, f"Expected no errors, got: {errors}"
|
|
assert commits_succeeded[0] == 150, f"Expected 150 commits, got {commits_succeeded[0]}"
|
|
|
|
# Final JSON should be valid
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
assert data['uuid'] == uuid
|
|
|
|
|
|
# ==============================================================================
|
|
# 4. Processor Config Separation Tests
|
|
# ==============================================================================
|
|
|
|
def test_processor_config_never_in_watch_json(client, live_server):
|
|
"""Test that processor_config_* fields are filtered out of watch.json"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
uuid = datastore.add_watch(
|
|
url='http://example.com',
|
|
extras={
|
|
'title': 'Test Watch',
|
|
'processor': 'restock_diff'
|
|
}
|
|
)
|
|
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Try to set processor config fields (these should be filtered during commit)
|
|
watch['processor_config_price_threshold'] = 10.0
|
|
watch['processor_config_some_setting'] = 'value'
|
|
watch['processor_config_another'] = {'nested': 'data'}
|
|
watch.commit()
|
|
|
|
# Read watch.json from disk
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
# Verify processor_config_* fields are NOT in watch.json
|
|
for key in data.keys():
|
|
assert not key.startswith('processor_config_'), \
|
|
f"Found {key} in watch.json - processor configs should be in separate file!"
|
|
|
|
# Normal fields should still be there
|
|
assert data['title'] == 'Test Watch'
|
|
assert data['processor'] == 'restock_diff'
|
|
|
|
|
|
def test_api_post_saves_processor_config_separately(client, live_server):
|
|
"""Test that API POST saves processor configs to {processor}.json"""
|
|
import json
|
|
from changedetectionio.processors import extract_processor_config_from_form_data
|
|
|
|
# Get API key
|
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
|
|
|
# Create watch via API with processor config
|
|
response = client.post(
|
|
url_for("createwatch"),
|
|
data=json.dumps({
|
|
'url': 'http://example.com',
|
|
'processor': 'restock_diff',
|
|
'processor_config_price_threshold': 10.0,
|
|
'processor_config_in_stock_only': True
|
|
}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_key}
|
|
)
|
|
|
|
assert response.status_code in (200, 201), f"Expected 200/201, got {response.status_code}"
|
|
uuid = response.json.get('uuid')
|
|
assert uuid, "Should return UUID"
|
|
|
|
datastore = client.application.config.get('DATASTORE')
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Check that processor config file exists
|
|
processor_config_path = os.path.join(watch.data_dir, 'restock_diff.json')
|
|
assert os.path.exists(processor_config_path), "Processor config file should exist"
|
|
|
|
with open(processor_config_path, 'r') as f:
|
|
config = json.load(f)
|
|
|
|
# Verify fields are saved WITHOUT processor_config_ prefix
|
|
assert config.get('price_threshold') == 10.0, "Should have price_threshold (no prefix)"
|
|
assert config.get('in_stock_only') == True, "Should have in_stock_only (no prefix)"
|
|
assert 'processor_config_price_threshold' not in config, "Should NOT have prefixed keys"
|
|
|
|
|
|
def test_api_put_saves_processor_config_separately(client, live_server):
|
|
"""Test that API PUT updates processor configs in {processor}.json"""
|
|
import json
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Get API key
|
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
|
|
|
# Create watch
|
|
uuid = datastore.add_watch(
|
|
url='http://example.com',
|
|
extras={'processor': 'restock_diff'}
|
|
)
|
|
|
|
# Update via API with processor config
|
|
response = client.put(
|
|
url_for("watch", uuid=uuid),
|
|
data=json.dumps({
|
|
'processor_config_price_threshold': 15.0,
|
|
'processor_config_min_stock': 5
|
|
}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_key}
|
|
)
|
|
|
|
# PUT might return different status codes, 200 or 204 are both OK
|
|
assert response.status_code in (200, 204), f"Expected 200/204, got {response.status_code}: {response.data}"
|
|
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Check processor config file
|
|
processor_config_path = os.path.join(watch.data_dir, 'restock_diff.json')
|
|
assert os.path.exists(processor_config_path), "Processor config file should exist"
|
|
|
|
with open(processor_config_path, 'r') as f:
|
|
config = json.load(f)
|
|
|
|
assert config.get('price_threshold') == 15.0, "Should have updated price_threshold"
|
|
assert config.get('min_stock') == 5, "Should have min_stock"
|
|
|
|
|
|
def test_ui_edit_saves_processor_config_separately(client, live_server):
|
|
"""Test that processor_config_* fields never appear in watch.json (even from UI)"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Create watch
|
|
uuid = datastore.add_watch(
|
|
url='http://example.com',
|
|
extras={'processor': 'text_json_diff', 'title': 'Test'}
|
|
)
|
|
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Simulate someone accidentally trying to set processor_config fields directly
|
|
watch['processor_config_should_not_save'] = 'test_value'
|
|
watch['processor_config_another_field'] = 123
|
|
watch['normal_field'] = 'this_should_save'
|
|
watch.commit()
|
|
|
|
# Check watch.json has NO processor_config_* fields (main point of this test)
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
watch_data = json.load(f)
|
|
|
|
for key in watch_data.keys():
|
|
assert not key.startswith('processor_config_'), \
|
|
f"Found {key} in watch.json - processor configs should be filtered during commit"
|
|
|
|
# Verify normal fields still save
|
|
assert watch_data['normal_field'] == 'this_should_save', "Normal fields should save"
|
|
assert watch_data['title'] == 'Test', "Original fields should still be there"
|
|
|
|
|
|
def test_browser_steps_normalized_to_empty_list(client, live_server):
|
|
"""Test that meaningless browser_steps are normalized to [] during commit"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com')
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Set browser_steps to meaningless values
|
|
watch['browser_steps'] = [
|
|
{'operation': 'Choose one', 'selector': ''},
|
|
{'operation': 'Goto site', 'selector': ''},
|
|
{'operation': '', 'selector': '#foo'}
|
|
]
|
|
watch.commit()
|
|
|
|
# Read from disk
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
# Should be normalized to empty list
|
|
assert data['browser_steps'] == [], "Meaningless browser_steps should be normalized to []"
|
|
|
|
|
|
# ==============================================================================
|
|
# 5. Data Loss Prevention Tests
|
|
# ==============================================================================
|
|
|
|
def test_settings_persist_after_update(client, live_server):
|
|
"""Test that settings updates are committed and survive restart"""
|
|
from changedetectionio.store import ChangeDetectionStore
|
|
|
|
datastore = client.application.config.get('DATASTORE')
|
|
datastore_path = datastore.datastore_path
|
|
|
|
# Update settings directly (bypass form validation issues)
|
|
datastore.data['settings']['application']['empty_pages_are_a_change'] = True
|
|
datastore.data['settings']['application']['fetch_backend'] = 'html_requests'
|
|
datastore.data['settings']['requests']['time_between_check']['minutes'] = 120
|
|
datastore.commit()
|
|
|
|
# Simulate restart
|
|
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
|
|
datastore2.reload_state(
|
|
datastore_path=datastore_path,
|
|
include_default_watches=False,
|
|
version_tag='test'
|
|
)
|
|
|
|
# Verify settings survived
|
|
assert datastore2.data['settings']['application']['empty_pages_are_a_change'] == True, "empty_pages_are_a_change should persist"
|
|
assert datastore2.data['settings']['application']['fetch_backend'] == 'html_requests', "fetch_backend should persist"
|
|
assert datastore2.data['settings']['requests']['time_between_check']['minutes'] == 120, "time_between_check should persist"
|
|
|
|
|
|
def test_tag_mute_persists(client, live_server):
|
|
"""Test that tag mute/unmute operations persist"""
|
|
from changedetectionio.store import ChangeDetectionStore
|
|
|
|
datastore = client.application.config.get('DATASTORE')
|
|
datastore_path = datastore.datastore_path
|
|
|
|
# Add a tag
|
|
tag_uuid = datastore.add_tag('Test Tag')
|
|
|
|
# Mute the tag
|
|
response = client.get(url_for("tags.mute", uuid=tag_uuid))
|
|
assert response.status_code == 302 # Redirect
|
|
|
|
# Verify muted in memory
|
|
assert datastore.data['settings']['application']['tags'][tag_uuid]['notification_muted'] == True
|
|
|
|
# Simulate restart
|
|
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
|
|
datastore2.reload_state(
|
|
datastore_path=datastore_path,
|
|
include_default_watches=False,
|
|
version_tag='test'
|
|
)
|
|
|
|
# Verify mute state survived
|
|
assert tag_uuid in datastore2.data['settings']['application']['tags']
|
|
assert datastore2.data['settings']['application']['tags'][tag_uuid]['notification_muted'] == True
|
|
|
|
|
|
def test_tag_delete_removes_from_watches(client, live_server):
|
|
"""Test that deleting a tag removes it from all watches"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Create a tag
|
|
tag_uuid = datastore.add_tag('Test Tag')
|
|
|
|
# Create watches with this tag
|
|
uuid1 = datastore.add_watch(url='http://example1.com')
|
|
uuid2 = datastore.add_watch(url='http://example2.com')
|
|
uuid3 = datastore.add_watch(url='http://example3.com')
|
|
|
|
watch1 = datastore.data['watching'][uuid1]
|
|
watch2 = datastore.data['watching'][uuid2]
|
|
watch3 = datastore.data['watching'][uuid3]
|
|
|
|
watch1['tags'] = [tag_uuid]
|
|
watch1.commit()
|
|
watch2['tags'] = [tag_uuid, 'other-tag']
|
|
watch2.commit()
|
|
# watch3 has no tags
|
|
|
|
# Delete the tag
|
|
response = client.get(url_for("tags.delete", uuid=tag_uuid))
|
|
assert response.status_code == 302
|
|
|
|
# Wait for background thread to complete
|
|
time.sleep(1)
|
|
|
|
# Tag should be removed from settings
|
|
assert tag_uuid not in datastore.data['settings']['application']['tags']
|
|
|
|
# Tag should be removed from watches and persisted
|
|
def check_watch_tags(uuid):
|
|
watch = datastore.data['watching'][uuid]
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
return json.load(f)['tags']
|
|
|
|
assert tag_uuid not in check_watch_tags(uuid1), "Tag should be removed from watch1"
|
|
assert tag_uuid not in check_watch_tags(uuid2), "Tag should be removed from watch2"
|
|
assert 'other-tag' in check_watch_tags(uuid2), "Other tags should remain in watch2"
|
|
assert check_watch_tags(uuid3) == [], "Watch3 should still have empty tags"
|
|
|
|
|
|
def test_watch_pause_unpause_persists(client, live_server):
|
|
"""Test that pause/unpause operations commit and persist"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Get API key
|
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com')
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Pause via API
|
|
response = client.get(url_for("watch", uuid=uuid, paused='paused'), headers={'x-api-key': api_key})
|
|
assert response.status_code == 200
|
|
|
|
# Check persisted to disk
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
assert data['paused'] == True, "Pause should be persisted"
|
|
|
|
# Unpause
|
|
response = client.get(url_for("watch", uuid=uuid, paused='unpaused'), headers={'x-api-key': api_key})
|
|
assert response.status_code == 200
|
|
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
assert data['paused'] == False, "Unpause should be persisted"
|
|
|
|
|
|
def test_watch_mute_unmute_persists(client, live_server):
|
|
"""Test that mute/unmute operations commit and persist"""
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# Get API key
|
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
|
|
|
uuid = datastore.add_watch(url='http://example.com')
|
|
watch = datastore.data['watching'][uuid]
|
|
|
|
# Mute via API
|
|
response = client.get(url_for("watch", uuid=uuid, muted='muted'), headers={'x-api-key': api_key})
|
|
assert response.status_code == 200
|
|
|
|
# Check persisted to disk
|
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
assert data['notification_muted'] == True, "Mute should be persisted"
|
|
|
|
# Unmute
|
|
response = client.get(url_for("watch", uuid=uuid, muted='unmuted'), headers={'x-api-key': api_key})
|
|
assert response.status_code == 200
|
|
|
|
with open(watch_json_path, 'r') as f:
|
|
data = json.load(f)
|
|
assert data['notification_muted'] == False, "Unmute should be persisted"
|
|
|
|
|
|
def test_ui_watch_edit_persists_all_fields(client, live_server):
|
|
"""Test that UI watch edit form persists all modified fields"""
|
|
from changedetectionio.store import ChangeDetectionStore
|
|
|
|
datastore = client.application.config.get('DATASTORE')
|
|
datastore_path = datastore.datastore_path
|
|
|
|
# Create watch
|
|
uuid = datastore.add_watch(url='http://example.com')
|
|
|
|
# Edit via UI with multiple field changes
|
|
response = client.post(
|
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
|
data={
|
|
'url': 'http://updated-example.com',
|
|
'title': 'Updated Watch Title',
|
|
'time_between_check-hours': '2',
|
|
'time_between_check-minutes': '30',
|
|
'include_filters': '#content',
|
|
'fetch_backend': 'html_requests',
|
|
'method': 'POST',
|
|
'ignore_text': 'Advertisement\nTracking'
|
|
},
|
|
follow_redirects=True
|
|
)
|
|
|
|
assert b"Updated watch" in response.data or b"Saved" in response.data
|
|
|
|
# Simulate restart
|
|
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
|
|
datastore2.reload_state(
|
|
datastore_path=datastore_path,
|
|
include_default_watches=False,
|
|
version_tag='test'
|
|
)
|
|
|
|
# Verify all fields survived
|
|
watch = datastore2.data['watching'][uuid]
|
|
assert watch['url'] == 'http://updated-example.com'
|
|
assert watch['title'] == 'Updated Watch Title'
|
|
assert watch['time_between_check']['hours'] == 2
|
|
assert watch['time_between_check']['minutes'] == 30
|
|
assert watch['fetch_backend'] == 'html_requests'
|
|
assert watch['method'] == 'POST'
|