import Foundation
import GRDB
import HAKit
import UIKit

// MARK: - AppDatabaseUpdater

/// AppDatabaseUpdater coordinates fetching data from servers and persisting it into the local database.
/// Server updates are queued and processed sequentially (one at a time), the same server is never
/// queued twice, per-server throttling with backoff is applied, and every phase checks for cancellation.
public protocol AppDatabaseUpdaterProtocol {
    /// Cancels in-flight and queued updates.
    func stop()
    /// Schedules an update for `server`; `forceUpdate` bypasses the throttle.
    func update(server: Server, forceUpdate: Bool)
}

final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol {
    enum UpdateError: Error {
        case noAPI
    }

    // MARK: - Cancellation Helper

    /// Centralized cancellation check that can be customized in the future.
    /// Returns `true` if the current task has been cancelled or the app is no longer in the foreground.
    private func isUpdateCancelled() -> Bool {
        Task.isCancelled || !Current.isForegroundApp()
    }

    /// Represents each step in the server update process.
    /// The raw value is the 1-based step number shown in the progress toast.
    enum UpdateStep: Int, CaseIterable {
        case entities = 1
        case entitiesRegistryListForDisplay = 2
        case entitiesRegistry = 3
        case devicesRegistry = 4
        case areas = 5

        /// The total number of update steps
        static var totalSteps: Int {
            allCases.count
        }

        /// Human-readable description of the step
        var description: String {
            switch self {
            case .entities:
                return "Fetching entities"
            case .entitiesRegistryListForDisplay:
                return "Fetching entity display data"
            case .entitiesRegistry:
                return "Fetching entity registry"
            case .devicesRegistry:
                return "Fetching device registry"
            case .areas:
                return "Fetching areas"
            }
        }
    }

    /// Actor for thread-safe task management and queuing.
    /// Holds the running task per server and a FIFO queue of pending updates.
    private actor TaskCoordinator {
        private var currentUpdateTasks: [String: Task<Void, Never>] = [:]
        private var updateQueue: [(serverId: String, task: () async -> Void)] = []
        private var isProcessingQueue = false
        /// Bumped by `cancelAllTasks()`. A processing loop that was suspended across a cancel sees a
        /// different value when it resumes and exits without touching state owned by a newer loop.
        private var generation = 0

        /// Returns the task currently registered for `serverId`, if any.
        func getTask(for serverId: String) -> Task<Void, Never>? {
            currentUpdateTasks[serverId]
        }

        /// Registers `task` as the running update for `serverId`, replacing any previous entry.
        func setTask(_ task: Task<Void, Never>, for serverId: String) {
            currentUpdateTasks[serverId] = task
        }

        /// Unconditionally removes the registered task for `serverId`.
        func removeTask(for serverId: String) {
            currentUpdateTasks.removeValue(forKey: serverId)
        }

        /// Removes the registered task for `serverId` only if it is still `task`.
        /// Prevents a late-finishing (cancelled) update from unregistering a newer update for the same server.
        func removeTask(_ task: Task<Void, Never>, for serverId: String) {
            guard currentUpdateTasks[serverId] == task else { return }
            currentUpdateTasks.removeValue(forKey: serverId)
        }

        /// Cancels every registered task and drops all queued updates.
        func cancelAllTasks() {
            for task in currentUpdateTasks.values {
                task.cancel()
            }
            currentUpdateTasks.removeAll()
            updateQueue.removeAll()
            // Invalidate any processing loop that is currently suspended in `await`, then allow a
            // fresh loop to start immediately instead of waiting for the cancelled work to unwind.
            generation += 1
            isProcessingQueue = false
        }

        /// Enqueues a server update task to be processed sequentially
        func enqueueUpdate(serverId: String, task: @escaping () async -> Void) {
            // Check if this server is already in the queue
            if updateQueue.contains(where: { $0.serverId == serverId }) {
                Current.Log.verbose("Update for server \(serverId) already queued, skipping duplicate")
                return
            }

            updateQueue.append((serverId: serverId, task: task))

            // Start processing if not already running
            if !isProcessingQueue {
                Task {
                    await processQueue()
                }
            }
        }

        /// Processes queued updates one at a time
        private func processQueue() async {
            guard !isProcessingQueue else { return }
            isProcessingQueue = true
            let loopGeneration = generation

            while !updateQueue.isEmpty {
                let queuedUpdate = updateQueue.removeFirst()
                Current.Log.verbose("Processing queued update for server: \(queuedUpdate.serverId)")
                await queuedUpdate.task()

                // The actor is re-entrant across the await above: if `cancelAllTasks()` ran meanwhile,
                // the queue and `isProcessingQueue` may now belong to a newer loop, so leave them alone.
                guard loopGeneration == generation else { return }
            }

            isProcessingQueue = false
        }
    }

    private let taskCoordinator = TaskCoordinator()

    // Simple adaptive throttling/backoff
    // - Tracks consecutive failures per server to increase delay between attempts.
    // - Tracks per-server last successful update times to avoid over-fetching.
    // Both dictionaries are read/written from background tasks and from `stop()`, so every access
    // must hold `trackingLock`.
    private let trackingLock = NSLock()
    private var consecutiveFailuresByServer: [String: Int] = [:]
    private var perServerLastUpdate: [String: Date] = [:]

    // Base throttle applied to all servers; backoff is added on top of this.
    private let baseThrottleSeconds: TimeInterval = 120

    static var shared = AppDatabaseUpdater()

    init() {
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(enterBackground),
            name: UIApplication.didEnterBackgroundNotification,
            object: nil
        )
    }

    @objc private func enterBackground() {
        stop()
    }

    /// Cancels any in-flight work and clears transient state.
    /// Called when app enters background or when we need to abort updates early.
    func stop() {
        Task {
            await taskCoordinator.cancelAllTasks()
        }
        // Reset backoff tracking to free memory and avoid stale penalties
        trackingLock.lock()
        consecutiveFailuresByServer.removeAll()
        trackingLock.unlock()
    }

    /// Starts an update for a specific server in the background.
    /// This method returns immediately and does not block the caller.
    /// - Parameter server: The specific server to update.
    /// - Parameter forceUpdate: Forces update regardless of other conditions
    /// - Server updates are queued and processed sequentially, one at a time.
    /// - Applies per-server throttling with exponential backoff on failures.
    func update(server: Server, forceUpdate: Bool) {
        // Explicitly detach from the calling context to ensure we don't block the main thread
        // Returns immediately while work continues in the background
        Task.detached(priority: .userInitiated) { [weak self] in
            guard let self else { return }

            let serverId = server.identifier.rawValue

            // Enqueue the update to be processed sequentially
            await taskCoordinator.enqueueUpdate(serverId: serverId) { [weak self] in
                guard let self else { return }

                Current.Log.verbose("Updating database for server \(server.info.name)\(forceUpdate ? " (forced)" : "")")

                // Show toast indicating update has started
                await showUpdateToast(for: server)

                // Launch the server-specific update task
                let updateTask = Task { [weak self] in
                    guard let self else { return }
                    await performSingleServerUpdate(server: server, forceUpdate: forceUpdate)
                }

                // Store the task for this server so `stop()` can cancel it
                await taskCoordinator.setTask(updateTask, for: serverId)
                await updateTask.value

                // Clean up strictly after the task has finished and after it was registered, so the
                // removal can never run before `setTask` nor unregister a newer task for this server.
                await hideUpdateToast(for: server)
                await taskCoordinator.removeTask(updateTask, for: serverId)
            }
        }
    }

    /// Determines if a specific server should be updated based on connection and throttle rules.
    /// Returns `false` when the server has no active URL, when the update is cancelled, or when the
    /// last successful update is more recent than the base throttle plus backoff.
    private func shouldUpdateServer(_ server: Server, forceUpdate: Bool) -> Bool {
        guard server.info.connection.activeURL() != nil else { return false }
        if isUpdateCancelled() { return false }

        // Skip throttle checks if forceUpdate is true
        if forceUpdate { return true }

        let serverId = server.identifier.rawValue
        trackingLock.lock()
        let lastUpdate = perServerLastUpdate[serverId]
        let failures = consecutiveFailuresByServer[serverId] ?? 0
        trackingLock.unlock()

        // Never updated successfully: nothing to throttle against
        guard let lastUpdate else { return true }

        // Per-server throttle with exponential backoff
        let backoff = min(pow(2.0, Double(failures)) * 10.0, 300.0) // 10s, 20s, 40s... up to 5m
        let threshold = -(baseThrottleSeconds + backoff)
        return lastUpdate.timeIntervalSinceNow <= threshold
    }

    /// Performs an update for a single specific server.
    private func performSingleServerUpdate(server: Server, forceUpdate: Bool) async {
        guard !isUpdateCancelled() else { return }

        guard shouldUpdateServer(server, forceUpdate: forceUpdate) else {
            Current.Log.verbose("Skipping update for server \(server.info.name) - throttled")
            return
        }

        let success = await safeUpdateServer(server: server)

        // A run aborted by cancellation/backgrounding says nothing about the server's health;
        // recording it would re-add the penalty that `stop()` has just cleared.
        if !success, isUpdateCancelled() { return }

        updateServerTracking(serverId: server.identifier.rawValue, success: success)
    }

    /// Updates per-server tracking after an update attempt completes.
    /// On success stores the current time and resets the failure count; otherwise increments it.
    private func updateServerTracking(serverId: String, success: Bool) {
        trackingLock.lock()
        defer { trackingLock.unlock() }

        if success {
            perServerLastUpdate[serverId] = Date()
            consecutiveFailuresByServer[serverId] = 0
        } else {
            consecutiveFailuresByServer[serverId, default: 0] += 1
        }
    }

    /// Wraps a per-server update with cancellation checks and returns whether it ran to completion.
    /// NOTE(review): individual fetch/save failures are logged inside each step but are not propagated
    /// here, so `false` currently only ever means "cancelled" — confirm whether step failures should
    /// feed the backoff.
    private func safeUpdateServer(server: Server) async -> Bool {
        if isUpdateCancelled() { return false }
        await updateServer(server: server)
        if isUpdateCancelled() { return false }
        return true
    }

    /// Runs the full update pipeline for a single server in sequence.
    /// Each phase checks for cancellation to bail out quickly when needed.
    private func updateServer(server: Server) async {
        guard !isUpdateCancelled() else { return }

        let totalTimer = ProfilingTimer("Starting full update for server: \(server.info.name)")

        // Step 1: Entities (fetch_states)
        await updateToastStep(for: server, step: .entities)
        do {
            let timer = ProfilingTimer("Step 1 (Entities)")
            await updateEntitiesDatabase(server: server)
            timer.end()
        }
        if isUpdateCancelled() { return }

        // Step 2: Entities registry list for display
        await updateToastStep(for: server, step: .entitiesRegistryListForDisplay)
        do {
            let timer = ProfilingTimer("Step 2 (Entities Registry List For Display)")
            await updateEntitiesRegistryListForDisplay(server: server)
            timer.end()
        }
        if isUpdateCancelled() { return }

        // Step 3: Entities registry
        await updateToastStep(for: server, step: .entitiesRegistry)
        do {
            let timer = ProfilingTimer("Step 3 (Entities Registry)")
            await updateEntitiesRegistry(server: server)
            timer.end()
        }
        if isUpdateCancelled() { return }

        // Step 4: Devices registry
        await updateToastStep(for: server, step: .devicesRegistry)
        do {
            let timer = ProfilingTimer("Step 4 (Devices Registry)")
            await updateDevicesRegistry(server: server)
            timer.end()
        }
        if isUpdateCancelled() { return }

        // Step 5: Areas with their entities
        // IMPORTANT: This must be executed after entities and device registry
        // since we rely on that data to map entities to areas
        await updateToastStep(for: server, step: .areas)
        do {
            let timer = ProfilingTimer("Step 5 (Areas)")
            await updateAreasDatabase(server: server)
            timer.end()
        }

        totalTimer.end()
        Current.Log.info("✅ [Profiling] Full update for server \(server.info.name) completed")
    }

    /// Fetches entities' states from the API and forwards results to persistence.
    /// Early-exits on cancellation and resumes continuations to avoid leaks.
    private func updateEntitiesDatabase(server: Server) async {
        guard !isUpdateCancelled() else { return }

        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            guard let api = Current.api(for: server) else {
                Current.Log.error("No API available for server \(server.info.name)")
                continuation.resume()
                return
            }

            // If cancelled after acquiring API, resume the continuation to avoid hanging.
            if self.isUpdateCancelled() {
                continuation.resume()
                return
            }

            api.connection.send(HATypedRequest<[HAEntity]>.fetchStates()) { result in
                switch result {
                case let .success(entities):
                    Current.appEntitiesModel().updateModel(Set(entities), server: server)
                case let .failure(error):
                    Current.Log.error("Failed to fetch states: \(error)")
                    Current.clientEventStore.addEvent(.init(
                        text: "Failed to fetch states on server \(server.info.name)",
                        type: .networkRequest,
                        payload: [
                            "error": error.localizedDescription,
                        ]
                    ))
                }
                continuation.resume()
            }
        }
    }

    /// Fetches entity registry from the API and forwards results to persistence.
    /// Early-exits on cancellation and resumes continuations to avoid leaks.
    private func updateEntitiesRegistry(server: Server) async {
        guard !isUpdateCancelled() else { return }

        let registryEntries: [EntityRegistryEntry]? = await withCheckedContinuation { (continuation: CheckedContinuation<
            [EntityRegistryEntry]?,
            Never
        >) in
            guard let api = Current.api(for: server) else {
                Current.Log.error("No API available for server \(server.info.name)")
                continuation.resume(returning: nil)
                return
            }

            // If cancelled after acquiring API, resume the continuation to avoid hanging.
            if self.isUpdateCancelled() {
                continuation.resume(returning: nil)
                return
            }

            api.connection.send(.configEntityRegistryList()) { result in
                switch result {
                case let .success(entries):
                    Current.Log.verbose("Successfully fetched entity registry for server \(server.info.name)")
                    continuation.resume(returning: entries)
                case let .failure(error):
                    Current.Log.error("Failed to fetch entity registry: \(error)")
                    Current.clientEventStore.addEvent(.init(
                        text: "Failed to fetch entity registry on server \(server.info.name)",
                        type: .networkRequest,
                        payload: [
                            "error": error.localizedDescription,
                        ]
                    ))
                    continuation.resume(returning: nil)
                }
            }
        }

        if let registryEntries {
            await saveEntityRegistry(registryEntries, serverId: server.identifier.rawValue)
        }
    }

    /// Fetches device registry from the API and forwards results to persistence.
    /// Early-exits on cancellation and resumes continuations to avoid leaks.
    private func updateDevicesRegistry(server: Server) async {
        guard !isUpdateCancelled() else { return }

        let registryEntries: [DeviceRegistryEntry]? = await withCheckedContinuation { (continuation: CheckedContinuation<
            [DeviceRegistryEntry]?,
            Never
        >) in
            guard let api = Current.api(for: server) else {
                Current.Log.error("No API available for server \(server.info.name)")
                continuation.resume(returning: nil)
                return
            }

            // If cancelled after acquiring API, resume the continuation to avoid hanging.
            if self.isUpdateCancelled() {
                continuation.resume(returning: nil)
                return
            }

            api.connection.send(.configDeviceRegistryList()) { result in
                switch result {
                case let .success(entries):
                    Current.Log.verbose("Successfully fetched device registry for server \(server.info.name)")
                    continuation.resume(returning: entries)
                case let .failure(error):
                    Current.Log.error("Failed to fetch device registry: \(error)")
                    Current.clientEventStore.addEvent(.init(
                        text: "Failed to fetch device registry on server \(server.info.name)",
                        type: .networkRequest,
                        payload: [
                            "error": error.localizedDescription,
                        ]
                    ))
                    continuation.resume(returning: nil)
                }
            }
        }

        if let registryEntries {
            await saveDeviceRegistry(registryEntries, serverId: server.identifier.rawValue)
        }
    }

    /// Fetches entity registry list-for-display from the API and forwards results to persistence.
    /// Early-exits on cancellation and resumes continuations to avoid leaks.
    private func updateEntitiesRegistryListForDisplay(server: Server) async {
        guard !isUpdateCancelled() else { return }

        let response: EntityRegistryListForDisplay? = await withCheckedContinuation { (continuation: CheckedContinuation<
            EntityRegistryListForDisplay?,
            Never
        >) in
            guard let api = Current.api(for: server) else {
                Current.Log.error("No API available for server \(server.info.name)")
                continuation.resume(returning: nil)
                return
            }

            // If cancelled after acquiring API, resume the continuation to avoid hanging.
            if self.isUpdateCancelled() {
                continuation.resume(returning: nil)
                return
            }

            api.connection.send(
                HATypedRequest.configEntityRegistryListForDisplay()
            ) { result in
                switch result {
                case let .success(response):
                    continuation.resume(returning: response)
                case let .failure(error):
                    Current.Log.error("Failed to fetch EntityRegistryListForDisplay: \(error)")
                    Current.clientEventStore.addEvent(.init(
                        text: "Failed to fetch EntityRegistryListForDisplay on server \(server.info.name)",
                        type: .networkRequest,
                        payload: [
                            "error": error.localizedDescription,
                        ]
                    ))
                    continuation.resume(returning: nil)
                }
            }
        }

        if let response {
            await saveEntityRegistryListForDisplay(response, serverId: server.identifier.rawValue)
        }
    }

    /// Fetches areas and their entities through the areas provider and persists them.
    /// Does nothing beyond logging when the provider holds no areas for the server after the fetch.
    private func updateAreasDatabase(server: Server) async {
        // Ensure this work happens off the main thread
        await Task.detached(priority: .utility) {
            let fetchTimer = ProfilingTimer("Step 5.1: fetchAreasAndItsEntities")
            let areasAndEntities = await Current.areasProvider().fetchAreasAndItsEntities(for: server)
            fetchTimer.end()

            guard let areas = Current.areasProvider().areas[server.identifier.rawValue] else {
                Current.Log.verbose("No areas found for server \(server.info.name)")
                return
            }

            let saveTimer = ProfilingTimer("Step 5.2: saveAreasToDatabase (count: \(areas.count))")
            await self.saveAreasToDatabase(
                areas: areas,
                areasAndEntities: areasAndEntities,
                serverId: server.identifier.rawValue
            )
            saveTimer.end()
        }.value
    }

    /// Persists areas and their entity relationships for a server.
    /// Deletes all existing areas for the server and inserts fresh data in a single write.
    /// - Parameter areasAndEntities: Entities keyed by area id.
    ///   NOTE(review): element type restored as `Set<String>` — confirm against `fetchAreasAndItsEntities`.
    private func saveAreasToDatabase(
        areas: [HAAreasRegistryResponse],
        areasAndEntities: [String: Set<String>],
        serverId: String
    ) async {
        // Check for cancellation before starting database work
        guard !isUpdateCancelled() else {
            Current.Log.verbose("Skipping areas database save - task cancelled")
            return
        }

        // Ensure model building happens off the main thread
        let appAreas = await Task.detached(priority: .utility) {
            let modelTimer = ProfilingTimer("Step 5.2.1: Building AppArea models (count: \(areas.count))")
            let result = areas.map { area in
                AppArea(
                    from: area,
                    serverId: serverId,
                    entities: areasAndEntities[area.areaId]
                )
            }
            modelTimer.end()
            return result
        }.value

        do {
            let dbTimer = ProfilingTimer("Step 5.2.2: Database write transaction")
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
                Current.database().asyncWrite { db in
                    // Delete all existing areas for this server
                    try AppArea
                        .filter(Column(DatabaseTables.AppArea.serverId.rawValue) == serverId)
                        .deleteAll(db)

                    // Insert fresh areas
                    for area in appAreas {
                        try area.insert(db)
                    }
                } completion: { _, result in
                    switch result {
                    case .success:
                        continuation.resume()
                    case let .failure(error):
                        continuation.resume(throwing: error)
                    }
                }
            }
            dbTimer.end()
            Current.Log.verbose("Successfully saved \(appAreas.count) areas for server \(serverId)")
        } catch is CancellationError {
            Current.Log.verbose("Areas database save cancelled for server \(serverId)")
        } catch {
            Current.Log.error("Failed to save areas in database, error: \(error.localizedDescription)")
            Current.clientEventStore.addEvent(.init(
                text: "Failed to save areas in database, error on serverId \(serverId)",
                type: .database,
                payload: [
                    "error": error.localizedDescription,
                ]
            ))
            assertionFailure("Failed to save areas in database: \(error)")
        }
    }

    /// Persists the entity registry list-for-display for a server.
    /// Deletes all existing records for the server and inserts fresh data in a single write.
    /// Only entries that carry `decimalPlaces` or `entityCategory` are stored.
    private func saveEntityRegistryListForDisplay(_ response: EntityRegistryListForDisplay, serverId: String) async {
        // Check for cancellation before starting database work
        guard !isUpdateCancelled() else {
            Current.Log.verbose("Skipping EntityRegistryListForDisplay database save - task cancelled")
            return
        }

        let entitiesListForDisplay: [AppEntityRegistryListForDisplay] = response.entities.compactMap { registry in
            guard registry.decimalPlaces != nil || registry.entityCategory != nil else { return nil }
            return AppEntityRegistryListForDisplay(
                id: ServerEntity.uniqueId(serverId: serverId, entityId: registry.entityId),
                serverId: serverId,
                entityId: registry.entityId,
                registry: registry
            )
        }

        do {
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
                // Re-check right before scheduling the write; surfaces as CancellationError below.
                guard !isUpdateCancelled() else {
                    continuation.resume(throwing: CancellationError())
                    return
                }

                Current.database().asyncWrite { db in
                    // Delete all existing records for this server
                    try AppEntityRegistryListForDisplay
                        .filter(Column(DatabaseTables.AppEntityRegistryListForDisplay.serverId.rawValue) == serverId)
                        .deleteAll(db)

                    // Insert fresh records
                    for record in entitiesListForDisplay {
                        try record.insert(db)
                    }
                } completion: { _, result in
                    switch result {
                    case .success:
                        continuation.resume()
                    case let .failure(error):
                        continuation.resume(throwing: error)
                    }
                }
            }
        } catch is CancellationError {
            Current.Log.verbose("EntityRegistryListForDisplay database save cancelled for server \(serverId)")
        } catch {
            Current.Log
                .error("Failed to save EntityRegistryListForDisplay in database, error: \(error.localizedDescription)")
            Current.clientEventStore.addEvent(.init(
                text: "Failed to save EntityRegistryListForDisplay in database, error on serverId \(serverId)",
                type: .database,
                payload: [
                    "error": error.localizedDescription,
                ]
            ))
            assertionFailure("Failed to save EntityRegistryListForDisplay in database: \(error)")
        }
    }

    /// Persists the entity registry for a server.
    /// Deletes all existing records for the server and inserts fresh data in a single write.
    private func saveEntityRegistry(_ registryEntries: [EntityRegistryEntry], serverId: String) async {
        // If cancelled before touching the DB, bail out early to avoid unnecessary work.
        guard !isUpdateCancelled() else {
            Current.Log.verbose("Skipping entity registry database save - task cancelled")
            return
        }

        let appEntityRegistries = registryEntries.map { entry in
            AppEntityRegistry(serverId: serverId, registry: entry)
        }

        do {
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
                // Re-check right before scheduling the write; surfaces as CancellationError below.
                guard !isUpdateCancelled() else {
                    continuation.resume(throwing: CancellationError())
                    return
                }

                Current.database().asyncWrite { db in
                    // Delete all existing registry entries for this server
                    try AppEntityRegistry
                        .filter(Column(DatabaseTables.EntityRegistry.serverId.rawValue) == serverId)
                        .deleteAll(db)

                    // Insert fresh registry entries
                    for registry in appEntityRegistries {
                        try registry.insert(db)
                    }
                } completion: { _, result in
                    switch result {
                    case .success:
                        continuation.resume()
                    case let .failure(error):
                        continuation.resume(throwing: error)
                    }
                }
            }
            Current.Log
                .verbose(
                    "Successfully saved \(appEntityRegistries.count) entity registry entries for server \(serverId)"
                )
        } catch is CancellationError {
            Current.Log.verbose("Entity registry database save cancelled for server \(serverId)")
        } catch {
            Current.Log.error("Failed to save entity registry in database, error: \(error.localizedDescription)")
            Current.clientEventStore.addEvent(.init(
                text: "Failed to save entity registry in database, error on serverId \(serverId)",
                type: .database,
                payload: [
                    "error": error.localizedDescription,
                ]
            ))
            assertionFailure("Failed to save entity registry in database: \(error)")
        }
    }

    /// Persists the device registry for a server.
    /// Deletes all existing records for the server and inserts fresh data in a single write.
    private func saveDeviceRegistry(_ registryEntries: [DeviceRegistryEntry], serverId: String) async {
        // If cancelled before touching the DB, bail out early to avoid unnecessary work.
        guard !isUpdateCancelled() else {
            Current.Log.verbose("Skipping device registry database save - task cancelled")
            return
        }

        let appDeviceRegistries = registryEntries.map { entry in
            AppDeviceRegistry(serverId: serverId, registry: entry)
        }

        do {
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
                // Re-check right before scheduling the write; surfaces as CancellationError below.
                guard !isUpdateCancelled() else {
                    continuation.resume(throwing: CancellationError())
                    return
                }

                Current.database().asyncWrite { db in
                    // Delete all existing device registry entries for this server
                    try AppDeviceRegistry
                        .filter(Column(DatabaseTables.DeviceRegistry.serverId.rawValue) == serverId)
                        .deleteAll(db)

                    // Insert fresh registry entries
                    for registry in appDeviceRegistries {
                        try registry.insert(db)
                    }
                } completion: { _, result in
                    switch result {
                    case .success:
                        continuation.resume()
                    case let .failure(error):
                        continuation.resume(throwing: error)
                    }
                }
            }
            Current.Log
                .verbose(
                    "Successfully saved \(appDeviceRegistries.count) device registry entries for server \(serverId)"
                )
        } catch is CancellationError {
            Current.Log.verbose("Device registry database save cancelled for server \(serverId)")
        } catch {
            Current.Log.error("Failed to save device registry in database, error: \(error.localizedDescription)")
            Current.clientEventStore.addEvent(.init(
                text: "Failed to save device registry in database, error on serverId \(serverId)",
                type: .database,
                payload: [
                    "error": error.localizedDescription,
                ]
            ))
            assertionFailure("Failed to save device registry in database: \(error)")
        }
    }

    // MARK: - Toast Management

    /// Generates a unique toast identifier for a server update.
    @MainActor
    private func toastId(for server: Server) -> String {
        "server-update-\(server.identifier.rawValue)"
    }

    /// Shows a toast notification indicating a server update is in progress.
    /// When `step` is provided the message includes "step N of total" progress. No-op below iOS 18.
    @MainActor
    private func showUpdateToast(for server: Server, step: UpdateStep? = nil) {
        if #available(iOS 18, *) {
            let message: String
            if let step {
                message = L10n.DatabaseUpdater.Toast.syncingWithProgress(step.rawValue, UpdateStep.totalSteps)
            } else {
                message = L10n.DatabaseUpdater.Toast.syncing
            }

            ToastManager.shared.show(
                id: toastId(for: server),
                symbol: "arrow.triangle.2.circlepath.circle.fill",
                symbolForegroundStyle: (.white, .blue),
                title: L10n.DatabaseUpdater.Toast.title(server.info.name),
                message: message
            )
        }
    }

    /// Updates the toast notification with the current step.
    @MainActor
    private func updateToastStep(for server: Server, step: UpdateStep) {
        showUpdateToast(for: server, step: step)
    }

    /// Hides the toast notification for a completed server update.
    @MainActor
    private func hideUpdateToast(for server: Server) {
        if #available(iOS 18, *) {
            ToastManager.shared.hide(id: toastId(for: server))
        }
    }
}

// MARK: - Profiling Helper

/// A simple timing helper that works across iOS versions.
/// Logs the label on creation and the elapsed wall-clock time when `end()` is called.
private struct ProfilingTimer {
    private let startTime: CFAbsoluteTime
    private let label: String

    init(_ label: String) {
        self.label = label
        self.startTime = CFAbsoluteTimeGetCurrent()
        Current.Log.info("🔍 [Profiling] \(label)")
    }

    func end() {
        let duration = CFAbsoluteTimeGetCurrent() - startTime
        Current.Log.info("⏱️ [Profiling] \(label) completed in \(String(format: "%.3f", duration))s")
    }
}