mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-15 08:54:02 -06:00
53 lines
2.5 KiB
Python
53 lines
2.5 KiB
Python
import os
|
|
import time
|
|
from flask import url_for
|
|
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
|
|
from ..notification import valid_notification_formats
|
|
from loguru import logger
|
|
|
|
def test_queue_system(client, live_server, measure_memory_usage, datastore_path):
|
|
"""Test that multiple workers can process queue concurrently without blocking each other"""
|
|
# (pytest) Werkzeug's threaded server uses ThreadPoolExecutor with a default limit of around 40 threads (or min(32, os.cpu_count() + 4)).
|
|
items = os.cpu_count() +3
|
|
delay = 10
|
|
# Auto-queue is off here.
|
|
live_server.app.config['DATASTORE'].data['settings']['application']['all_paused'] = True
|
|
|
|
test_urls = [
|
|
f"{url_for('test_endpoint', _external=True)}?delay={delay}&id={i}&content=hello+test+content+{i}"
|
|
for i in range(0, items)
|
|
]
|
|
|
|
# Import 30 URLs to queue
|
|
res = client.post(
|
|
url_for("imports.import_page"),
|
|
data={"urls": "\r\n".join(test_urls)},
|
|
follow_redirects=True
|
|
)
|
|
assert f"{items} Imported".encode('utf-8') in res.data
|
|
|
|
client.application.set_workers(items)
|
|
|
|
start = time.time()
|
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
|
time.sleep(delay/2)
|
|
|
|
# Verify all workers are idle (no UUIDs being processed)
|
|
from changedetectionio import worker_pool
|
|
running_uuids = worker_pool.get_running_uuids()
|
|
logger.debug( f"Should be atleast some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}")
|
|
assert len(running_uuids) != 0, f"Should be atleast some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}"
|
|
|
|
wait_for_all_checks(client)
|
|
|
|
# all workers should be done in less than say 10 seconds (they take time to 'see' something is in the queue too)
|
|
total_time = (time.time() - start)
|
|
logger.debug(f"All workers finished {items} items in less than {delay} seconds per job. {total_time}s total")
|
|
# if there was a bug in queue handler not running parallel, this would blow out to items*delay seconds
|
|
assert total_time < delay + 10, f"All workers finished {items} items in less than {delay} seconds per job, total time {total_time}s"
|
|
|
|
# Verify all workers are idle (no UUIDs being processed)
|
|
from changedetectionio import worker_pool
|
|
running_uuids = worker_pool.get_running_uuids()
|
|
assert len(running_uuids) == 0, f"Expected all workers to be idle, but {len(running_uuids)} UUIDs still being processed: {running_uuids}"
|