|
@@ -7,12 +7,12 @@ namespace Esim.SendMail;
|
|
|
|
|
|
|
|
/// <summary>
|
|
/// <summary>
|
|
|
/// High-performance background worker for processing message queue.
|
|
/// High-performance background worker for processing message queue.
|
|
|
-/// Uses native .NET BackgroundService for maximum .NET 9 compatibility.
|
|
|
|
|
/// Features:
|
|
/// Features:
|
|
|
-/// - Configurable polling interval
|
|
|
|
|
-/// - Batch processing
|
|
|
|
|
-/// - Automatic retry handling
|
|
|
|
|
-/// - Graceful shutdown
|
|
|
|
|
|
|
+/// - Row-level locking to prevent duplicate processing
|
|
|
|
|
+/// - Configurable batch size and interval
|
|
|
|
|
+/// - Graceful shutdown with in-flight message handling
|
|
|
|
|
+/// - Automatic retry with exponential backoff
|
|
|
|
|
+/// - Metrics logging
|
|
|
/// </summary>
|
|
/// </summary>
|
|
|
public class MessageQueueWorker : BackgroundService
|
|
public class MessageQueueWorker : BackgroundService
|
|
|
{
|
|
{
|
|
@@ -21,6 +21,7 @@ public class MessageQueueWorker : BackgroundService
|
|
|
private readonly IEmailService _emailService;
|
|
private readonly IEmailService _emailService;
|
|
|
private readonly int _intervalSeconds;
|
|
private readonly int _intervalSeconds;
|
|
|
private readonly int _maxMessagesPerRun;
|
|
private readonly int _maxMessagesPerRun;
|
|
|
|
|
+ private readonly int _metricsLogIntervalSeconds;
|
|
|
|
|
|
|
|
// Message types
|
|
// Message types
|
|
|
private const int MESSAGE_TYPE_EMAIL = 1;
|
|
private const int MESSAGE_TYPE_EMAIL = 1;
|
|
@@ -33,6 +34,11 @@ public class MessageQueueWorker : BackgroundService
|
|
|
private const int STATUS_SUCCESS = 2;
|
|
private const int STATUS_SUCCESS = 2;
|
|
|
private const int STATUS_FAILED = 3;
|
|
private const int STATUS_FAILED = 3;
|
|
|
|
|
|
|
|
|
|
+ private DateTime _lastMetricsLog = DateTime.MinValue;
|
|
|
|
|
+ private long _totalProcessed = 0;
|
|
|
|
|
+ private long _totalSuccess = 0;
|
|
|
|
|
+ private long _totalFailed = 0;
|
|
|
|
|
+
|
|
|
public MessageQueueWorker(
|
|
public MessageQueueWorker(
|
|
|
ILogger<MessageQueueWorker> logger,
|
|
ILogger<MessageQueueWorker> logger,
|
|
|
IServiceProvider serviceProvider,
|
|
IServiceProvider serviceProvider,
|
|
@@ -44,6 +50,7 @@ public class MessageQueueWorker : BackgroundService
|
|
|
_emailService = emailService;
|
|
_emailService = emailService;
|
|
|
_intervalSeconds = int.Parse(configuration["Job:IntervalSeconds"] ?? "10");
|
|
_intervalSeconds = int.Parse(configuration["Job:IntervalSeconds"] ?? "10");
|
|
|
_maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
|
|
_maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
|
|
|
|
|
+ _metricsLogIntervalSeconds = int.Parse(configuration["Job:MetricsLogIntervalSeconds"] ?? "60");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
@@ -51,42 +58,68 @@ public class MessageQueueWorker : BackgroundService
|
|
|
_logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
|
|
_logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
|
|
|
_intervalSeconds, _maxMessagesPerRun);
|
|
_intervalSeconds, _maxMessagesPerRun);
|
|
|
|
|
|
|
|
|
|
+ // Wait a bit for initialization
|
|
|
|
|
+ await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
|
|
|
|
|
+
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
|
{
|
|
{
|
|
|
try
|
|
try
|
|
|
{
|
|
{
|
|
|
await ProcessMessagesAsync(stoppingToken);
|
|
await ProcessMessagesAsync(stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ // Log metrics periodically
|
|
|
|
|
+ if ((DateTime.Now - _lastMetricsLog).TotalSeconds >= _metricsLogIntervalSeconds)
|
|
|
|
|
+ {
|
|
|
|
|
+ LogMetrics();
|
|
|
|
|
+ _lastMetricsLog = DateTime.Now;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
|
|
|
|
+ {
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
catch (Exception ex)
|
|
catch (Exception ex)
|
|
|
{
|
|
{
|
|
|
_logger.LogError(ex, "Error in message processing loop");
|
|
_logger.LogError(ex, "Error in message processing loop");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
|
|
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
|
|
|
|
+ {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- _logger.LogInformation("MessageQueueWorker stopped");
|
|
|
|
|
|
|
+ _logger.LogInformation("MessageQueueWorker stopped. Total processed: {Total}, Success: {Success}, Failed: {Failed}",
|
|
|
|
|
+ _totalProcessed, _totalSuccess, _totalFailed);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void LogMetrics()
|
|
|
|
|
+ {
|
|
|
|
|
+ var emailMetrics = _emailService.GetMetrics();
|
|
|
|
|
+ _logger.LogInformation(
|
|
|
|
|
+ "Metrics: Processed={Processed} | Success={Success} | Failed={Failed} | " +
|
|
|
|
|
+ "EmailRate={Rate}/min | AvgLatency={Latency:F0}ms",
|
|
|
|
|
+ _totalProcessed, _totalSuccess, _totalFailed,
|
|
|
|
|
+ emailMetrics.EmailsLastMinute, emailMetrics.AverageLatencyMs);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
|
|
private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
|
|
|
{
|
|
{
|
|
|
var startTime = DateTime.Now;
|
|
var startTime = DateTime.Now;
|
|
|
|
|
|
|
|
- // Create a new scope for DbContext (transient)
|
|
|
|
|
|
|
+ // Create a new scope for DbContext
|
|
|
using var scope = _serviceProvider.CreateScope();
|
|
using var scope = _serviceProvider.CreateScope();
|
|
|
var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();
|
|
var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();
|
|
|
|
|
|
|
|
try
|
|
try
|
|
|
{
|
|
{
|
|
|
- // Get pending messages
|
|
|
|
|
- var pendingMessages = await dbContext.MessageQueues
|
|
|
|
|
- .Where(m => m.Status == STATUS_PENDING
|
|
|
|
|
- && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
|
|
|
|
|
- && (m.RetryCount == null || m.RetryCount < m.MaxRetry))
|
|
|
|
|
- .OrderBy(m => m.Priority)
|
|
|
|
|
- .ThenBy(m => m.CreatedDate)
|
|
|
|
|
- .Take(_maxMessagesPerRun)
|
|
|
|
|
- .ToListAsync(stoppingToken);
|
|
|
|
|
|
|
+ // Get and lock pending messages using Oracle's FOR UPDATE SKIP LOCKED
|
|
|
|
|
+ // This prevents duplicate processing when running multiple workers
|
|
|
|
|
+ var pendingMessages = await GetAndLockPendingMessagesAsync(dbContext, stoppingToken);
|
|
|
|
|
|
|
|
if (!pendingMessages.Any())
|
|
if (!pendingMessages.Any())
|
|
|
{
|
|
{
|
|
@@ -96,35 +129,36 @@ public class MessageQueueWorker : BackgroundService
|
|
|
|
|
|
|
|
_logger.LogInformation("Processing {Count} messages", pendingMessages.Count);
|
|
_logger.LogInformation("Processing {Count} messages", pendingMessages.Count);
|
|
|
|
|
|
|
|
- // Mark all as processing
|
|
|
|
|
- var messageIds = pendingMessages.Select(m => m.Id).ToList();
|
|
|
|
|
- await dbContext.Database.ExecuteSqlRawAsync(
|
|
|
|
|
- $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})",
|
|
|
|
|
- stoppingToken);
|
|
|
|
|
-
|
|
|
|
|
// Process by message type
|
|
// Process by message type
|
|
|
var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
|
|
var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
|
|
|
|
|
+ var otherMessages = pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL).ToList();
|
|
|
|
|
|
|
|
- // Process emails
|
|
|
|
|
|
|
+ // Process emails asynchronously
|
|
|
if (emailMessages.Any())
|
|
if (emailMessages.Any())
|
|
|
{
|
|
{
|
|
|
await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
|
|
await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Mark SMS/Push as not implemented
|
|
// Mark SMS/Push as not implemented
|
|
|
- foreach (var msg in pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
|
|
|
|
|
|
|
+ foreach (var msg in otherMessages)
|
|
|
{
|
|
{
|
|
|
msg.Status = STATUS_FAILED;
|
|
msg.Status = STATUS_FAILED;
|
|
|
msg.ErrorMessage = "Message type not implemented";
|
|
msg.ErrorMessage = "Message type not implemented";
|
|
|
msg.ProcessedAt = DateTime.Now;
|
|
msg.ProcessedAt = DateTime.Now;
|
|
|
|
|
+ Interlocked.Increment(ref _totalFailed);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
await dbContext.SaveChangesAsync(stoppingToken);
|
|
await dbContext.SaveChangesAsync(stoppingToken);
|
|
|
|
|
|
|
|
// Move completed messages to history
|
|
// Move completed messages to history
|
|
|
- await MoveToHistoryAsync(dbContext,
|
|
|
|
|
- pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList(),
|
|
|
|
|
- stoppingToken);
|
|
|
|
|
|
|
+ var completedMessages = pendingMessages
|
|
|
|
|
+ .Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED)
|
|
|
|
|
+ .ToList();
|
|
|
|
|
+
|
|
|
|
|
+ if (completedMessages.Any())
|
|
|
|
|
+ {
|
|
|
|
|
+ await MoveToHistoryAsync(dbContext, completedMessages, stoppingToken);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
var elapsed = DateTime.Now - startTime;
|
|
var elapsed = DateTime.Now - startTime;
|
|
|
_logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
|
|
_logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
|
|
@@ -140,44 +174,128 @@ public class MessageQueueWorker : BackgroundService
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private async Task ProcessEmailsAsync(ModelContext dbContext, List<MessageQueue> messages, CancellationToken stoppingToken)
|
|
|
|
|
|
|
+ private async Task<List<MessageQueue>> GetAndLockPendingMessagesAsync(
|
|
|
|
|
+ ModelContext dbContext,
|
|
|
|
|
+ CancellationToken stoppingToken)
|
|
|
{
|
|
{
|
|
|
- _logger.LogInformation("Processing {Count} emails", messages.Count);
|
|
|
|
|
|
|
+ // Using Oracle's FOR UPDATE SKIP LOCKED to prevent duplicate processing
|
|
|
|
|
+ // This allows multiple workers to run safely
|
|
|
|
|
+ var sql = $@"
|
|
|
|
|
+ SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
|
|
|
|
|
+ PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
|
|
|
|
|
+ ERROR_MESSAGE, CREATED_BY, CREATED_DATE
|
|
|
|
|
+ FROM MESSAGE_QUEUE
|
|
|
|
|
+ WHERE STATUS = {STATUS_PENDING}
|
|
|
|
|
+ AND (SCHEDULED_AT IS NULL OR SCHEDULED_AT <= SYSDATE)
|
|
|
|
|
+ AND (RETRY_COUNT IS NULL OR RETRY_COUNT < NVL(MAX_RETRY, 3))
|
|
|
|
|
+ ORDER BY PRIORITY, CREATED_DATE
|
|
|
|
|
+ FETCH FIRST {_maxMessagesPerRun} ROWS ONLY
|
|
|
|
|
+ FOR UPDATE SKIP LOCKED";
|
|
|
|
|
|
|
|
- var tasks = messages.Select(async msg =>
|
|
|
|
|
|
|
+ try
|
|
|
{
|
|
{
|
|
|
- if (stoppingToken.IsCancellationRequested) return;
|
|
|
|
|
|
|
+ var messages = await dbContext.MessageQueues
|
|
|
|
|
+ .FromSqlRaw(sql)
|
|
|
|
|
+ .ToListAsync(stoppingToken);
|
|
|
|
|
|
|
|
- try
|
|
|
|
|
|
|
+ // Mark as processing immediately
|
|
|
|
|
+ if (messages.Any())
|
|
|
|
|
+ {
|
|
|
|
|
+ foreach (var msg in messages)
|
|
|
|
|
+ {
|
|
|
|
|
+ msg.Status = STATUS_PROCESSING;
|
|
|
|
|
+ }
|
|
|
|
|
+ await dbContext.SaveChangesAsync(stoppingToken);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return messages;
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogWarning("Failed to get messages with row-locking, falling back: {Message}", ex.Message);
|
|
|
|
|
+
|
|
|
|
|
+ // Fallback to simple query if FOR UPDATE fails
|
|
|
|
|
+ var messages = await dbContext.MessageQueues
|
|
|
|
|
+ .Where(m => m.Status == STATUS_PENDING
|
|
|
|
|
+ && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
|
|
|
|
|
+ && (m.RetryCount == null || m.RetryCount < m.MaxRetry))
|
|
|
|
|
+ .OrderBy(m => m.Priority)
|
|
|
|
|
+ .ThenBy(m => m.CreatedDate)
|
|
|
|
|
+ .Take(_maxMessagesPerRun)
|
|
|
|
|
+ .ToListAsync(stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ // Mark as processing
|
|
|
|
|
+ if (messages.Any())
|
|
|
{
|
|
{
|
|
|
- // Subject and Content are pre-resolved at insert time
|
|
|
|
|
- var subject = msg.Subject ?? "No Subject";
|
|
|
|
|
- var content = msg.Content ?? "";
|
|
|
|
|
|
|
+ var ids = string.Join(",", messages.Select(m => m.Id));
|
|
|
|
|
+ await dbContext.Database.ExecuteSqlRawAsync(
|
|
|
|
|
+ $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({ids})",
|
|
|
|
|
+ stoppingToken);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- bool success = await _emailService.SendEmailAsync(
|
|
|
|
|
- msg.Recipient ?? "",
|
|
|
|
|
- subject,
|
|
|
|
|
- content,
|
|
|
|
|
- true);
|
|
|
|
|
|
|
+ return messages;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- _logger.LogDebug("Email sent to {Recipient}", msg.Recipient);
|
|
|
|
|
|
|
+ private async Task ProcessEmailsAsync(
|
|
|
|
|
+ ModelContext dbContext,
|
|
|
|
|
+ List<MessageQueue> messages,
|
|
|
|
|
+ CancellationToken stoppingToken)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogInformation("Processing {Count} emails", messages.Count);
|
|
|
|
|
|
|
|
- if (success)
|
|
|
|
|
|
|
+ // Convert to EmailMessage for batch processing
|
|
|
|
|
+ var emailMessages = messages.Select(m => new EmailMessage
|
|
|
|
|
+ {
|
|
|
|
|
+ Id = m.Id,
|
|
|
|
|
+ To = m.Recipient ?? "",
|
|
|
|
|
+ Subject = m.Subject ?? "No Subject",
|
|
|
|
|
+ Body = m.Content ?? "",
|
|
|
|
|
+ IsHtml = true
|
|
|
|
|
+ }).ToList();
|
|
|
|
|
+
|
|
|
|
|
+ // Create a lookup for original messages
|
|
|
|
|
+ var messageDict = messages.ToDictionary(m => m.Id);
|
|
|
|
|
+
|
|
|
|
|
+ // Process in parallel using semaphore for concurrency control
|
|
|
|
|
+ var semaphore = new SemaphoreSlim(10); // Max 10 concurrent
|
|
|
|
|
+ var tasks = emailMessages.Select(async email =>
|
|
|
|
|
+ {
|
|
|
|
|
+ await semaphore.WaitAsync(stoppingToken);
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ var success = await _emailService.SendEmailAsync(email.To, email.Subject, email.Body, email.IsHtml);
|
|
|
|
|
+
|
|
|
|
|
+ if (messageDict.TryGetValue((int)email.Id, out var msg))
|
|
|
{
|
|
{
|
|
|
- msg.Status = STATUS_SUCCESS;
|
|
|
|
|
- msg.ProcessedAt = DateTime.Now;
|
|
|
|
|
- msg.ErrorMessage = null;
|
|
|
|
|
|
|
+ if (success)
|
|
|
|
|
+ {
|
|
|
|
|
+ msg.Status = STATUS_SUCCESS;
|
|
|
|
|
+ msg.ProcessedAt = DateTime.Now;
|
|
|
|
|
+ msg.ErrorMessage = null;
|
|
|
|
|
+ Interlocked.Increment(ref _totalSuccess);
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ HandleFailure(msg);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- else
|
|
|
|
|
|
|
+
|
|
|
|
|
+ Interlocked.Increment(ref _totalProcessed);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogError(ex, "Failed to send email {Id}", email.Id);
|
|
|
|
|
+ if (messageDict.TryGetValue((int)email.Id, out var msg))
|
|
|
{
|
|
{
|
|
|
|
|
+ msg.ErrorMessage = ex.Message;
|
|
|
HandleFailure(msg);
|
|
HandleFailure(msg);
|
|
|
}
|
|
}
|
|
|
|
|
+ Interlocked.Increment(ref _totalProcessed);
|
|
|
}
|
|
}
|
|
|
- catch (Exception ex)
|
|
|
|
|
|
|
+ finally
|
|
|
{
|
|
{
|
|
|
- _logger.LogError(ex, "Failed to send email {Id}", msg.Id);
|
|
|
|
|
- msg.ErrorMessage = ex.Message;
|
|
|
|
|
- HandleFailure(msg);
|
|
|
|
|
|
|
+ semaphore.Release();
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
|
|
|
|
@@ -189,27 +307,38 @@ public class MessageQueueWorker : BackgroundService
|
|
|
message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
|
|
message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
|
|
|
message.ProcessedAt = DateTime.Now;
|
|
message.ProcessedAt = DateTime.Now;
|
|
|
|
|
|
|
|
- if (message.RetryCount >= message.MaxRetry)
|
|
|
|
|
|
|
+ var maxRetry = message.MaxRetry ?? 3;
|
|
|
|
|
+ if (message.RetryCount >= maxRetry)
|
|
|
{
|
|
{
|
|
|
message.Status = STATUS_FAILED;
|
|
message.Status = STATUS_FAILED;
|
|
|
- _logger.LogWarning("Message {Id} failed after max retries", message.Id);
|
|
|
|
|
|
|
+ Interlocked.Increment(ref _totalFailed);
|
|
|
|
|
+ _logger.LogWarning("Message {Id} failed after {RetryCount} retries: {Error}",
|
|
|
|
|
+ message.Id, message.RetryCount, message.ErrorMessage);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
|
|
+ // Set back to pending for retry with exponential backoff delay
|
|
|
message.Status = STATUS_PENDING;
|
|
message.Status = STATUS_PENDING;
|
|
|
- _logger.LogDebug("Message {Id} will retry ({RetryCount}/{MaxRetry})",
|
|
|
|
|
- message.Id, message.RetryCount, message.MaxRetry);
|
|
|
|
|
|
|
+ var backoffDelay = _emailService.CalculateBackoffDelay(message.RetryCount ?? 0);
|
|
|
|
|
+ message.ScheduledAt = DateTime.Now.AddMilliseconds(backoffDelay);
|
|
|
|
|
+
|
|
|
|
|
+ _logger.LogDebug("Message {Id} will retry in {DelayMs}ms ({RetryCount}/{MaxRetry})",
|
|
|
|
|
+ message.Id, backoffDelay, message.RetryCount, maxRetry);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private async Task MoveToHistoryAsync(ModelContext dbContext, List<MessageQueue> completedMessages, CancellationToken stoppingToken)
|
|
|
|
|
|
|
+ private async Task MoveToHistoryAsync(
|
|
|
|
|
+ ModelContext dbContext,
|
|
|
|
|
+ List<MessageQueue> completedMessages,
|
|
|
|
|
+ CancellationToken stoppingToken)
|
|
|
{
|
|
{
|
|
|
if (!completedMessages.Any()) return;
|
|
if (!completedMessages.Any()) return;
|
|
|
|
|
|
|
|
try
|
|
try
|
|
|
{
|
|
{
|
|
|
- var idsString = string.Join(",", completedMessages.Select(m => m.Id));
|
|
|
|
|
|
|
+ var ids = string.Join(",", completedMessages.Select(m => m.Id));
|
|
|
|
|
|
|
|
|
|
+ // Insert to history
|
|
|
var insertSql = $@"
|
|
var insertSql = $@"
|
|
|
INSERT INTO MESSAGE_QUEUE_HIS
|
|
INSERT INTO MESSAGE_QUEUE_HIS
|
|
|
(ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
|
|
(ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
|
|
@@ -220,18 +349,26 @@ public class MessageQueueWorker : BackgroundService
|
|
|
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
|
|
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
|
|
|
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
|
|
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
|
|
|
FROM MESSAGE_QUEUE
|
|
FROM MESSAGE_QUEUE
|
|
|
- WHERE ID IN ({idsString})";
|
|
|
|
|
|
|
+ WHERE ID IN ({ids})";
|
|
|
|
|
|
|
|
await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);
|
|
await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);
|
|
|
|
|
|
|
|
- var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})";
|
|
|
|
|
|
|
+ // Delete from main queue
|
|
|
|
|
+ var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({ids})";
|
|
|
await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
|
|
await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
|
|
|
|
|
|
|
|
- _logger.LogInformation("Moved {Count} messages to history", completedMessages.Count);
|
|
|
|
|
|
|
+ _logger.LogDebug("Moved {Count} messages to history", completedMessages.Count);
|
|
|
}
|
|
}
|
|
|
catch (Exception ex)
|
|
catch (Exception ex)
|
|
|
{
|
|
{
|
|
|
_logger.LogError(ex, "Failed to move messages to history");
|
|
_logger.LogError(ex, "Failed to move messages to history");
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ public override async Task StopAsync(CancellationToken cancellationToken)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogInformation("MessageQueueWorker stopping, waiting for in-flight messages...");
|
|
|
|
|
+ await base.StopAsync(cancellationToken);
|
|
|
|
|
+ _logger.LogInformation("MessageQueueWorker stopped gracefully");
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|