From 931f0c65afb90d84afe11132439e73df73f658f0 Mon Sep 17 00:00:00 2001 From: Kyle Denney <4227399+kdenney@users.noreply.github.com> Date: Mon, 24 Nov 2025 16:11:52 -0600 Subject: [PATCH] [PM-28265] storage reconciliation job (#6615) --- src/Billing/Controllers/JobsController.cs | 36 + src/Billing/Jobs/AliveJob.cs | 9 + src/Billing/Jobs/JobsHostedService.cs | 77 ++- .../Jobs/ReconcileAdditionalStorageJob.cs | 207 ++++++ src/Billing/Services/IStripeFacade.cs | 5 + .../Services/Implementations/StripeFacade.cs | 6 + src/Core/Billing/Constants/StripeConstants.cs | 1 + src/Core/Constants.cs | 2 + .../RequireLowerEnvironmentAttribute.cs | 24 + .../ReconcileAdditionalStorageJobTests.cs | 641 ++++++++++++++++++ 10 files changed, 993 insertions(+), 15 deletions(-) create mode 100644 src/Billing/Controllers/JobsController.cs create mode 100644 src/Billing/Jobs/ReconcileAdditionalStorageJob.cs create mode 100644 src/Core/Utilities/RequireLowerEnvironmentAttribute.cs create mode 100644 test/Billing.Test/Jobs/ReconcileAdditionalStorageJobTests.cs diff --git a/src/Billing/Controllers/JobsController.cs b/src/Billing/Controllers/JobsController.cs new file mode 100644 index 0000000000..6a5e8e5531 --- /dev/null +++ b/src/Billing/Controllers/JobsController.cs @@ -0,0 +1,36 @@ +using Bit.Billing.Jobs; +using Bit.Core.Utilities; +using Microsoft.AspNetCore.Mvc; + +namespace Bit.Billing.Controllers; + +[Route("jobs")] +[SelfHosted(NotSelfHostedOnly = true)] +[RequireLowerEnvironment] +public class JobsController( + JobsHostedService jobsHostedService) : Controller +{ + [HttpPost("run/{jobName}")] + public async Task RunJobAsync(string jobName) + { + if (jobName == nameof(ReconcileAdditionalStorageJob)) + { + await jobsHostedService.RunJobAdHocAsync(); + return Ok(new { message = $"Job {jobName} scheduled successfully" }); + } + + return BadRequest(new { error = $"Unknown job name: {jobName}" }); + } + + [HttpPost("stop/{jobName}")] + public async Task StopJobAsync(string jobName) + { + if (jobName == nameof(ReconcileAdditionalStorageJob)) + { + await jobsHostedService.InterruptAdHocJobAsync(); + return Ok(new { message = $"Job {jobName} queued for cancellation" }); + } + + return BadRequest(new { error = $"Unknown job name: {jobName}" }); + } +} diff --git a/src/Billing/Jobs/AliveJob.cs b/src/Billing/Jobs/AliveJob.cs index 42f64099ac..1769cc94e2 100644 --- a/src/Billing/Jobs/AliveJob.cs +++ b/src/Billing/Jobs/AliveJob.cs @@ -10,4 +10,13 @@ public class AliveJob(ILogger logger) : BaseJob(logger) _logger.LogInformation(Core.Constants.BypassFiltersEventId, null, "Billing service is alive!"); return Task.FromResult(0); } + + public static ITrigger GetTrigger() + { + return TriggerBuilder.Create() + .WithIdentity("EveryTopOfTheHourTrigger") + .StartNow() + .WithCronSchedule("0 0 * * * ?") + .Build(); + } } diff --git a/src/Billing/Jobs/JobsHostedService.cs b/src/Billing/Jobs/JobsHostedService.cs index a6e702c662..25c57044da 100644 --- a/src/Billing/Jobs/JobsHostedService.cs +++ b/src/Billing/Jobs/JobsHostedService.cs @@ -1,29 +1,27 @@ -using Bit.Core.Jobs; +using Bit.Core.Exceptions; +using Bit.Core.Jobs; using Bit.Core.Settings; using Quartz; namespace Bit.Billing.Jobs; -public class JobsHostedService : BaseJobsHostedService +public class JobsHostedService( + GlobalSettings globalSettings, + IServiceProvider serviceProvider, + ILogger logger, + ILogger listenerLogger, + ISchedulerFactory schedulerFactory) + : BaseJobsHostedService(globalSettings, serviceProvider, logger, listenerLogger) { - public JobsHostedService( - GlobalSettings globalSettings, - IServiceProvider serviceProvider, - ILogger logger, - ILogger listenerLogger) - : base(globalSettings, serviceProvider, logger, listenerLogger) { } + private List AdHocJobKeys { get; } = []; + private IScheduler? _adHocScheduler; public override async Task StartAsync(CancellationToken cancellationToken) { - var everyTopOfTheHourTrigger = TriggerBuilder.Create() - .WithIdentity("EveryTopOfTheHourTrigger") - .StartNow() - .WithCronSchedule("0 0 * * * ?") - .Build(); - Jobs = new List> { - new Tuple(typeof(AliveJob), everyTopOfTheHourTrigger) + new(typeof(AliveJob), AliveJob.GetTrigger()), + new(typeof(ReconcileAdditionalStorageJob), ReconcileAdditionalStorageJob.GetTrigger()) }; await base.StartAsync(cancellationToken); @@ -33,5 +31,54 @@ public class JobsHostedService : BaseJobsHostedService { services.AddTransient(); services.AddTransient(); + services.AddTransient(); + // add this service as a singleton so we can inject it where needed + services.AddSingleton(); + services.AddHostedService(sp => sp.GetRequiredService()); + } + + public async Task InterruptAdHocJobAsync(CancellationToken cancellationToken = default) where T : class, IJob + { + if (_adHocScheduler == null) + { + throw new InvalidOperationException("AdHocScheduler is null, cannot interrupt ad-hoc job."); + } + + var jobKey = AdHocJobKeys.FirstOrDefault(j => j.Name == typeof(T).ToString()); + if (jobKey == null) + { + throw new NotFoundException($"Cannot find job key: {typeof(T)}, not running?"); + } + logger.LogInformation("CANCELLING ad-hoc job with key: {JobKey}", jobKey); + AdHocJobKeys.Remove(jobKey); + await _adHocScheduler.Interrupt(jobKey, cancellationToken); + } + + public async Task RunJobAdHocAsync(CancellationToken cancellationToken = default) where T : class, IJob + { + _adHocScheduler ??= await schedulerFactory.GetScheduler(cancellationToken); + + var jobKey = new JobKey(typeof(T).ToString()); + + var currentlyExecuting = await _adHocScheduler.GetCurrentlyExecutingJobs(cancellationToken); + if (currentlyExecuting.Any(j => j.JobDetail.Key.Equals(jobKey))) + { + throw new InvalidOperationException($"Job {jobKey} is already running"); + } + + AdHocJobKeys.Add(jobKey); + + var job = JobBuilder.Create() + .WithIdentity(jobKey) + .Build(); + + var trigger = TriggerBuilder.Create() + .WithIdentity(typeof(T).ToString()) + .StartNow() + .Build(); + + logger.LogInformation("Scheduling ad-hoc job with key: {JobKey}", jobKey); + + await _adHocScheduler.ScheduleJob(job, trigger, cancellationToken); } } diff --git a/src/Billing/Jobs/ReconcileAdditionalStorageJob.cs b/src/Billing/Jobs/ReconcileAdditionalStorageJob.cs new file mode 100644 index 0000000000..d891fc18ff --- /dev/null +++ b/src/Billing/Jobs/ReconcileAdditionalStorageJob.cs @@ -0,0 +1,207 @@ +using System.Globalization; +using System.Text.Json; +using Bit.Billing.Services; +using Bit.Core; +using Bit.Core.Billing.Constants; +using Bit.Core.Jobs; +using Bit.Core.Services; +using Quartz; +using Stripe; + +namespace Bit.Billing.Jobs; + +public class ReconcileAdditionalStorageJob( + IStripeFacade stripeFacade, + ILogger logger, + IFeatureService featureService) : BaseJob(logger) +{ + private const string _storageGbMonthlyPriceId = "storage-gb-monthly"; + private const string _storageGbAnnuallyPriceId = "storage-gb-annually"; + private const string _personalStorageGbAnnuallyPriceId = "personal-storage-gb-annually"; + private const int _storageGbToRemove = 4; + + protected override async Task ExecuteJobAsync(IJobExecutionContext context) + { + if (!featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob)) + { + logger.LogInformation("Skipping ReconcileAdditionalStorageJob, feature flag off."); + return; + } + + var liveMode = featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode); + + // Execution tracking + var subscriptionsFound = 0; + var subscriptionsUpdated = 0; + var subscriptionsWithErrors = 0; + var failures = new List(); + + logger.LogInformation("Starting ReconcileAdditionalStorageJob (live mode: {LiveMode})", liveMode); + + var priceIds = new[] { _storageGbMonthlyPriceId, _storageGbAnnuallyPriceId, _personalStorageGbAnnuallyPriceId }; + + foreach (var priceId in priceIds) + { + var options = new SubscriptionListOptions + { + Limit = 100, + Status = StripeConstants.SubscriptionStatus.Active, + Price = priceId + }; + + await foreach (var subscription in stripeFacade.ListSubscriptionsAutoPagingAsync(options)) + { + if (context.CancellationToken.IsCancellationRequested) + { + logger.LogWarning( + "Job cancelled!! Exiting. Progress at time of cancellation: Subscriptions found: {SubscriptionsFound}, " + + "Updated: {SubscriptionsUpdated}, Errors: {SubscriptionsWithErrors}{Failures}", + subscriptionsFound, + liveMode + ? subscriptionsUpdated + : $"(In live mode, would have updated) {subscriptionsUpdated}", + subscriptionsWithErrors, + failures.Count > 0 + ? $", Failures: {Environment.NewLine}{string.Join(Environment.NewLine, failures)}" + : string.Empty + ); + return; + } + + if (subscription == null) + { + continue; + } + + logger.LogInformation("Processing subscription: {SubscriptionId}", subscription.Id); + subscriptionsFound++; + + if (subscription.Metadata?.TryGetValue(StripeConstants.MetadataKeys.StorageReconciled2025, out var dateString) == true) + { + if (DateTime.TryParse(dateString, null, DateTimeStyles.RoundtripKind, out var dateProcessed)) + { + logger.LogInformation("Skipping subscription {SubscriptionId} - already processed on {Date}", + subscription.Id, + dateProcessed.ToString("f")); + continue; + } + } + + var updateOptions = BuildSubscriptionUpdateOptions(subscription, priceId); + + if (updateOptions == null) + { + logger.LogInformation("Skipping subscription {SubscriptionId} - no updates needed", subscription.Id); + continue; + } + + subscriptionsUpdated++; + + if (!liveMode) + { + logger.LogInformation( + "Not live mode (dry-run): Would have updated subscription {SubscriptionId} with item changes: {NewLine}{UpdateOptions}", + subscription.Id, + Environment.NewLine, + JsonSerializer.Serialize(updateOptions)); + continue; + } + + try + { + await stripeFacade.UpdateSubscription(subscription.Id, updateOptions); + logger.LogInformation("Successfully updated subscription: {SubscriptionId}", subscription.Id); + } + catch (Exception ex) + { + subscriptionsWithErrors++; + failures.Add($"Subscription {subscription.Id}: {ex.Message}"); + logger.LogError(ex, "Failed to update subscription {SubscriptionId}: {ErrorMessage}", + subscription.Id, ex.Message); + } + } + } + + logger.LogInformation( + "ReconcileAdditionalStorageJob completed. Subscriptions found: {SubscriptionsFound}, " + + "Updated: {SubscriptionsUpdated}, Errors: {SubscriptionsWithErrors}{Failures}", + subscriptionsFound, + liveMode + ? subscriptionsUpdated + : $"(In live mode, would have updated) {subscriptionsUpdated}", + subscriptionsWithErrors, + failures.Count > 0 + ? $", Failures: {Environment.NewLine}{string.Join(Environment.NewLine, failures)}" + : string.Empty + ); + } + + private SubscriptionUpdateOptions? BuildSubscriptionUpdateOptions( + Subscription subscription, + string targetPriceId) + { + if (subscription.Items?.Data == null) + { + return null; + } + + var updateOptions = new SubscriptionUpdateOptions + { + ProrationBehavior = StripeConstants.ProrationBehavior.CreateProrations, + Metadata = new Dictionary + { + [StripeConstants.MetadataKeys.StorageReconciled2025] = DateTime.UtcNow.ToString("o") + }, + Items = [] + }; + + var hasUpdates = false; + + foreach (var item in subscription.Items.Data.Where(item => item?.Price?.Id == targetPriceId)) + { + hasUpdates = true; + var currentQuantity = item.Quantity; + + if (currentQuantity > _storageGbToRemove) + { + var newQuantity = currentQuantity - _storageGbToRemove; + logger.LogInformation( + "Subscription {SubscriptionId}: reducing quantity from {CurrentQuantity} to {NewQuantity} for price {PriceId}", + subscription.Id, + currentQuantity, + newQuantity, + item.Price.Id); + + updateOptions.Items.Add(new SubscriptionItemOptions + { + Id = item.Id, + Quantity = newQuantity + }); + } + else + { + logger.LogInformation("Subscription {SubscriptionId}: deleting storage item with quantity {CurrentQuantity} for price {PriceId}", + subscription.Id, + currentQuantity, + item.Price.Id); + + updateOptions.Items.Add(new SubscriptionItemOptions + { + Id = item.Id, + Deleted = true + }); + } + } + + return hasUpdates ? updateOptions : null; + } + + public static ITrigger GetTrigger() + { + return TriggerBuilder.Create() + .WithIdentity("EveryMorningTrigger") + .StartNow() + .WithCronSchedule("0 0 16 * * ?") // 10am CST daily; the pods execute in UTC time + .Build(); + } +} diff --git a/src/Billing/Services/IStripeFacade.cs b/src/Billing/Services/IStripeFacade.cs index 280a3aca3c..90db4a4c82 100644 --- a/src/Billing/Services/IStripeFacade.cs +++ b/src/Billing/Services/IStripeFacade.cs @@ -78,6 +78,11 @@ public interface IStripeFacade RequestOptions requestOptions = null, CancellationToken cancellationToken = default); + IAsyncEnumerable ListSubscriptionsAutoPagingAsync( + SubscriptionListOptions options = null, + RequestOptions requestOptions = null, + CancellationToken cancellationToken = default); + Task GetSubscription( string subscriptionId, SubscriptionGetOptions subscriptionGetOptions = null, diff --git a/src/Billing/Services/Implementations/StripeFacade.cs b/src/Billing/Services/Implementations/StripeFacade.cs index eef7ce009e..7b714b4a8e 100644 --- a/src/Billing/Services/Implementations/StripeFacade.cs +++ b/src/Billing/Services/Implementations/StripeFacade.cs @@ -98,6 +98,12 @@ public class StripeFacade : IStripeFacade CancellationToken cancellationToken = default) => await _subscriptionService.ListAsync(options, requestOptions, cancellationToken); + public IAsyncEnumerable ListSubscriptionsAutoPagingAsync( + SubscriptionListOptions options = null, + RequestOptions requestOptions = null, + CancellationToken cancellationToken = default) => + _subscriptionService.ListAutoPagingAsync(options, requestOptions, cancellationToken); + public async Task GetSubscription( string subscriptionId, SubscriptionGetOptions subscriptionGetOptions = null, diff --git a/src/Core/Billing/Constants/StripeConstants.cs b/src/Core/Billing/Constants/StripeConstants.cs index 11f043fc69..c062351a91 100644 --- a/src/Core/Billing/Constants/StripeConstants.cs +++ b/src/Core/Billing/Constants/StripeConstants.cs @@ -65,6 +65,7 @@ public static class StripeConstants public const string Region = "region"; public const string RetiredBraintreeCustomerId = "btCustomerId_old"; public const string UserId = "userId"; + public const string StorageReconciled2025 = "storage_reconciled_2025"; } public static class PaymentBehavior diff --git a/src/Core/Constants.cs b/src/Core/Constants.cs index 0e5e7bf3ca..4f839f367b 100644 --- a/src/Core/Constants.cs +++ b/src/Core/Constants.cs @@ -197,6 +197,8 @@ public static class FeatureFlagKeys public const string PM26793_FetchPremiumPriceFromPricingService = "pm-26793-fetch-premium-price-from-pricing-service"; public const string PM23341_Milestone_2 = "pm-23341-milestone-2"; public const string PM26462_Milestone_3 = "pm-26462-milestone-3"; + public const string PM28265_EnableReconcileAdditionalStorageJob = "pm-28265-enable-reconcile-additional-storage-job"; + public const string PM28265_ReconcileAdditionalStorageJobEnableLiveMode = "pm-28265-reconcile-additional-storage-job-enable-live-mode"; /* Key Management Team */ public const string ReturnErrorOnExistingKeypair = "return-error-on-existing-keypair"; diff --git a/src/Core/Utilities/RequireLowerEnvironmentAttribute.cs b/src/Core/Utilities/RequireLowerEnvironmentAttribute.cs new file mode 100644 index 0000000000..a8208844a8 --- /dev/null +++ b/src/Core/Utilities/RequireLowerEnvironmentAttribute.cs @@ -0,0 +1,24 @@ +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Mvc; +using Microsoft.AspNetCore.Mvc.Filters; +using Microsoft.Extensions.Hosting; + +namespace Bit.Core.Utilities; + +/// +/// Authorization attribute that restricts controller/action access to Development and QA environments only. +/// Returns 404 Not Found in all other environments. +/// +public class RequireLowerEnvironmentAttribute() : TypeFilterAttribute(typeof(LowerEnvironmentFilter)) +{ + private class LowerEnvironmentFilter(IWebHostEnvironment environment) : IAuthorizationFilter + { + public void OnAuthorization(AuthorizationFilterContext context) + { + if (!environment.IsDevelopment() && !environment.IsEnvironment("QA")) + { + context.Result = new NotFoundResult(); + } + } + } +} diff --git a/test/Billing.Test/Jobs/ReconcileAdditionalStorageJobTests.cs b/test/Billing.Test/Jobs/ReconcileAdditionalStorageJobTests.cs new file mode 100644 index 0000000000..deb164f232 --- /dev/null +++ b/test/Billing.Test/Jobs/ReconcileAdditionalStorageJobTests.cs @@ -0,0 +1,641 @@ +using Bit.Billing.Jobs; +using Bit.Billing.Services; +using Bit.Core; +using Bit.Core.Billing.Constants; +using Bit.Core.Services; +using Microsoft.Extensions.Logging; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Quartz; +using Stripe; +using Xunit; + +namespace Bit.Billing.Test.Jobs; + +public class ReconcileAdditionalStorageJobTests +{ + private readonly IStripeFacade _stripeFacade; + private readonly ILogger _logger; + private readonly IFeatureService _featureService; + private readonly ReconcileAdditionalStorageJob _sut; + + public ReconcileAdditionalStorageJobTests() + { + _stripeFacade = Substitute.For(); + _logger = Substitute.For>(); + _featureService = Substitute.For(); + _sut = new ReconcileAdditionalStorageJob(_stripeFacade, _logger, _featureService); + } + + #region Feature Flag Tests + + [Fact] + public async Task Execute_FeatureFlagDisabled_SkipsProcessing() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob) + .Returns(false); + + // Act + await _sut.Execute(context); + + // Assert + _stripeFacade.DidNotReceiveWithAnyArgs().ListSubscriptionsAutoPagingAsync(); + } + + [Fact] + public async Task Execute_FeatureFlagEnabled_ProcessesSubscriptions() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob) + .Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode) + .Returns(false); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Empty()); + + // Act + await _sut.Execute(context); + + // Assert + _stripeFacade.Received(3).ListSubscriptionsAutoPagingAsync( + Arg.Is(o => o.Status == "active")); + } + + #endregion + + #region Dry Run Mode Tests + + [Fact] + public async Task Execute_DryRunMode_DoesNotUpdateSubscriptions() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(false); // Dry run ON + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10); + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.DidNotReceiveWithAnyArgs().UpdateSubscription(null!); + } + + [Fact] + public async Task Execute_DryRunModeDisabled_UpdatesSubscriptions() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); // Dry run OFF + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10); + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription( + "sub_123", + Arg.Is(o => o.Items.Count == 1)); + } + + #endregion + + #region Price ID Processing Tests + + [Fact] + public async Task Execute_ProcessesAllThreePriceIds() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(false); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Empty()); + + // Act + await _sut.Execute(context); + + // Assert + _stripeFacade.Received(1).ListSubscriptionsAutoPagingAsync( + Arg.Is(o => o.Price == "storage-gb-monthly")); + _stripeFacade.Received(1).ListSubscriptionsAutoPagingAsync( + Arg.Is(o => o.Price == "storage-gb-annually")); + _stripeFacade.Received(1).ListSubscriptionsAutoPagingAsync( + Arg.Is(o => o.Price == "personal-storage-gb-annually")); + } + + #endregion + + #region Already Processed Tests + + [Fact] + public async Task Execute_SubscriptionAlreadyProcessed_SkipsUpdate() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var metadata = new Dictionary + { + [StripeConstants.MetadataKeys.StorageReconciled2025] = DateTime.UtcNow.ToString("o") + }; + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10, metadata: metadata); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.DidNotReceiveWithAnyArgs().UpdateSubscription(null!); + } + + [Fact] + public async Task Execute_SubscriptionWithInvalidProcessedDate_ProcessesSubscription() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var metadata = new Dictionary + { + [StripeConstants.MetadataKeys.StorageReconciled2025] = "invalid-date" + }; + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10, metadata: metadata); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription("sub_123", Arg.Any()); + } + + [Fact] + public async Task Execute_SubscriptionWithoutMetadata_ProcessesSubscription() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10, metadata: null); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription("sub_123", Arg.Any()); + } + + #endregion + + #region Quantity Reduction Logic Tests + + [Fact] + public async Task Execute_QuantityGreaterThan4_ReducesBy4() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription( + "sub_123", + Arg.Is(o => + o.Items.Count == 1 && + o.Items[0].Quantity == 6 && + o.Items[0].Deleted != true)); + } + + [Fact] + public async Task Execute_QuantityEquals4_DeletesItem() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 4); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription( + "sub_123", + Arg.Is(o => + o.Items.Count == 1 && + o.Items[0].Deleted == true)); + } + + [Fact] + public async Task Execute_QuantityLessThan4_DeletesItem() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 2); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription( + "sub_123", + Arg.Is(o => + o.Items.Count == 1 && + o.Items[0].Deleted == true)); + } + + #endregion + + #region Update Options Tests + + [Fact] + public async Task Execute_UpdateOptions_SetsProrationBehaviorToCreateProrations() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription( + "sub_123", + Arg.Is(o => o.ProrationBehavior == StripeConstants.ProrationBehavior.CreateProrations)); + } + + [Fact] + public async Task Execute_UpdateOptions_SetsReconciledMetadata() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(subscription); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription( + "sub_123", + Arg.Is(o => + o.Metadata.ContainsKey(StripeConstants.MetadataKeys.StorageReconciled2025) && + !string.IsNullOrEmpty(o.Metadata[StripeConstants.MetadataKeys.StorageReconciled2025]))); + } + + #endregion + + #region Subscription Filtering Tests + + [Fact] + public async Task Execute_SubscriptionWithNoItems_SkipsUpdate() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = new Subscription + { + Id = "sub_123", + Items = null + }; + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.DidNotReceiveWithAnyArgs().UpdateSubscription(null!); + } + + [Fact] + public async Task Execute_SubscriptionWithDifferentPriceId_SkipsUpdate() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "different-price-id", quantity: 10); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.DidNotReceiveWithAnyArgs().UpdateSubscription(null!); + } + + [Fact] + public async Task Execute_NullSubscription_SkipsProcessing() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(null!)); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.DidNotReceiveWithAnyArgs().UpdateSubscription(null!); + } + + #endregion + + #region Multiple Subscriptions Tests + + [Fact] + public async Task Execute_MultipleSubscriptions_ProcessesAll() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription1 = CreateSubscription("sub_1", "storage-gb-monthly", quantity: 10); + var subscription2 = CreateSubscription("sub_2", "storage-gb-monthly", quantity: 5); + var subscription3 = CreateSubscription("sub_3", "storage-gb-monthly", quantity: 3); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription1, subscription2, subscription3)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(callInfo => callInfo.Arg() switch + { + "sub_1" => subscription1, + "sub_2" => subscription2, + "sub_3" => subscription3, + _ => null + }); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription("sub_1", Arg.Any()); + await _stripeFacade.Received(1).UpdateSubscription("sub_2", Arg.Any()); + await _stripeFacade.Received(1).UpdateSubscription("sub_3", Arg.Any()); + } + + [Fact] + public async Task Execute_MixedSubscriptionsWithProcessed_OnlyProcessesUnprocessed() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var processedMetadata = new Dictionary + { + [StripeConstants.MetadataKeys.StorageReconciled2025] = DateTime.UtcNow.ToString("o") + }; + + var subscription1 = CreateSubscription("sub_1", "storage-gb-monthly", quantity: 10); + var subscription2 = CreateSubscription("sub_2", "storage-gb-monthly", quantity: 5, metadata: processedMetadata); + var subscription3 = CreateSubscription("sub_3", "storage-gb-monthly", quantity: 3); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription1, subscription2, subscription3)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Returns(callInfo => callInfo.Arg() switch + { + "sub_1" => subscription1, + "sub_3" => subscription3, + _ => null + }); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription("sub_1", Arg.Any()); + await _stripeFacade.DidNotReceive().UpdateSubscription("sub_2", Arg.Any()); + await _stripeFacade.Received(1).UpdateSubscription("sub_3", Arg.Any()); + } + + #endregion + + #region Error Handling Tests + + [Fact] + public async Task Execute_UpdateFails_ContinuesProcessingOthers() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription1 = CreateSubscription("sub_1", "storage-gb-monthly", quantity: 10); + var subscription2 = CreateSubscription("sub_2", "storage-gb-monthly", quantity: 5); + var subscription3 = CreateSubscription("sub_3", "storage-gb-monthly", quantity: 3); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription1, subscription2, subscription3)); + + _stripeFacade.UpdateSubscription("sub_1", Arg.Any()) + .Returns(subscription1); + _stripeFacade.UpdateSubscription("sub_2", Arg.Any()) + .Throws(new Exception("Stripe API error")); + _stripeFacade.UpdateSubscription("sub_3", Arg.Any()) + .Returns(subscription3); + + // Act + await _sut.Execute(context); + + // Assert + await _stripeFacade.Received(1).UpdateSubscription("sub_1", Arg.Any()); + await _stripeFacade.Received(1).UpdateSubscription("sub_2", Arg.Any()); + await _stripeFacade.Received(1).UpdateSubscription("sub_3", Arg.Any()); + } + + [Fact] + public async Task Execute_UpdateFails_LogsError() + { + // Arrange + var context = CreateJobExecutionContext(); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription = CreateSubscription("sub_123", "storage-gb-monthly", quantity: 10); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription)); + _stripeFacade.UpdateSubscription(Arg.Any(), Arg.Any()) + .Throws(new Exception("Stripe API error")); + + // Act + await _sut.Execute(context); + + // Assert + _logger.Received().Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + #endregion + + #region Cancellation Tests + + [Fact] + public async Task Execute_CancellationRequested_LogsWarningAndExits() + { + // Arrange + var cts = new CancellationTokenSource(); + cts.Cancel(); // Cancel immediately + var context = CreateJobExecutionContext(cts.Token); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob).Returns(true); + _featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode).Returns(true); + + var subscription1 = CreateSubscription("sub_1", "storage-gb-monthly", quantity: 10); + + _stripeFacade.ListSubscriptionsAutoPagingAsync(Arg.Any()) + .Returns(AsyncEnumerable.Create(subscription1)); + + // Act + await _sut.Execute(context); + + // Assert - Should not process any subscriptions due to immediate cancellation + await _stripeFacade.DidNotReceiveWithAnyArgs().UpdateSubscription(null); + _logger.Received().Log( + LogLevel.Warning, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + #endregion + + #region Helper Methods + + private static IJobExecutionContext CreateJobExecutionContext(CancellationToken cancellationToken = default) + { + var context = Substitute.For(); + context.CancellationToken.Returns(cancellationToken); + return context; + } + + private static Subscription CreateSubscription( + string id, + string priceId, + long? quantity = null, + Dictionary? metadata = null) + { + var price = new Price { Id = priceId }; + var item = new SubscriptionItem + { + Id = $"si_{id}", + Price = price, + Quantity = quantity ?? 0 + }; + + return new Subscription + { + Id = id, + Metadata = metadata, + Items = new StripeList + { + Data = new List { item } + } + }; + } + + #endregion +} + +internal static class AsyncEnumerable +{ + public static async IAsyncEnumerable Create(params T[] items) + { + foreach (var item in items) + { + yield return item; + } + await Task.CompletedTask; + } + + public static async IAsyncEnumerable Empty() + { + await Task.CompletedTask; + yield break; + } +}