mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-15 08:54:02 -06:00
619 lines
22 KiB
Python
619 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import time
|
|
from flask import url_for
|
|
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client, delete_all_watches
|
|
import os
|
|
|
|
from ..store import ChangeDetectionStore
|
|
|
|
|
|
# def test_setup(client, live_server, measure_memory_usage, datastore_path):
|
|
# live_server_setup(live_server) # Setup on conftest per function
|
|
|
|
def set_original_response(datastore_path):
|
|
test_return_data = """<html>
|
|
<body>
|
|
Some initial text<br>
|
|
<p id="only-this">Should be only this</p>
|
|
<br>
|
|
<p id="not-this">And never this</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
|
f.write(test_return_data)
|
|
return None
|
|
|
|
def set_modified_response(datastore_path):
|
|
test_return_data = """<html>
|
|
<body>
|
|
Some initial text<br>
|
|
<p id="only-this">Should be REALLY only this</p>
|
|
<br>
|
|
<p id="not-this">And never this</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
|
f.write(test_return_data)
|
|
return None
|
|
|
|
def test_setup_group_tag(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
set_original_response(datastore_path=datastore_path)
|
|
|
|
# Add a tag with some config, import a tag and it should roughly work
|
|
res = client.post(
|
|
url_for("tags.form_tag_add"),
|
|
data={"name": "test-tag"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Tag added" in res.data
|
|
assert b"test-tag" in res.data
|
|
|
|
res = client.post(
|
|
url_for("tags.form_tag_edit_submit", uuid="first"),
|
|
data={"name": "test-tag",
|
|
"include_filters": '#only-this',
|
|
"subtractive_selectors": '#not-this'},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Updated" in res.data
|
|
tag_uuid = get_UUID_for_tag_name(client, name="test-tag")
|
|
res = client.get(
|
|
url_for("tags.form_tag_edit", uuid="first")
|
|
)
|
|
assert b"#only-this" in res.data
|
|
assert b"#not-this" in res.data
|
|
|
|
# Tag should be setup and ready, now add a watch
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
url_for("imports.import_page"),
|
|
data={"urls": test_url + "?first-imported=1 test-tag, extra-import-tag"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"1 Imported" in res.data
|
|
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert b'import-tag' in res.data
|
|
assert b'extra-import-tag' in res.data
|
|
|
|
res = client.get(
|
|
url_for("tags.tags_overview_page"),
|
|
follow_redirects=True
|
|
)
|
|
assert b'import-tag' in res.data
|
|
assert b'extra-import-tag' in res.data
|
|
|
|
wait_for_all_checks(client)
|
|
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert b'Warning, no filters were found' not in res.data
|
|
|
|
res = client.get(
|
|
url_for("ui.ui_preview.preview_page", uuid="first"),
|
|
follow_redirects=True
|
|
)
|
|
assert b'Should be only this' in res.data
|
|
assert b'And never this' not in res.data
|
|
|
|
res = client.get(
|
|
url_for("ui.ui_edit.edit_page", uuid="first"),
|
|
follow_redirects=True
|
|
)
|
|
# 2307 the UI notice should appear in the placeholder
|
|
assert b'WARNING: Watch has tag/groups set with special filters' in res.data
|
|
|
|
# RSS Group tag filter
|
|
# An extra one that should be excluded
|
|
res = client.post(
|
|
url_for("imports.import_page"),
|
|
data={"urls": test_url + "?should-be-excluded=1 some-tag"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"1 Imported" in res.data
|
|
wait_for_all_checks(client)
|
|
set_modified_response(datastore_path=datastore_path)
|
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
|
wait_for_all_checks(client)
|
|
rss_token = extract_rss_token_from_UI(client)
|
|
res = client.get(
|
|
url_for("rss.feed", token=rss_token, tag="extra-import-tag", _external=True),
|
|
follow_redirects=True
|
|
)
|
|
assert b"should-be-excluded" not in res.data
|
|
assert res.status_code == 200
|
|
assert b"first-imported=1" in res.data
|
|
delete_all_watches(client)
|
|
|
|
def test_tag_import_singular(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
url_for("imports.import_page"),
|
|
data={"urls": test_url + " test-tag, test-tag\r\n"+ test_url + "?x=1 test-tag, test-tag\r\n"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"2 Imported" in res.data
|
|
|
|
res = client.get(
|
|
url_for("tags.tags_overview_page"),
|
|
follow_redirects=True
|
|
)
|
|
# Should be only 1 tag because they both had the same
|
|
assert len(live_server.app.config['DATASTORE'].data['settings']['application'].get('tags')) ==1
|
|
|
|
delete_all_watches(client)
|
|
|
|
def test_tag_add_in_ui(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
#
|
|
res = client.post(
|
|
url_for("tags.form_tag_add"),
|
|
data={"name": "new-test-tag"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Tag added" in res.data
|
|
assert b"new-test-tag" in res.data
|
|
|
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
|
assert b'All tags deleted' in res.data
|
|
|
|
delete_all_watches(client)
|
|
|
|
def test_group_tag_notification(client, live_server, measure_memory_usage, datastore_path):
|
|
delete_all_watches(client)
|
|
|
|
set_original_response(datastore_path=datastore_path)
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
url_for("ui.ui_views.form_quick_watch_add"),
|
|
data={"url": test_url, "tags": 'test-tag, other-tag'},
|
|
follow_redirects=True
|
|
)
|
|
|
|
assert b"Watch added" in res.data
|
|
|
|
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
|
|
notification_form_data = {"notification_urls": notification_url,
|
|
"notification_title": "New GROUP TAG ChangeDetection.io Notification - {{watch_url}}",
|
|
"notification_body": "BASE URL: {{base_url}}\n"
|
|
"Watch URL: {{watch_url}}\n"
|
|
"Watch UUID: {{watch_uuid}}\n"
|
|
"Watch title: {{watch_title}}\n"
|
|
"Watch tag: {{watch_tag}}\n"
|
|
"Preview: {{preview_url}}\n"
|
|
"Diff URL: {{diff_url}}\n"
|
|
"Snapshot: {{current_snapshot}}\n"
|
|
"Diff: {{diff}}\n"
|
|
"Diff Added: {{diff_added}}\n"
|
|
"Diff Removed: {{diff_removed}}\n"
|
|
"Diff Full: {{diff_full}}\n"
|
|
"Diff as Patch: {{diff_patch}}\n"
|
|
":-)",
|
|
"notification_screenshot": True,
|
|
"notification_format": 'text',
|
|
"title": "test-tag"}
|
|
|
|
res = client.post(
|
|
url_for("tags.form_tag_edit_submit", uuid=get_UUID_for_tag_name(client, name="test-tag")),
|
|
data=notification_form_data,
|
|
follow_redirects=True
|
|
)
|
|
assert b"Updated" in res.data
|
|
|
|
wait_for_all_checks(client)
|
|
|
|
set_modified_response(datastore_path=datastore_path)
|
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
|
wait_for_all_checks(client)
|
|
|
|
time.sleep(3)
|
|
|
|
assert os.path.isfile(os.path.join(datastore_path, "notification.txt"))
|
|
|
|
# Verify what was sent as a notification, this file should exist
|
|
with open(os.path.join(datastore_path, "notification.txt"), "r") as f:
|
|
notification_submission = f.read()
|
|
os.unlink(os.path.join(datastore_path, "notification.txt"))
|
|
|
|
# Did we see the URL that had a change, in the notification?
|
|
# Diff was correctly executed
|
|
assert test_url in notification_submission
|
|
assert ':-)' in notification_submission
|
|
assert "Diff Full: Some initial text" in notification_submission
|
|
assert "New GROUP TAG ChangeDetection.io" in notification_submission
|
|
assert "test-tag" in notification_submission
|
|
assert "other-tag" in notification_submission
|
|
|
|
#@todo Test that multiple notifications fired
|
|
#@todo Test that each of multiple notifications with different settings
|
|
delete_all_watches(client)
|
|
|
|
def test_limit_tag_ui(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
test_url = url_for('test_random_content_endpoint', _external=True)
|
|
|
|
# A space can label the tag, only the first one will have a tag
|
|
client.post(
|
|
url_for("imports.import_page"),
|
|
data={"urls": f"{test_url} test-tag\r\n{test_url}"},
|
|
follow_redirects=True
|
|
)
|
|
tag_uuid = get_UUID_for_tag_name(client, name="test-tag")
|
|
assert tag_uuid
|
|
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert b'test-tag' in res.data
|
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
|
wait_for_all_checks(client)
|
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
|
wait_for_all_checks(client)
|
|
|
|
# Should be both unviewed
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert res.data.count(b' unviewed ') == 2
|
|
|
|
|
|
# Now we recheck only the tag
|
|
client.get(url_for('ui.mark_all_viewed', tag=tag_uuid), follow_redirects=True)
|
|
wait_for_all_checks(client)
|
|
|
|
# Should be only 1 unviewed
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert res.data.count(b' unviewed ') == 1
|
|
|
|
|
|
delete_all_watches(client)
|
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
|
assert b'All tags deleted' in res.data
|
|
|
|
def test_clone_tag_on_import(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
url_for("imports.import_page"),
|
|
data={"urls": test_url + " test-tag, another-tag\r\n"},
|
|
follow_redirects=True
|
|
)
|
|
|
|
assert b"1 Imported" in res.data
|
|
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert b'test-tag' in res.data
|
|
assert b'another-tag' in res.data
|
|
|
|
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
|
res = client.get(url_for("ui.form_clone", uuid=watch_uuid), follow_redirects=True)
|
|
|
|
assert b'Cloned' in res.data
|
|
res = client.get(url_for("watchlist.index"))
|
|
# 2 times plus the top link to tag
|
|
assert res.data.count(b'test-tag') == 3
|
|
assert res.data.count(b'another-tag') == 3
|
|
delete_all_watches(client)
|
|
|
|
def test_clone_tag_on_quickwatchform_add(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
|
|
res = client.post(
|
|
url_for("ui.ui_views.form_quick_watch_add"),
|
|
data={"url": test_url, "tags": ' test-tag, another-tag '},
|
|
follow_redirects=True
|
|
)
|
|
|
|
assert b"Watch added" in res.data
|
|
|
|
res = client.get(url_for("watchlist.index"))
|
|
assert b'test-tag' in res.data
|
|
assert b'another-tag' in res.data
|
|
|
|
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
|
res = client.get(url_for("ui.form_clone", uuid=watch_uuid), follow_redirects=True)
|
|
assert b'Cloned' in res.data
|
|
|
|
res = client.get(url_for("watchlist.index"))
|
|
# 2 times plus the top link to tag
|
|
assert res.data.count(b'test-tag') == 3
|
|
assert res.data.count(b'another-tag') == 3
|
|
delete_all_watches(client)
|
|
|
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
|
assert b'All tags deleted' in res.data
|
|
|
|
def test_order_of_filters_tag_filter_and_watch_filter(client, live_server, measure_memory_usage, datastore_path):
|
|
|
|
# Add a tag with some config, import a tag and it should roughly work
|
|
res = client.post(
|
|
url_for("tags.form_tag_add"),
|
|
data={"name": "test-tag-keep-order"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Tag added" in res.data
|
|
assert b"test-tag-keep-order" in res.data
|
|
tag_filters = [
|
|
'#only-this', # duplicated filters
|
|
'#only-this',
|
|
'#only-this',
|
|
'#only-this',
|
|
]
|
|
|
|
res = client.post(
|
|
url_for("tags.form_tag_edit_submit", uuid="first"),
|
|
data={"name": "test-tag-keep-order",
|
|
"include_filters": '\n'.join(tag_filters) },
|
|
follow_redirects=True
|
|
)
|
|
assert b"Updated" in res.data
|
|
tag_uuid = get_UUID_for_tag_name(client, name="test-tag-keep-order")
|
|
res = client.get(
|
|
url_for("tags.form_tag_edit", uuid="first")
|
|
)
|
|
assert b"#only-this" in res.data
|
|
|
|
|
|
d = """<html>
|
|
<body>
|
|
Some initial text<br>
|
|
<p id="only-this">And 1 this</p>
|
|
<br>
|
|
<p id="not-this">And 2 this</p>
|
|
<p id="">And 3 this</p><!--/html/body/p[3]/-->
|
|
<p id="">And 4 this</p><!--/html/body/p[4]/-->
|
|
<p id="">And 5 this</p><!--/html/body/p[5]/-->
|
|
<p id="">And 6 this</p><!--/html/body/p[6]/-->
|
|
<p id="">And 7 this</p><!--/html/body/p[7]/-->
|
|
<p id="">And 8 this</p><!--/html/body/p[8]/-->
|
|
<p id="">And 9 this</p><!--/html/body/p[9]/-->
|
|
<p id="">And 10 this</p><!--/html/body/p[10]/-->
|
|
<p id="">And 11 this</p><!--/html/body/p[11]/-->
|
|
<p id="">And 12 this</p><!--/html/body/p[12]/-->
|
|
<p id="">And 13 this</p><!--/html/body/p[13]/-->
|
|
<p id="">And 14 this</p><!--/html/body/p[14]/-->
|
|
<p id="not-this">And 15 this</p><!--/html/body/p[15]/-->
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
|
f.write(d)
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
|
wait_for_all_checks(client)
|
|
|
|
filters = [
|
|
'/html/body/p[3]',
|
|
'/html/body/p[4]',
|
|
'/html/body/p[5]',
|
|
'/html/body/p[6]',
|
|
'/html/body/p[7]',
|
|
'/html/body/p[8]',
|
|
'/html/body/p[9]',
|
|
'/html/body/p[10]',
|
|
'/html/body/p[11]',
|
|
'/html/body/p[12]',
|
|
'/html/body/p[13]', # duplicated tags
|
|
'/html/body/p[13]',
|
|
'/html/body/p[13]',
|
|
'/html/body/p[13]',
|
|
'/html/body/p[13]',
|
|
'/html/body/p[14]',
|
|
]
|
|
|
|
res = client.post(
|
|
url_for("ui.ui_edit.edit_page", uuid="first"),
|
|
data={"include_filters": '\n'.join(filters),
|
|
"url": test_url,
|
|
"tags": "test-tag-keep-order",
|
|
"headers": "",
|
|
'fetch_backend': "html_requests",
|
|
"time_between_check_use_default": "y"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Updated watch." in res.data
|
|
wait_for_all_checks(client)
|
|
|
|
res = client.get(
|
|
url_for("ui.ui_preview.preview_page", uuid="first"),
|
|
follow_redirects=True
|
|
)
|
|
|
|
assert b"And 1 this" in res.data # test-tag-keep-order
|
|
|
|
a_tag_filter_check = b'And 1 this' #'#only-this' of tag_filters
|
|
# check there is no duplication of tag_filters
|
|
assert res.data.count(a_tag_filter_check) == 1, f"duplicated filters didn't removed {res.data.count(a_tag_filter_check)} of {a_tag_filter_check} in {res.data=}"
|
|
|
|
a_filter_check = b"And 13 this" # '/html/body/p[13]'
|
|
# check there is no duplication of filters
|
|
assert res.data.count(a_filter_check) == 1, f"duplicated filters didn't removed. {res.data.count(a_filter_check)} of {a_filter_check} in {res.data=}"
|
|
|
|
a_filter_check_not_include = b"And 2 this" # '/html/body/p[2]'
|
|
assert a_filter_check_not_include not in res.data
|
|
|
|
checklist = [
|
|
b"And 3 this",
|
|
b"And 4 this",
|
|
b"And 5 this",
|
|
b"And 6 this",
|
|
b"And 7 this",
|
|
b"And 8 this",
|
|
b"And 9 this",
|
|
b"And 10 this",
|
|
b"And 11 this",
|
|
b"And 12 this",
|
|
b"And 13 this",
|
|
b"And 14 this",
|
|
b"And 1 this", # result of filter from tag.
|
|
]
|
|
# check whether everything a user requested is there
|
|
for test in checklist:
|
|
assert test in res.data
|
|
|
|
# check whether everything a user requested is in order of filters.
|
|
n = 0
|
|
for test in checklist:
|
|
t_index = res.data[n:].find(test)
|
|
# if the text is not searched, return -1.
|
|
assert t_index >= 0, f"""failed because {test=} not in {res.data[n:]=}
|
|
#####################
|
|
Looks like some feature changed the order of result of filters.
|
|
#####################
|
|
the {test} appeared before. {test in res.data[:n]=}
|
|
{res.data[:n]=}
|
|
"""
|
|
n += t_index + len(test)
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_tag_json_persistence(client, live_server, measure_memory_usage, datastore_path):
|
|
"""
|
|
Test that tags are saved to individual tag.json files and loaded correctly.
|
|
|
|
This test verifies the update_27 tag storage refactoring:
|
|
- Tags are saved to {uuid}/tag.json files
|
|
- Tags persist across datastore restarts
|
|
- Tag edits write to tag.json
|
|
- Tag deletion removes tag.json file
|
|
"""
|
|
import json
|
|
|
|
datastore = client.application.config.get('DATASTORE')
|
|
|
|
# 1. Create a tag
|
|
res = client.post(
|
|
url_for("tags.form_tag_add"),
|
|
data={"name": "persistence-test-tag"},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Tag added" in res.data
|
|
|
|
tag_uuid = get_UUID_for_tag_name(client, name="persistence-test-tag")
|
|
assert tag_uuid, "Tag UUID should exist"
|
|
|
|
# 2. Verify tag.json file was created
|
|
tag_json_path = os.path.join(datastore_path, tag_uuid, "tag.json")
|
|
assert os.path.exists(tag_json_path), f"tag.json should exist at {tag_json_path}"
|
|
|
|
# 3. Verify tag.json contains correct data
|
|
with open(tag_json_path, 'r') as f:
|
|
tag_data = json.load(f)
|
|
assert tag_data['title'] == 'persistence-test-tag'
|
|
assert tag_data['uuid'] == tag_uuid
|
|
assert 'date_created' in tag_data
|
|
|
|
# 4. Edit the tag
|
|
res = client.post(
|
|
url_for("tags.form_tag_edit_submit", uuid=tag_uuid),
|
|
data={
|
|
"name": "persistence-test-tag",
|
|
"notification_muted": True,
|
|
"include_filters": '#test-filter'
|
|
},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Updated" in res.data
|
|
|
|
# 5. Verify tag.json was updated
|
|
with open(tag_json_path, 'r') as f:
|
|
tag_data = json.load(f)
|
|
assert tag_data['notification_muted'] == True
|
|
assert '#test-filter' in tag_data.get('include_filters', [])
|
|
|
|
# 5a. Verify tag is NOT in changedetection.json (tags should be in tag.json only)
|
|
changedetection_json_path = os.path.join(datastore_path, "changedetection.json")
|
|
with open(changedetection_json_path, 'r') as f:
|
|
settings_data = json.load(f)
|
|
# Tags dict should be empty in settings (all tags are in individual files)
|
|
assert settings_data['settings']['application']['tags'] == {}, \
|
|
"Tags should NOT be saved to changedetection.json (should be empty dict)"
|
|
|
|
# 6. Simulate restart - reload datastore
|
|
datastore2 = ChangeDetectionStore(datastore_path=datastore_path, include_default_watches=False, version_tag='test')
|
|
|
|
# 7. Verify tag was loaded from tag.json
|
|
assert tag_uuid in datastore2.data['settings']['application']['tags']
|
|
loaded_tag = datastore2.data['settings']['application']['tags'][tag_uuid]
|
|
assert loaded_tag['title'] == 'persistence-test-tag'
|
|
assert loaded_tag['notification_muted'] == True
|
|
assert '#test-filter' in loaded_tag.get('include_filters', [])
|
|
|
|
# 8. Delete the tag via API
|
|
res = client.get(url_for("tags.delete", uuid=tag_uuid), follow_redirects=True)
|
|
assert b"Tag deleted" in res.data
|
|
|
|
# 9. Verify tag.json file was deleted
|
|
assert not os.path.exists(tag_json_path), f"tag.json should be deleted at {tag_json_path}"
|
|
|
|
# 10. Verify tag is removed from settings
|
|
assert tag_uuid not in datastore.data['settings']['application']['tags']
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_tag_json_migration_update_27(client, live_server, measure_memory_usage, datastore_path):
|
|
"""
|
|
Test that update_27 migration correctly moves tags to individual files.
|
|
|
|
This simulates a pre-update_27 datastore and verifies migration works.
|
|
"""
|
|
import json
|
|
|
|
# 1. Create multiple tags
|
|
tag_names = ['migration-tag-1', 'migration-tag-2', 'migration-tag-3']
|
|
tag_uuids = []
|
|
|
|
for tag_name in tag_names:
|
|
res = client.post(
|
|
url_for("tags.form_tag_add"),
|
|
data={"name": tag_name},
|
|
follow_redirects=True
|
|
)
|
|
assert b"Tag added" in res.data
|
|
tag_uuid = get_UUID_for_tag_name(client, name=tag_name)
|
|
tag_uuids.append(tag_uuid)
|
|
|
|
# 2. Verify all tag.json files exist (update_27 already ran during add_tag)
|
|
for tag_uuid in tag_uuids:
|
|
tag_json_path = os.path.join(datastore_path, tag_uuid, "tag.json")
|
|
assert os.path.exists(tag_json_path), f"tag.json should exist for {tag_uuid}"
|
|
|
|
# 2a. Verify tags are NOT in changedetection.json
|
|
changedetection_json_path = os.path.join(datastore_path, "changedetection.json")
|
|
with open(changedetection_json_path, 'r') as f:
|
|
settings_data = json.load(f)
|
|
assert settings_data['settings']['application']['tags'] == {}, \
|
|
"Tags should NOT be in changedetection.json after migration"
|
|
|
|
# 3. Simulate restart
|
|
datastore2 = ChangeDetectionStore(datastore_path=datastore_path, include_default_watches=False, version_tag='test')
|
|
|
|
# 4. Verify all tags loaded from tag.json files
|
|
for idx, tag_uuid in enumerate(tag_uuids):
|
|
assert tag_uuid in datastore2.data['settings']['application']['tags']
|
|
loaded_tag = datastore2.data['settings']['application']['tags'][tag_uuid]
|
|
assert loaded_tag['title'] == tag_names[idx]
|
|
|
|
# Cleanup
|
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
|
assert b'All tags deleted' in res.data
|
|
|
|
# Verify all tag.json files were deleted
|
|
for tag_uuid in tag_uuids:
|
|
tag_json_path = os.path.join(datastore_path, tag_uuid, "tag.json")
|
|
assert not os.path.exists(tag_json_path), f"tag.json should be deleted for {tag_uuid}"
|
|
|
|
delete_all_watches(client)
|