mirror of
https://github.com/bitwarden/server.git
synced 2025-12-11 04:34:37 -06:00
[BRE-1058] fix alpine race condition (#6156)
* alpine race condition during shutdown fix * change catch to only be for relevant task cancelled, added a debug log * test commit for build and test * remove testing comment
This commit is contained in:
parent
1c2bccdeff
commit
9081c205b1
@ -67,19 +67,25 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
|||||||
|
|
||||||
public virtual async Task StopAsync(CancellationToken cancellationToken)
|
public virtual async Task StopAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
// Step 1: Signal ExecuteAsync to stop gracefully
|
||||||
|
_cts?.Cancel();
|
||||||
|
|
||||||
|
// Step 2: Wait for ExecuteAsync to finish cleanly
|
||||||
|
if (_executingTask != null)
|
||||||
|
{
|
||||||
|
await _executingTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3: Now safely dispose resources (ExecuteAsync is done)
|
||||||
await _subscriptionReceiver.CloseAsync(cancellationToken);
|
await _subscriptionReceiver.CloseAsync(cancellationToken);
|
||||||
await _serviceBusClient.DisposeAsync();
|
await _serviceBusClient.DisposeAsync();
|
||||||
_cts?.Cancel();
|
|
||||||
|
// Step 4: Clean up subscription
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _serviceBusAdministrationClient.DeleteSubscriptionAsync(_topicName, _subName, cancellationToken);
|
await _serviceBusAdministrationClient.DeleteSubscriptionAsync(_topicName, _subName, cancellationToken);
|
||||||
}
|
}
|
||||||
catch { }
|
catch { }
|
||||||
|
|
||||||
if (_executingTask != null)
|
|
||||||
{
|
|
||||||
await _executingTask;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual void Dispose()
|
public virtual void Dispose()
|
||||||
@ -87,7 +93,18 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
|||||||
|
|
||||||
private async Task ExecuteAsync(CancellationToken cancellationToken)
|
private async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
await foreach (var message in _subscriptionReceiver.ReceiveMessagesAsync(cancellationToken))
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var messages = await _subscriptionReceiver.ReceiveMessagesAsync(
|
||||||
|
maxMessages: 1,
|
||||||
|
maxWaitTime: TimeSpan.FromSeconds(30),
|
||||||
|
cancellationToken);
|
||||||
|
|
||||||
|
if (messages?.Any() == true)
|
||||||
|
{
|
||||||
|
foreach (var message in messages)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -99,6 +116,19 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("ServiceBus receiver disposed during Alpine container shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("ServiceBus operation cancelled during Alpine container shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async Task ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
|
private async Task ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -86,12 +86,25 @@ public class AzureQueueHostedService : IHostedService, IDisposable
|
|||||||
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Task.Delay cancelled during Alpine container shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "Error occurred processing message block.");
|
_logger.LogError(ex, "Error occurred processing message block.");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
||||||
}
|
}
|
||||||
|
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Task.Delay cancelled during Alpine container shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.LogWarning("Done processing messages.");
|
_logger.LogWarning("Done processing messages.");
|
||||||
|
|||||||
@ -87,6 +87,11 @@ public class AzureQueueHostedService : IHostedService, IDisposable
|
|||||||
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Task.Delay cancelled during Alpine container shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
_logger.LogError(e, "Error processing messages.");
|
_logger.LogError(e, "Error processing messages.");
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user