[PM-17562] Refactor event integration methods / declarations in ServiceCollectionExtensions (#6118)

* [PM-17562] Refactor event integration methods / declarations in ServiceCollectionExtensions

* Refactored ServiceCollectionExtensions to use TryAdd and still launch unique listeneer services

* Updated unit tests to match new generic format for Listeners

* Fix method spacing

* Update README to reflect new integration setup in ServiceCollectionExtensions

* Move interfaces to I prefix; fix typo in subscription

* Fix reference to IIntegrationListenerConfiguration
This commit is contained in:
Brant DeBow 2025-07-29 11:22:21 -04:00 committed by GitHub
parent 43372b7168
commit a84e5554fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 512 additions and 314 deletions

View File

@ -0,0 +1,38 @@
using Bit.Core.Enums;
using Bit.Core.Settings;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public class HecListenerConfiguration(GlobalSettings globalSettings)
: ListenerConfiguration(globalSettings), IIntegrationListenerConfiguration
{
public IntegrationType IntegrationType
{
get => IntegrationType.Hec;
}
public string EventQueueName
{
get => _globalSettings.EventLogging.RabbitMq.HecEventsQueueName;
}
public string IntegrationQueueName
{
get => _globalSettings.EventLogging.RabbitMq.HecIntegrationQueueName;
}
public string IntegrationRetryQueueName
{
get => _globalSettings.EventLogging.RabbitMq.HecIntegrationRetryQueueName;
}
public string EventSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.HecEventSubscriptionName;
}
public string IntegrationSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.HecIntegrationSubscriptionName;
}
}

View File

@ -0,0 +1,8 @@
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public interface IEventListenerConfiguration
{
public string EventQueueName { get; }
public string EventSubscriptionName { get; }
public string EventTopicName { get; }
}

View File

@ -0,0 +1,18 @@
using Bit.Core.Enums;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public interface IIntegrationListenerConfiguration : IEventListenerConfiguration
{
public IntegrationType IntegrationType { get; }
public string IntegrationQueueName { get; }
public string IntegrationRetryQueueName { get; }
public string IntegrationSubscriptionName { get; }
public string IntegrationTopicName { get; }
public int MaxRetries { get; }
public string RoutingKey
{
get => IntegrationType.ToRoutingKey();
}
}

View File

@ -0,0 +1,28 @@
using Bit.Core.Settings;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public abstract class ListenerConfiguration
{
protected GlobalSettings _globalSettings;
public ListenerConfiguration(GlobalSettings globalSettings)
{
_globalSettings = globalSettings;
}
public int MaxRetries
{
get => _globalSettings.EventLogging.MaxRetries;
}
public string EventTopicName
{
get => _globalSettings.EventLogging.AzureServiceBus.EventTopicName;
}
public string IntegrationTopicName
{
get => _globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName;
}
}

View File

@ -0,0 +1,17 @@
using Bit.Core.Settings;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public class RepositoryListenerConfiguration(GlobalSettings globalSettings)
: ListenerConfiguration(globalSettings), IEventListenerConfiguration
{
public string EventQueueName
{
get => _globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName;
}
public string EventSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName;
}
}

View File

@ -0,0 +1,38 @@
using Bit.Core.Enums;
using Bit.Core.Settings;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public class SlackListenerConfiguration(GlobalSettings globalSettings) :
ListenerConfiguration(globalSettings), IIntegrationListenerConfiguration
{
public IntegrationType IntegrationType
{
get => IntegrationType.Slack;
}
public string EventQueueName
{
get => _globalSettings.EventLogging.RabbitMq.SlackEventsQueueName;
}
public string IntegrationQueueName
{
get => _globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName;
}
public string IntegrationRetryQueueName
{
get => _globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName;
}
public string EventSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.SlackEventSubscriptionName;
}
public string IntegrationSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.SlackIntegrationSubscriptionName;
}
}

View File

@ -0,0 +1,38 @@
using Bit.Core.Enums;
using Bit.Core.Settings;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public class WebhookListenerConfiguration(GlobalSettings globalSettings)
: ListenerConfiguration(globalSettings), IIntegrationListenerConfiguration
{
public IntegrationType IntegrationType
{
get => IntegrationType.Webhook;
}
public string EventQueueName
{
get => _globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName;
}
public string IntegrationQueueName
{
get => _globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName;
}
public string IntegrationRetryQueueName
{
get => _globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName;
}
public string EventSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.WebhookEventSubscriptionName;
}
public string IntegrationSubscriptionName
{
get => _globalSettings.EventLogging.AzureServiceBus.WebhookIntegrationSubscriptionName;
}
}

View File

@ -2,27 +2,26 @@
using System.Text;
using Azure.Messaging.ServiceBus;
using Bit.Core.Settings;
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
using Microsoft.Extensions.Logging;
namespace Bit.Core.Services;
public class AzureServiceBusEventListenerService : EventLoggingListenerService
public class AzureServiceBusEventListenerService<TConfiguration> : EventLoggingListenerService
where TConfiguration : IEventListenerConfiguration
{
private readonly ServiceBusProcessor _processor;
public AzureServiceBusEventListenerService(
TConfiguration configuration,
IEventMessageHandler handler,
IAzureServiceBusService serviceBusService,
string subscriptionName,
GlobalSettings globalSettings,
ILogger<AzureServiceBusEventListenerService> logger) : base(handler, logger)
ILogger<AzureServiceBusEventListenerService<TConfiguration>> logger) : base(handler, logger)
{
_processor = serviceBusService.CreateProcessor(
globalSettings.EventLogging.AzureServiceBus.EventTopicName,
subscriptionName,
topicName: configuration.EventTopicName,
subscriptionName: configuration.EventSubscriptionName,
new ServiceBusProcessorOptions());
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)

View File

@ -1,32 +1,36 @@
#nullable enable
using Azure.Messaging.ServiceBus;
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Bit.Core.Services;
public class AzureServiceBusIntegrationListenerService : BackgroundService
public class AzureServiceBusIntegrationListenerService<TConfiguration> : BackgroundService
where TConfiguration : IIntegrationListenerConfiguration
{
private readonly int _maxRetries;
private readonly IAzureServiceBusService _serviceBusService;
private readonly IIntegrationHandler _handler;
private readonly ServiceBusProcessor _processor;
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger;
private readonly ILogger<AzureServiceBusIntegrationListenerService<TConfiguration>> _logger;
public AzureServiceBusIntegrationListenerService(IIntegrationHandler handler,
string topicName,
string subscriptionName,
int maxRetries,
public AzureServiceBusIntegrationListenerService(
TConfiguration configuration,
IIntegrationHandler handler,
IAzureServiceBusService serviceBusService,
ILogger<AzureServiceBusIntegrationListenerService> logger)
ILogger<AzureServiceBusIntegrationListenerService<TConfiguration>> logger)
{
_handler = handler;
_logger = logger;
_maxRetries = maxRetries;
_maxRetries = configuration.MaxRetries;
_serviceBusService = serviceBusService;
_processor = _serviceBusService.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
_processor = _serviceBusService.CreateProcessor(
topicName: configuration.IntegrationTopicName,
subscriptionName: configuration.IntegrationSubscriptionName,
options: new ServiceBusProcessorOptions());
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)

View File

@ -399,35 +399,44 @@ These names added here are what must match the values provided in the secrets or
in Global Settings. This must be in place (and the local ASB emulator restarted) before you can use any
code locally that accesses ASB resources.
## ListenerConfiguration
New integrations will need their own subclass of `ListenerConfiguration` which also conforms to
`IIntegrationListenerConfiguration`. This class provides a way of accessing the previously configured
RabbitMQ queues and ASB subscriptions by referring to the values created in `GlobalSettings`. This new
listener configuration will be used to type the listener and provide the means to access the necessary
configurations for the integration.
## ServiceCollectionExtensions
In our `ServiceCollectionExtensions`, we pull all the above pieces together to start listeners on each message
tier with handlers to process the integration. There are a number of helper methods in here to make this simple
to add a new integration - one call per platform.
tier with handlers to process the integration.
Also note that if an integration needs a custom singleton / service defined, the add listeners method is a
good place to set that up. For instance, `SlackIntegrationHandler` needs a `SlackService`, so the singleton
declaration is right above the add integration method for slack. Same thing for webhooks when it comes to
defining a custom HttpClient by name.
The core method for all event integration setup is `AddEventIntegrationServices`. This method is called by
both of the add listeners methods, which ensures that we have one common place to set up cross-messaging-platform
dependencies and integrations. For instance, `SlackIntegrationHandler` needs a `SlackService`, so
`AddEventIntegrationServices` has a call to `AddSlackService`. Same thing for webhooks when it
comes to defining a custom HttpClient by name.
1. In `AddEventIntegrationServices` create the listener configuration:
1. In `AddRabbitMqListeners` add the integration:
``` csharp
services.AddRabbitMqIntegration<ExampleIntegrationConfigurationDetails, ExampleIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.ExampleEventsQueueName,
globalSettings.EventLogging.RabbitMq.ExampleIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.ExampleIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.MaxRetries,
IntegrationType.Example);
var exampleConfiguration = new ExampleListenerConfiguration(globalSettings);
```
2. In `AddAzureServiceBusListeners` add the integration:
2. Add the integration to both the RabbitMQ and ASB specific declarations:
``` csharp
services.AddAzureServiceBusIntegration<ExampleIntegrationConfigurationDetails, ExampleIntegrationHandler>(
eventSubscriptionName: globalSettings.EventLogging.AzureServiceBus.ExampleEventSubscriptionName,
integrationSubscriptionName: globalSettings.EventLogging.AzureServiceBus.ExampleIntegrationSubscriptionName,
integrationType: IntegrationType.Example,
globalSettings: globalSettings);
services.AddRabbitMqIntegration<ExampleIntegrationConfigurationDetails, ExampleListenerConfiguration>(exampleConfiguration);
```
and
``` csharp
services.AddAzureServiceBusIntegration<ExampleIntegrationConfigurationDetails, ExampleListenerConfiguration>(exampleConfiguration);
```
# Deploying a new integration
## RabbitMQ

View File

@ -1,13 +1,15 @@
#nullable enable
using System.Text;
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public class RabbitMqEventListenerService : EventLoggingListenerService
public class RabbitMqEventListenerService<TConfiguration> : EventLoggingListenerService
where TConfiguration : IEventListenerConfiguration
{
private readonly Lazy<Task<IChannel>> _lazyChannel;
private readonly string _queueName;
@ -15,12 +17,11 @@ public class RabbitMqEventListenerService : EventLoggingListenerService
public RabbitMqEventListenerService(
IEventMessageHandler handler,
string queueName,
TConfiguration configuration,
IRabbitMqService rabbitMqService,
ILogger<RabbitMqEventListenerService> logger) : base(handler, logger)
ILogger<RabbitMqEventListenerService<TConfiguration>> logger) : base(handler, logger)
{
_logger = logger;
_queueName = queueName;
_queueName = configuration.EventQueueName;
_rabbitMqService = rabbitMqService;
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
}

View File

@ -10,7 +10,8 @@ using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public class RabbitMqIntegrationListenerService : BackgroundService
public class RabbitMqIntegrationListenerService<TConfiguration> : BackgroundService
where TConfiguration : IIntegrationListenerConfiguration
{
private readonly int _maxRetries;
private readonly string _queueName;
@ -19,26 +20,24 @@ public class RabbitMqIntegrationListenerService : BackgroundService
private readonly IIntegrationHandler _handler;
private readonly Lazy<Task<IChannel>> _lazyChannel;
private readonly IRabbitMqService _rabbitMqService;
private readonly ILogger<RabbitMqIntegrationListenerService> _logger;
private readonly ILogger<RabbitMqIntegrationListenerService<TConfiguration>> _logger;
private readonly TimeProvider _timeProvider;
public RabbitMqIntegrationListenerService(IIntegrationHandler handler,
string routingKey,
string queueName,
string retryQueueName,
int maxRetries,
public RabbitMqIntegrationListenerService(
IIntegrationHandler handler,
TConfiguration configuration,
IRabbitMqService rabbitMqService,
ILogger<RabbitMqIntegrationListenerService> logger,
ILogger<RabbitMqIntegrationListenerService<TConfiguration>> logger,
TimeProvider timeProvider)
{
_handler = handler;
_routingKey = routingKey;
_retryQueueName = retryQueueName;
_queueName = queueName;
_maxRetries = configuration.MaxRetries;
_routingKey = configuration.RoutingKey;
_retryQueueName = configuration.IntegrationRetryQueueName;
_queueName = configuration.IntegrationQueueName;
_rabbitMqService = rabbitMqService;
_logger = logger;
_timeProvider = timeProvider;
_maxRetries = maxRetries;
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
}

View File

@ -288,6 +288,7 @@ public class GlobalSettings : IGlobalSettings
public AzureServiceBusSettings AzureServiceBus { get; set; } = new AzureServiceBusSettings();
public RabbitMqSettings RabbitMq { get; set; } = new RabbitMqSettings();
public int IntegrationCacheRefreshIntervalMinutes { get; set; } = 10;
public int MaxRetries { get; set; } = 3;
public class AzureServiceBusSettings
{
@ -295,7 +296,6 @@ public class GlobalSettings : IGlobalSettings
private string _eventTopicName;
private string _integrationTopicName;
public int MaxRetries { get; set; } = 3;
public virtual string EventRepositorySubscriptionName { get; set; } = "events-write-subscription";
public virtual string SlackEventSubscriptionName { get; set; } = "events-slack-subscription";
public virtual string SlackIntegrationSubscriptionName { get; set; } = "integration-slack-subscription";
@ -331,7 +331,6 @@ public class GlobalSettings : IGlobalSettings
private string _eventExchangeName;
private string _integrationExchangeName;
public int MaxRetries { get; set; } = 3;
public int RetryTiming { get; set; } = 30000; // 30s
public bool UseDelayPlugin { get; set; } = false;
public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue";

View File

@ -373,7 +373,6 @@ public static class ServiceCollectionExtensions
services.AddSingleton<IPushRegistrationService, NoopPushRegistrationService>();
services.AddSingleton<IAttachmentStorageService, NoopAttachmentStorageService>();
services.AddSingleton<ILicensingService, NoopLicensingService>();
services.AddSingleton<IEventWriteService, NoopEventWriteService>();
}
public static IdentityBuilder AddCustomIdentityServices(
@ -550,198 +549,57 @@ public static class ServiceCollectionExtensions
{
if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString))
{
services.AddKeyedSingleton<IEventWriteService, AzureQueueEventWriteService>("storage");
services.TryAddKeyedSingleton<IEventWriteService, AzureQueueEventWriteService>("storage");
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
{
services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
services.AddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
services.TryAddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
services.TryAddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
services.TryAddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else if (globalSettings.SelfHosted)
{
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("storage");
services.TryAddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("storage");
if (IsRabbitMqEnabled(globalSettings))
{
services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
services.AddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
services.TryAddSingleton<IEventIntegrationPublisher, RabbitMqService>();
services.TryAddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
services.TryAddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("storage");
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
services.TryAddKeyedSingleton<IEventWriteService, NoopEventWriteService>("storage");
services.TryAddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
services.AddScoped<IEventWriteService, EventRouteService>();
return services;
}
private static IServiceCollection AddAzureServiceBusEventRepositoryListener(this IServiceCollection services, GlobalSettings globalSettings)
{
services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>();
services.AddSingleton<AzureTableStorageEventHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
handler: provider.GetRequiredService<AzureTableStorageEventHandler>(),
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName,
globalSettings: globalSettings,
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>()
)
);
return services;
}
private static IServiceCollection AddAzureServiceBusIntegration<TConfig, THandler>(
this IServiceCollection services,
string eventSubscriptionName,
string integrationSubscriptionName,
IntegrationType integrationType,
GlobalSettings globalSettings)
where TConfig : class
where THandler : class, IIntegrationHandler<TConfig>
{
var routingKey = integrationType.ToRoutingKey();
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
new EventIntegrationHandler<TConfig>(
integrationType,
provider.GetRequiredService<IEventIntegrationPublisher>(),
provider.GetRequiredService<IIntegrationFilterService>(),
provider.GetRequiredService<IIntegrationConfigurationDetailsCache>(),
provider.GetRequiredService<IUserRepository>(),
provider.GetRequiredService<IOrganizationRepository>(),
provider.GetRequiredService<ILogger<EventIntegrationHandler<TConfig>>>()));
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
subscriptionName: eventSubscriptionName,
globalSettings: globalSettings,
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>()
)
);
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusIntegrationListenerService(
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
topicName: globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName,
subscriptionName: integrationSubscriptionName,
maxRetries: globalSettings.EventLogging.AzureServiceBus.MaxRetries,
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
logger: provider.GetRequiredService<ILogger<AzureServiceBusIntegrationListenerService>>()));
services.TryAddScoped<IEventWriteService, EventRouteService>();
return services;
}
public static IServiceCollection AddAzureServiceBusListeners(this IServiceCollection services, GlobalSettings globalSettings)
{
if (!CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) ||
!CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
if (!IsAzureServiceBusEnabled(globalSettings))
{
return services;
}
services.AddSingleton<IntegrationConfigurationDetailsCacheService>();
services.AddSingleton<IIntegrationConfigurationDetailsCache>(provider =>
provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
services.AddHostedService(provider => provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
services.AddSingleton<IIntegrationFilterService, IntegrationFilterService>();
services.AddSingleton<IAzureServiceBusService, AzureServiceBusService>();
services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
services.AddAzureServiceBusEventRepositoryListener(globalSettings);
services.TryAddSingleton<IAzureServiceBusService, AzureServiceBusService>();
services.TryAddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
services.TryAddSingleton<IEventRepository, TableStorageRepos.EventRepository>();
services.TryAddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.TryAddSingleton<AzureTableStorageEventHandler>();
services.AddSlackService(globalSettings);
services.AddAzureServiceBusIntegration<SlackIntegrationConfigurationDetails, SlackIntegrationHandler>(
eventSubscriptionName: globalSettings.EventLogging.AzureServiceBus.SlackEventSubscriptionName,
integrationSubscriptionName: globalSettings.EventLogging.AzureServiceBus.SlackIntegrationSubscriptionName,
integrationType: IntegrationType.Slack,
globalSettings: globalSettings);
services.TryAddSingleton(TimeProvider.System);
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
services.AddAzureServiceBusIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
eventSubscriptionName: globalSettings.EventLogging.AzureServiceBus.WebhookEventSubscriptionName,
integrationSubscriptionName: globalSettings.EventLogging.AzureServiceBus.WebhookIntegrationSubscriptionName,
integrationType: IntegrationType.Webhook,
globalSettings: globalSettings);
services.AddAzureServiceBusIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
eventSubscriptionName: globalSettings.EventLogging.AzureServiceBus.HecEventSubscriptionName,
integrationSubscriptionName: globalSettings.EventLogging.AzureServiceBus.HecIntegrationSubscriptionName,
integrationType: IntegrationType.Hec,
globalSettings: globalSettings);
return services;
}
private static IServiceCollection AddRabbitMqEventRepositoryListener(this IServiceCollection services, GlobalSettings globalSettings)
{
services.AddSingleton<EventRepositoryHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<EventRepositoryHandler>(),
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName,
provider.GetRequiredService<IRabbitMqService>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>()));
return services;
}
private static IServiceCollection AddRabbitMqIntegration<TConfig, THandler>(this IServiceCollection services,
string eventQueueName,
string integrationQueueName,
string integrationRetryQueueName,
int maxRetries,
IntegrationType integrationType)
where TConfig : class
where THandler : class, IIntegrationHandler<TConfig>
{
var routingKey = integrationType.ToRoutingKey();
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
new EventIntegrationHandler<TConfig>(
integrationType,
provider.GetRequiredService<IEventIntegrationPublisher>(),
provider.GetRequiredService<IIntegrationFilterService>(),
provider.GetRequiredService<IIntegrationConfigurationDetailsCache>(),
provider.GetRequiredService<IUserRepository>(),
provider.GetRequiredService<IOrganizationRepository>(),
provider.GetRequiredService<ILogger<EventIntegrationHandler<TConfig>>>()));
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
eventQueueName,
provider.GetRequiredService<IRabbitMqService>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>()));
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
services.AddSingleton<IHostedService>(provider =>
new RabbitMqIntegrationListenerService(
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
routingKey: routingKey,
queueName: integrationQueueName,
retryQueueName: integrationRetryQueueName,
maxRetries: maxRetries,
rabbitMqService: provider.GetRequiredService<IRabbitMqService>(),
logger: provider.GetRequiredService<ILogger<RabbitMqIntegrationListenerService>>(),
timeProvider: provider.GetRequiredService<TimeProvider>()));
services.AddEventIntegrationServices(globalSettings);
return services;
}
@ -753,49 +611,15 @@ public static class ServiceCollectionExtensions
return services;
}
services.AddSingleton<IntegrationConfigurationDetailsCacheService>();
services.AddSingleton<IIntegrationConfigurationDetailsCache>(provider =>
provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
services.AddHostedService(provider => provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
services.AddSingleton<IIntegrationFilterService, IntegrationFilterService>();
services.AddSingleton<IRabbitMqService, RabbitMqService>();
services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
services.AddRabbitMqEventRepositoryListener(globalSettings);
services.TryAddSingleton<IRabbitMqService, RabbitMqService>();
services.TryAddSingleton<IEventIntegrationPublisher, RabbitMqService>();
services.TryAddSingleton<EventRepositoryHandler>();
services.AddSlackService(globalSettings);
services.AddRabbitMqIntegration<SlackIntegrationConfigurationDetails, SlackIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.SlackEventsQueueName,
globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.MaxRetries,
IntegrationType.Slack);
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName,
globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.MaxRetries,
IntegrationType.Webhook);
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.HecEventsQueueName,
globalSettings.EventLogging.RabbitMq.HecIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.HecIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.MaxRetries,
IntegrationType.Hec);
services.AddEventIntegrationServices(globalSettings);
return services;
}
private static bool IsRabbitMqEnabled(GlobalSettings settings)
{
return CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.EventExchangeName);
}
public static IServiceCollection AddSlackService(this IServiceCollection services, GlobalSettings globalSettings)
{
if (CoreHelpers.SettingHasValue(globalSettings.Slack.ClientId) &&
@ -803,11 +627,11 @@ public static class ServiceCollectionExtensions
CoreHelpers.SettingHasValue(globalSettings.Slack.Scopes))
{
services.AddHttpClient(SlackService.HttpClientName);
services.AddSingleton<ISlackService, SlackService>();
services.TryAddSingleton<ISlackService, SlackService>();
}
else
{
services.AddSingleton<ISlackService, NoopSlackService>();
services.TryAddSingleton<ISlackService, NoopSlackService>();
}
return services;
@ -1043,4 +867,161 @@ public static class ServiceCollectionExtensions
return (provider, connectionString);
}
private static IServiceCollection AddAzureServiceBusIntegration<TConfig, TListenerConfig>(this IServiceCollection services,
TListenerConfig listenerConfiguration)
where TConfig : class
where TListenerConfig : IIntegrationListenerConfiguration
{
services.TryAddKeyedSingleton<IEventMessageHandler>(serviceKey: listenerConfiguration.RoutingKey, implementationFactory: (provider, _) =>
new EventIntegrationHandler<TConfig>(
integrationType: listenerConfiguration.IntegrationType,
eventIntegrationPublisher: provider.GetRequiredService<IEventIntegrationPublisher>(),
integrationFilterService: provider.GetRequiredService<IIntegrationFilterService>(),
configurationCache: provider.GetRequiredService<IIntegrationConfigurationDetailsCache>(),
userRepository: provider.GetRequiredService<IUserRepository>(),
organizationRepository: provider.GetRequiredService<IOrganizationRepository>(),
logger: provider.GetRequiredService<ILogger<EventIntegrationHandler<TConfig>>>()
)
);
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService,
AzureServiceBusEventListenerService<TListenerConfig>>(provider =>
new AzureServiceBusEventListenerService<TListenerConfig>(
configuration: listenerConfiguration,
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(serviceKey: listenerConfiguration.RoutingKey),
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService<TListenerConfig>>>()
)
)
);
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService,
AzureServiceBusIntegrationListenerService<TListenerConfig>>(provider =>
new AzureServiceBusIntegrationListenerService<TListenerConfig>(
configuration: listenerConfiguration,
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
logger: provider.GetRequiredService<ILogger<AzureServiceBusIntegrationListenerService<TListenerConfig>>>()
)
)
);
return services;
}
private static IServiceCollection AddEventIntegrationServices(this IServiceCollection services,
GlobalSettings globalSettings)
{
// Add common services
services.TryAddSingleton<IntegrationConfigurationDetailsCacheService>();
services.TryAddSingleton<IIntegrationConfigurationDetailsCache>(provider =>
provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
services.AddHostedService(provider => provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
services.TryAddSingleton<IIntegrationFilterService, IntegrationFilterService>();
services.TryAddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
// Add services in support of handlers
services.AddSlackService(globalSettings);
services.TryAddSingleton(TimeProvider.System);
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
// Add integration handlers
services.TryAddSingleton<IIntegrationHandler<SlackIntegrationConfigurationDetails>, SlackIntegrationHandler>();
services.TryAddSingleton<IIntegrationHandler<WebhookIntegrationConfigurationDetails>, WebhookIntegrationHandler>();
var repositoryConfiguration = new RepositoryListenerConfiguration(globalSettings);
var slackConfiguration = new SlackListenerConfiguration(globalSettings);
var webhookConfiguration = new WebhookListenerConfiguration(globalSettings);
var hecConfiguration = new HecListenerConfiguration(globalSettings);
if (IsRabbitMqEnabled(globalSettings))
{
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService,
RabbitMqEventListenerService<RepositoryListenerConfiguration>>(provider =>
new RabbitMqEventListenerService<RepositoryListenerConfiguration>(
handler: provider.GetRequiredService<EventRepositoryHandler>(),
configuration: repositoryConfiguration,
rabbitMqService: provider.GetRequiredService<IRabbitMqService>(),
logger: provider.GetRequiredService<ILogger<RabbitMqEventListenerService<RepositoryListenerConfiguration>>>()
)
)
);
services.AddRabbitMqIntegration<SlackIntegrationConfigurationDetails, SlackListenerConfiguration>(slackConfiguration);
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookListenerConfiguration>(webhookConfiguration);
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, HecListenerConfiguration>(hecConfiguration);
}
if (IsAzureServiceBusEnabled(globalSettings))
{
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService,
AzureServiceBusEventListenerService<RepositoryListenerConfiguration>>(provider =>
new AzureServiceBusEventListenerService<RepositoryListenerConfiguration>(
configuration: repositoryConfiguration,
handler: provider.GetRequiredService<AzureTableStorageEventHandler>(),
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService<RepositoryListenerConfiguration>>>()
)
)
);
services.AddAzureServiceBusIntegration<SlackIntegrationConfigurationDetails, SlackListenerConfiguration>(slackConfiguration);
services.AddAzureServiceBusIntegration<WebhookIntegrationConfigurationDetails, WebhookListenerConfiguration>(webhookConfiguration);
services.AddAzureServiceBusIntegration<WebhookIntegrationConfigurationDetails, HecListenerConfiguration>(hecConfiguration);
}
return services;
}
private static IServiceCollection AddRabbitMqIntegration<TConfig, TListenerConfig>(this IServiceCollection services,
TListenerConfig listenerConfiguration)
where TConfig : class
where TListenerConfig : IIntegrationListenerConfiguration
{
services.TryAddKeyedSingleton<IEventMessageHandler>(serviceKey: listenerConfiguration.RoutingKey, implementationFactory: (provider, _) =>
new EventIntegrationHandler<TConfig>(
integrationType: listenerConfiguration.IntegrationType,
eventIntegrationPublisher: provider.GetRequiredService<IEventIntegrationPublisher>(),
integrationFilterService: provider.GetRequiredService<IIntegrationFilterService>(),
configurationCache: provider.GetRequiredService<IIntegrationConfigurationDetailsCache>(),
userRepository: provider.GetRequiredService<IUserRepository>(),
organizationRepository: provider.GetRequiredService<IOrganizationRepository>(),
logger: provider.GetRequiredService<ILogger<EventIntegrationHandler<TConfig>>>()
)
);
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService,
RabbitMqEventListenerService<TListenerConfig>>(provider =>
new RabbitMqEventListenerService<TListenerConfig>(
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(serviceKey: listenerConfiguration.RoutingKey),
configuration: listenerConfiguration,
rabbitMqService: provider.GetRequiredService<IRabbitMqService>(),
logger: provider.GetRequiredService<ILogger<RabbitMqEventListenerService<TListenerConfig>>>()
)
)
);
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService,
RabbitMqIntegrationListenerService<TListenerConfig>>(provider =>
new RabbitMqIntegrationListenerService<TListenerConfig>(
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
configuration: listenerConfiguration,
rabbitMqService: provider.GetRequiredService<IRabbitMqService>(),
logger: provider.GetRequiredService<ILogger<RabbitMqIntegrationListenerService<TListenerConfig>>>(),
timeProvider: provider.GetRequiredService<TimeProvider>()
)
)
);
return services;
}
private static bool IsAzureServiceBusEnabled(GlobalSettings settings)
{
return CoreHelpers.SettingHasValue(settings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(settings.EventLogging.AzureServiceBus.EventTopicName);
}
private static bool IsRabbitMqEnabled(GlobalSettings settings)
{
return CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.EventExchangeName);
}
}

View File

@ -0,0 +1,16 @@
using Bit.Core.Enums;
namespace Bit.Core.AdminConsole.Models.Data.EventIntegrations;
public class TestListenerConfiguration : IIntegrationListenerConfiguration
{
public string EventQueueName => "event_queue";
public string EventSubscriptionName => "event_subscription";
public string EventTopicName => "event_topic";
public IntegrationType IntegrationType => IntegrationType.Webhook;
public string IntegrationQueueName => "integration_queue";
public string IntegrationRetryQueueName => "integration_retry_queue";
public string IntegrationSubscriptionName => "integration_subscription";
public string IntegrationTopicName => "integration_topic";
public int MaxRetries => 3;
}

View File

@ -1,5 +1,6 @@
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
@ -14,20 +15,28 @@ namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class AzureServiceBusEventListenerServiceTests
{
private readonly IEventMessageHandler _handler = Substitute.For<IEventMessageHandler>();
private readonly ILogger<AzureServiceBusEventListenerService> _logger =
Substitute.For<ILogger<AzureServiceBusEventListenerService>>();
private const string _messageId = "messageId";
private readonly TestListenerConfiguration _config = new();
private SutProvider<AzureServiceBusEventListenerService> GetSutProvider()
private SutProvider<AzureServiceBusEventListenerService<TestListenerConfiguration>> GetSutProvider()
{
return new SutProvider<AzureServiceBusEventListenerService>()
.SetDependency(_handler)
.SetDependency(_logger)
.SetDependency("test-subscription", "subscriptionName")
return new SutProvider<AzureServiceBusEventListenerService<TestListenerConfiguration>>()
.SetDependency(_config)
.Create();
}
[Fact]
public void Constructor_CreatesProcessor()
{
var sutProvider = GetSutProvider();
sutProvider.GetDependency<IAzureServiceBusService>().Received(1).CreateProcessor(
Arg.Is(_config.EventTopicName),
Arg.Is(_config.EventSubscriptionName),
Arg.Any<ServiceBusProcessorOptions>()
);
}
[Theory, BitAutoData]
public async Task ProcessErrorAsync_LogsError(ProcessErrorEventArgs args)
{
@ -35,7 +44,7 @@ public class AzureServiceBusEventListenerServiceTests
await sutProvider.Sut.ProcessErrorAsync(args);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<AzureServiceBusEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
@ -49,7 +58,7 @@ public class AzureServiceBusEventListenerServiceTests
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync(string.Empty, _messageId);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<AzureServiceBusEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
@ -63,7 +72,7 @@ public class AzureServiceBusEventListenerServiceTests
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync("{ Inavlid JSON }", _messageId);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<AzureServiceBusEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Is<object>(o => o.ToString().Contains("Invalid JSON")),
@ -80,7 +89,7 @@ public class AzureServiceBusEventListenerServiceTests
_messageId
);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<AzureServiceBusEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
@ -97,7 +106,7 @@ public class AzureServiceBusEventListenerServiceTests
_messageId
);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<AzureServiceBusEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),

View File

@ -14,33 +14,38 @@ namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class AzureServiceBusIntegrationListenerServiceTests
{
private const int _maxRetries = 3;
private const string _topicName = "test_topic";
private const string _subscriptionName = "test_subscription";
private readonly IIntegrationHandler _handler = Substitute.For<IIntegrationHandler>();
private readonly IAzureServiceBusService _serviceBusService = Substitute.For<IAzureServiceBusService>();
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger =
Substitute.For<ILogger<AzureServiceBusIntegrationListenerService>>();
private readonly TestListenerConfiguration _config = new();
private SutProvider<AzureServiceBusIntegrationListenerService> GetSutProvider()
private SutProvider<AzureServiceBusIntegrationListenerService<TestListenerConfiguration>> GetSutProvider()
{
return new SutProvider<AzureServiceBusIntegrationListenerService>()
return new SutProvider<AzureServiceBusIntegrationListenerService<TestListenerConfiguration>>()
.SetDependency(_config)
.SetDependency(_handler)
.SetDependency(_serviceBusService)
.SetDependency(_topicName, "topicName")
.SetDependency(_subscriptionName, "subscriptionName")
.SetDependency(_maxRetries, "maxRetries")
.SetDependency(_logger)
.Create();
}
[Fact]
public void Constructor_CreatesProcessor()
{
var sutProvider = GetSutProvider();
sutProvider.GetDependency<IAzureServiceBusService>().Received(1).CreateProcessor(
Arg.Is(_config.IntegrationTopicName),
Arg.Is(_config.IntegrationSubscriptionName),
Arg.Any<ServiceBusProcessorOptions>()
);
}
[Theory, BitAutoData]
public async Task ProcessErrorAsync_LogsError(ProcessErrorEventArgs args)
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessErrorAsync(args);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<AzureServiceBusIntegrationListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
@ -70,7 +75,7 @@ public class AzureServiceBusIntegrationListenerServiceTests
public async Task HandleMessageAsync_FailureRetryableButTooManyRetries_PublishesToDeadLetterQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
message.RetryCount = _maxRetries;
message.RetryCount = _config.MaxRetries;
var result = new IntegrationHandlerResult(false, message);
result.Retryable = true;

View File

@ -1,4 +1,5 @@
using System.Text.Json;
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
@ -15,16 +16,12 @@ namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class RabbitMqEventListenerServiceTests
{
private const string _queueName = "test_queue";
private readonly IRabbitMqService _rabbitMqService = Substitute.For<IRabbitMqService>();
private readonly ILogger<RabbitMqEventListenerService> _logger = Substitute.For<ILogger<RabbitMqEventListenerService>>();
private readonly TestListenerConfiguration _config = new();
private SutProvider<RabbitMqEventListenerService> GetSutProvider()
private SutProvider<RabbitMqEventListenerService<TestListenerConfiguration>> GetSutProvider()
{
return new SutProvider<RabbitMqEventListenerService>()
.SetDependency(_rabbitMqService)
.SetDependency(_logger)
.SetDependency(_queueName, "queueName")
return new SutProvider<RabbitMqEventListenerService<TestListenerConfiguration>>()
.SetDependency(_config)
.Create();
}
@ -35,8 +32,8 @@ public class RabbitMqEventListenerServiceTests
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
await _rabbitMqService.Received(1).CreateEventQueueAsync(
Arg.Is(_queueName),
await sutProvider.GetDependency<IRabbitMqService>().Received(1).CreateEventQueueAsync(
Arg.Is(_config.EventQueueName),
Arg.Is(cancellationToken)
);
}
@ -52,11 +49,11 @@ public class RabbitMqEventListenerServiceTests
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: new byte[0]);
body: Array.Empty<byte>());
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<RabbitMqEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
@ -75,11 +72,11 @@ public class RabbitMqEventListenerServiceTests
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: JsonSerializer.SerializeToUtf8Bytes("{ Inavlid JSON"));
body: JsonSerializer.SerializeToUtf8Bytes("{ Invalid JSON"));
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<RabbitMqEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Is<object>(o => o.ToString().Contains("Invalid JSON")),
@ -102,7 +99,7 @@ public class RabbitMqEventListenerServiceTests
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<RabbitMqEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
@ -125,7 +122,7 @@ public class RabbitMqEventListenerServiceTests
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
sutProvider.GetDependency<ILogger<RabbitMqEventListenerService<TestListenerConfiguration>>>().Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),

View File

@ -15,23 +15,17 @@ namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class RabbitMqIntegrationListenerServiceTests
{
private const int _maxRetries = 3;
private const string _queueName = "test_queue";
private const string _retryQueueName = "test_queue_retry";
private const string _routingKey = "test_routing_key";
private readonly DateTime _now = new DateTime(2014, 3, 2, 1, 0, 0, DateTimeKind.Utc);
private readonly IIntegrationHandler _handler = Substitute.For<IIntegrationHandler>();
private readonly IRabbitMqService _rabbitMqService = Substitute.For<IRabbitMqService>();
private readonly TestListenerConfiguration _config = new();
private SutProvider<RabbitMqIntegrationListenerService> GetSutProvider()
private SutProvider<RabbitMqIntegrationListenerService<TestListenerConfiguration>> GetSutProvider()
{
var sutProvider = new SutProvider<RabbitMqIntegrationListenerService>()
var sutProvider = new SutProvider<RabbitMqIntegrationListenerService<TestListenerConfiguration>>()
.SetDependency(_config)
.SetDependency(_handler)
.SetDependency(_rabbitMqService)
.SetDependency(_queueName, "queueName")
.SetDependency(_retryQueueName, "retryQueueName")
.SetDependency(_routingKey, "routingKey")
.SetDependency(_maxRetries, "maxRetries")
.WithFakeTimeProvider()
.Create();
sutProvider.GetDependency<FakeTimeProvider>().SetUtcNow(_now);
@ -46,10 +40,10 @@ public class RabbitMqIntegrationListenerServiceTests
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
await _rabbitMqService.Received(1).CreateIntegrationQueuesAsync(
Arg.Is(_queueName),
Arg.Is(_retryQueueName),
Arg.Is(_routingKey),
await sutProvider.GetDependency<IRabbitMqService>().Received(1).CreateIntegrationQueuesAsync(
Arg.Is(_config.IntegrationQueueName),
Arg.Is(_config.IntegrationRetryQueueName),
Arg.Is(((IIntegrationListenerConfiguration)_config).RoutingKey),
Arg.Is(cancellationToken)
);
}
@ -101,7 +95,7 @@ public class RabbitMqIntegrationListenerServiceTests
await sutProvider.Sut.StartAsync(cancellationToken);
message.DelayUntilDate = null;
message.RetryCount = _maxRetries;
message.RetryCount = _config.MaxRetries;
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,