||
- using Database.Database;
- using Esim.SendMail.Services;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.EntityFrameworkCore.Storage;
- using Microsoft.Extensions.Logging;
- namespace Esim.SendMail;
- /// <summary>
- /// High-performance background worker for processing message queue.
- /// Features:
- /// - Row-level locking to prevent duplicate processing
- /// - Configurable batch size and interval
- /// - Graceful shutdown with in-flight message handling
- /// - Automatic retry with exponential backoff
- /// - Metrics logging
- /// </summary>
- public class MessageQueueWorker : BackgroundService
- {
// Message type codes, compared against MessageQueue.MessageType.
private const int MESSAGE_TYPE_EMAIL = 1;
private const int MESSAGE_TYPE_SMS = 2;
private const int MESSAGE_TYPE_PUSH = 3;

// Status codes written to MessageQueue.Status as a message moves through the worker.
private const int STATUS_PENDING = 0;
private const int STATUS_PROCESSING = 1;
private const int STATUS_SUCCESS = 2;
private const int STATUS_FAILED = 3;

// Collaborators supplied through the constructor.
private readonly ILogger<MessageQueueWorker> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IEmailService _emailService;

// Tuning values read from configuration in the constructor.
private readonly int _intervalSeconds;
private readonly int _maxMessagesPerRun;
private readonly int _metricsLogIntervalSeconds;

// Time of the last metrics log line; MinValue makes the first check fire immediately.
private DateTime _lastMetricsLog = DateTime.MinValue;

// Running totals for the lifetime of the worker; incremented via Interlocked.
private long _totalProcessed;
private long _totalSuccess;
private long _totalFailed;
/// <summary>
/// Creates the worker and reads its tuning values from configuration.
/// </summary>
/// <param name="logger">Logger used for progress, metrics and error output.</param>
/// <param name="serviceProvider">Provider from which a fresh scope is created for every processing run.</param>
/// <param name="emailService">Service used to send e-mails, report e-mail metrics and compute retry backoff.</param>
/// <param name="configuration">
/// Source of <c>Job:IntervalSeconds</c> (default 10, minimum 1), <c>Job:MaxMessagesPerRun</c>
/// (default 500, minimum 1) and <c>Job:MetricsLogIntervalSeconds</c> (default 60, minimum 0).
/// </param>
/// <exception cref="FormatException">A configured value is not an integer.</exception>
/// <exception cref="InvalidOperationException">A configured value is below its minimum.</exception>
public MessageQueueWorker(
    ILogger<MessageQueueWorker> logger,
    IServiceProvider serviceProvider,
    IEmailService emailService,
    IConfiguration configuration)
{
    _logger = logger;
    _serviceProvider = serviceProvider;
    _emailService = emailService;

    // A non-positive interval would spin the loop (0) or make Task.Delay throw (negative);
    // a non-positive batch size would produce a useless or invalid query. Fail at startup instead.
    _intervalSeconds = ReadIntSetting(configuration, "Job:IntervalSeconds", 10, 1);
    _maxMessagesPerRun = ReadIntSetting(configuration, "Job:MaxMessagesPerRun", 500, 1);
    _metricsLogIntervalSeconds = ReadIntSetting(configuration, "Job:MetricsLogIntervalSeconds", 60, 0);
}

/// <summary>
/// Reads an integer setting, falling back to <paramref name="defaultValue"/> when the key is absent
/// and rejecting values below <paramref name="minimum"/>.
/// </summary>
private static int ReadIntSetting(IConfiguration configuration, string key, int defaultValue, int minimum)
{
    var raw = configuration[key];
    if (raw is null)
    {
        return defaultValue;
    }

    // Configuration text is machine-readable, so parse it independently of the host culture.
    var value = int.Parse(raw, System.Globalization.CultureInfo.InvariantCulture);
    if (value < minimum)
    {
        throw new InvalidOperationException(
            $"Configuration value '{key}' must be at least {minimum}, but was {value}.");
    }

    return value;
}
/// <summary>
/// Main worker loop: after a short startup delay, repeatedly processes a batch of queued messages,
/// logs metrics when the metrics interval has elapsed, and then sleeps for the configured interval.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the worker should shut down.</param>
/// <returns>A task that completes once the loop has exited.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
        _intervalSeconds, _maxMessagesPerRun);

    try
    {
        // Wait a bit for initialization
        await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessMessagesAsync(stoppingToken);

                // UTC is used for the elapsed-time check so daylight-saving shifts of the local
                // clock cannot suppress or duplicate the periodic metrics line.
                if ((DateTime.UtcNow - _lastMetricsLog).TotalSeconds >= _metricsLogIntervalSeconds)
                {
                    LogMetrics();
                    _lastMetricsLog = DateTime.UtcNow;
                }
            }
            catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
            {
                // Anything except a shutdown-triggered cancellation is logged and the loop carries on.
                _logger.LogError(ex, "Error in message processing loop");
            }

            await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
        }
    }
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        // Shutdown requested during the startup delay, a processing run, or the sleep:
        // fall through so the final summary is always logged.
    }

    _logger.LogInformation("MessageQueueWorker stopped. Total processed: {Total}, Success: {Success}, Failed: {Failed}",
        _totalProcessed, _totalSuccess, _totalFailed);
}
/// <summary>
/// Writes one informational log line with the worker's running totals and the
/// rate/latency figures reported by the e-mail service.
/// </summary>
private void LogMetrics()
{
    var snapshot = _emailService.GetMetrics();

    // Capture the counters first so the values are read in the same order they are logged.
    var processed = _totalProcessed;
    var succeeded = _totalSuccess;
    var failed = _totalFailed;

    _logger.LogInformation(
        "Metrics: Processed={Processed} | Success={Success} | Failed={Failed} | " +
        "EmailRate={Rate}/min | AvgLatency={Latency:F0}ms",
        processed,
        succeeded,
        failed,
        snapshot.EmailsLastMinute,
        snapshot.AverageLatencyMs);
}
/// <summary>
/// Runs one processing pass: claims pending messages, dispatches them by type, persists the
/// outcomes and moves finished messages to the history table. Exceptions are logged, not rethrown.
/// </summary>
/// <param name="stoppingToken">Cancels claiming and sending; it is deliberately NOT used when persisting outcomes.</param>
private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
{
    // Stopwatch instead of DateTime.Now so the reported duration is immune to clock adjustments.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();

    // Create a new scope for DbContext
    using var scope = _serviceProvider.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();

    try
    {
        var claimed = await GetAndLockPendingMessagesAsync(dbContext, stoppingToken);
        if (claimed.Count == 0)
        {
            _logger.LogDebug("No pending messages");
            return;
        }

        _logger.LogInformation("Processing {Count} messages", claimed.Count);

        try
        {
            await DispatchClaimedMessagesAsync(dbContext, claimed, stoppingToken);
        }
        finally
        {
            // Runs on success, cancellation and failure alike: e-mails that were already sent
            // must have their result stored, and untouched messages must be released again.
            await PersistOutcomesAsync(dbContext, claimed);
        }

        _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
            claimed.Count, stopwatch.Elapsed.TotalMilliseconds);
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("Processing cancelled");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing messages");
    }
}

/// <summary>
/// Sends the e-mail messages and marks every other message type as failed (not implemented).
/// Only mutates the in-memory entities; nothing is saved here.
/// </summary>
private async Task DispatchClaimedMessagesAsync(
    ModelContext dbContext,
    List<MessageQueue> claimed,
    CancellationToken stoppingToken)
{
    var emailMessages = claimed.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
    if (emailMessages.Count > 0)
    {
        await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
    }

    // Mark SMS/Push as not implemented
    foreach (var msg in claimed.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
    {
        msg.Status = STATUS_FAILED;
        msg.ErrorMessage = "Message type not implemented";
        msg.ProcessedAt = DateTime.Now;

        // Count the message as processed too, so Failed can never exceed Processed in the metrics.
        Interlocked.Increment(ref _totalProcessed);
        Interlocked.Increment(ref _totalFailed);
    }
}

/// <summary>
/// Saves the state of the claimed messages and archives the finished ones.
/// Uses <see cref="CancellationToken.None"/> so a shutdown cannot discard results of work already done.
/// </summary>
private async Task PersistOutcomesAsync(ModelContext dbContext, List<MessageQueue> claimed)
{
    // A message still in PROCESSING was never attempted (cancellation or an earlier error).
    // Nothing else resets that status, so hand it back to the queue instead of stranding it.
    foreach (var msg in claimed)
    {
        if (msg.Status == STATUS_PROCESSING)
        {
            msg.Status = STATUS_PENDING;
        }
    }

    await dbContext.SaveChangesAsync(CancellationToken.None);

    // Move completed messages to history
    var completedMessages = claimed
        .Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED)
        .ToList();

    if (completedMessages.Count > 0)
    {
        await MoveToHistoryAsync(dbContext, completedMessages, CancellationToken.None);
    }
}
/// <summary>
/// Claims up to <c>_maxMessagesPerRun</c> due, pending messages and marks them as PROCESSING.
/// The claim is made with <c>SELECT ... FOR UPDATE SKIP LOCKED</c> inside a transaction so that
/// several workers do not pick the same rows; if that fails, a plain (unlocked) query is used.
/// </summary>
/// <param name="dbContext">Context used for the raw lock query, the entity query and the status update.</param>
/// <param name="stoppingToken">Cancels the database calls; cancellation is propagated to the caller.</param>
/// <returns>The claimed messages, already saved with status PROCESSING; empty when nothing is due.</returns>
private async Task<List<MessageQueue>> GetAndLockPendingMessagesAsync(
    ModelContext dbContext,
    CancellationToken stoppingToken)
{
    var tableName = dbContext.Model.FindEntityType(typeof(MessageQueue))?.GetSchemaQualifiedTableName()
        ?? "MESSAGE_QUEUE";

    // Oracle rejects the row-limiting clause (FETCH FIRST n ROWS ONLY) in combination with
    // FOR UPDATE, so the batch size is enforced while reading the cursor instead.
    var lockSql = $@"
        SELECT ID
        FROM {tableName}
        WHERE STATUS = {STATUS_PENDING}
          AND (SCHEDULED_AT IS NULL OR SCHEDULED_AT <= SYSDATE)
          AND (RETRY_COUNT IS NULL OR RETRY_COUNT < NVL(MAX_RETRY, 3))
        ORDER BY PRIORITY, CREATED_DATE
        FOR UPDATE SKIP LOCKED";

    try
    {
        // Row locks only live as long as their transaction. Without an explicit one the lock
        // would be gone before the status update below, defeating the purpose of the query.
        // The transaction also keeps the connection open, so no manual open/close is needed.
        await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

        var connection = dbContext.Database.GetDbConnection();
        var lockedIds = new List<int>();

        await using (var command = connection.CreateCommand())
        {
            command.CommandText = lockSql;
            command.Transaction = transaction.GetDbTransaction();

            await using var reader = await command.ExecuteReaderAsync(stoppingToken);

            // Stop reading at the batch size. Rows the driver locked beyond that (e.g. through
            // prefetching) are simply released when the transaction ends.
            while (lockedIds.Count < _maxMessagesPerRun && await reader.ReadAsync(stoppingToken))
            {
                lockedIds.Add(reader.GetInt32(0));
            }
        }

        if (lockedIds.Count == 0)
        {
            // Disposing the uncommitted transaction rolls it back; nothing was changed.
            return new List<MessageQueue>();
        }

        // Load the full entities for the locked IDs.
        var messages = await dbContext.MessageQueues
            .Where(m => lockedIds.Contains(m.Id))
            .ToListAsync(stoppingToken);

        // Mark as processing while the rows are still locked, then commit to publish the new
        // status and release the locks.
        foreach (var msg in messages)
        {
            msg.Status = STATUS_PROCESSING;
        }

        await dbContext.SaveChangesAsync(stoppingToken);
        await transaction.CommitAsync(stoppingToken);

        _logger.LogDebug("Locked and fetched {Count} messages for processing", messages.Count);
        return messages;
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Cancellation is excluded above: it is a shutdown, not a locking problem, and the
        // fallback query would only fail again with the same cancelled token.
        _logger.LogWarning(ex, "Failed to lock messages with FOR UPDATE, falling back to simple query");

        // Fallback: simple EF Core query without locking. MaxRetry defaults to 3 when NULL,
        // matching the NVL(MAX_RETRY, 3) used by the locking query.
        var messages = await dbContext.MessageQueues
            .Where(m => m.Status == STATUS_PENDING
                && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
                && (m.RetryCount == null || m.RetryCount < (m.MaxRetry ?? 3)))
            .OrderBy(m => m.Priority)
            .ThenBy(m => m.CreatedDate)
            .Take(_maxMessagesPerRun)
            .ToListAsync(stoppingToken);

        if (messages.Count > 0)
        {
            foreach (var msg in messages)
            {
                msg.Status = STATUS_PROCESSING;
            }

            await dbContext.SaveChangesAsync(stoppingToken);
        }

        return messages;
    }
}
/// <summary>
/// Sends the given e-mail messages with bounded concurrency and records the outcome on each
/// entity (success, or failure handling via <c>HandleFailure</c>). Nothing is saved here.
/// </summary>
/// <param name="dbContext">Not used by this method; kept so the signature stays unchanged.</param>
/// <param name="messages">E-mail messages to send; each entity is mutated in place.</param>
/// <param name="stoppingToken">
/// Cancels waiting for a send slot. Sends that have not started yet are skipped and the
/// resulting <see cref="OperationCanceledException"/> surfaces once all running sends have finished.
/// </param>
private async Task ProcessEmailsAsync(
    ModelContext dbContext,
    List<MessageQueue> messages,
    CancellationToken stoppingToken)
{
    _logger.LogInformation("Processing {Count} emails", messages.Count);

    const int maxConcurrentSends = 10;

    // Disposed when the method exits; Task.WhenAll only completes after every send task has
    // finished, so nothing can still be using the semaphore at that point.
    using var throttle = new SemaphoreSlim(maxConcurrentSends);

    async Task SendOneAsync(MessageQueue message)
    {
        // Outside the try block on purpose: if the wait is cancelled, no slot was taken and
        // the finally block below must not release one.
        await throttle.WaitAsync(stoppingToken);
        try
        {
            var sent = await _emailService.SendEmailAsync(
                message.Recipient ?? "",
                message.Subject ?? "No Subject",
                message.Content ?? "",
                true);

            if (sent)
            {
                message.Status = STATUS_SUCCESS;
                message.ProcessedAt = DateTime.Now;
                message.ErrorMessage = null;
                Interlocked.Increment(ref _totalSuccess);
            }
            else
            {
                // Record a reason so the failure log and the history row do not carry an
                // empty or stale error from an earlier attempt.
                message.ErrorMessage = "Email service reported the send as failed";
                HandleFailure(message);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send email {Id}", message.Id);
            message.ErrorMessage = ex.Message;
            HandleFailure(message);
        }
        finally
        {
            Interlocked.Increment(ref _totalProcessed);
            throttle.Release();
        }
    }

    await Task.WhenAll(messages.Select(SendOneAsync));
}
/// <summary>
/// Records a failed attempt on <paramref name="message"/>: bumps the retry counter and either
/// re-queues the message with a backoff delay or marks it as permanently failed once the
/// retry limit (MaxRetry, or 3 when unset) is reached.
/// </summary>
/// <param name="message">The message whose send attempt failed; mutated in place.</param>
private void HandleFailure(MessageQueue message)
{
    message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
    message.ProcessedAt = DateTime.Now;

    var retryLimit = message.MaxRetry ?? 3;

    if (message.RetryCount < retryLimit)
    {
        // Attempts remain: back to pending, scheduled after the backoff delay from the e-mail service.
        message.Status = STATUS_PENDING;
        var delayMs = _emailService.CalculateBackoffDelay(message.RetryCount ?? 0);
        message.ScheduledAt = DateTime.Now.AddMilliseconds(delayMs);

        _logger.LogDebug("Message {Id} will retry in {DelayMs}ms ({RetryCount}/{MaxRetry})",
            message.Id, delayMs, message.RetryCount, retryLimit);
        return;
    }

    // Retry budget exhausted: give up on this message.
    message.Status = STATUS_FAILED;
    Interlocked.Increment(ref _totalFailed);
    _logger.LogWarning("Message {Id} failed after {RetryCount} retries: {Error}",
        message.Id, message.RetryCount, message.ErrorMessage);
}
/// <summary>
/// Copies the given messages from MESSAGE_QUEUE to MESSAGE_QUEUE_HIS and deletes them from the
/// queue. Copy and delete run in one transaction so a failure cannot leave a message in both
/// tables. Failures are logged, not rethrown.
/// </summary>
/// <param name="dbContext">Context whose connection executes the raw SQL.</param>
/// <param name="completedMessages">Messages to archive; only their IDs are used.</param>
/// <param name="stoppingToken">Cancels the database calls.</param>
private async Task MoveToHistoryAsync(
    ModelContext dbContext,
    List<MessageQueue> completedMessages,
    CancellationToken stoppingToken)
{
    if (completedMessages.Count == 0)
    {
        return;
    }

    // Oracle allows at most 1000 expressions in an IN list, and the batch size is configurable.
    const int maxIdsPerStatement = 1000;

    try
    {
        // Join a transaction the caller already opened; otherwise own one for the whole move.
        // Disposing an uncommitted transaction rolls it back.
        await using var transaction = dbContext.Database.CurrentTransaction is null
            ? await dbContext.Database.BeginTransactionAsync(stoppingToken)
            : null;

        foreach (var idBatch in completedMessages.Select(m => m.Id).Chunk(maxIdsPerStatement))
        {
            // IDs are numeric entity keys, not user-supplied text, so inlining them is safe.
            var ids = string.Join(",", idBatch);

            // Insert to history
            var insertSql = $@"
                INSERT INTO MESSAGE_QUEUE_HIS
                    (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                     PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                     ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
                SELECT
                    ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                    PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                    ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
                FROM MESSAGE_QUEUE
                WHERE ID IN ({ids})";
            await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);

            // Delete from main queue
            var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({ids})";
            await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
        }

        if (transaction is not null)
        {
            await transaction.CommitAsync(stoppingToken);
        }

        _logger.LogDebug("Moved {Count} messages to history", completedMessages.Count);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to move messages to history");
    }
}
/// <summary>
/// Requests the worker loop to stop and waits for it via the base implementation, logging
/// whether the wait ended normally or was cut short by the shutdown token.
/// </summary>
/// <param name="cancellationToken">Signalled by the host when shutdown should no longer be graceful.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("MessageQueueWorker stopping, waiting for in-flight messages...");

    await base.StopAsync(cancellationToken);

    // The base implementation also returns when the shutdown token fires, so a cancelled token
    // here most likely means the wait was abandoned rather than the loop having finished.
    if (cancellationToken.IsCancellationRequested)
    {
        _logger.LogWarning("MessageQueueWorker shutdown timed out before in-flight messages finished");
        return;
    }

    _logger.LogInformation("MessageQueueWorker stopped gracefully");
}
- }
|