using Database.Database;
using Esim.SendMail.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace Esim.SendMail;

/// <summary>
/// Background worker that polls the MESSAGE_QUEUE table, sends pending e-mails
/// and archives finished rows into MESSAGE_QUEUE_HIS.
/// Features:
/// - Row-level locking (held inside a transaction) to prevent duplicate processing
/// - Configurable batch size and interval
/// - Graceful shutdown: results of an in-flight batch are always persisted
/// - Automatic retry with a backoff delay supplied by <see cref="IEmailService"/>
/// - Metrics logging
/// </summary>
public class MessageQueueWorker : BackgroundService
{
    private readonly ILogger<MessageQueueWorker> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly IEmailService _emailService;
    private readonly int _intervalSeconds;
    private readonly int _maxMessagesPerRun;
    private readonly int _metricsLogIntervalSeconds;

    // Message types
    private const int MESSAGE_TYPE_EMAIL = 1;
    private const int MESSAGE_TYPE_SMS = 2;
    private const int MESSAGE_TYPE_PUSH = 3;

    // Message statuses
    private const int STATUS_PENDING = 0;
    private const int STATUS_PROCESSING = 1;
    private const int STATUS_SUCCESS = 2;
    private const int STATUS_FAILED = 3;

    // Maximum number of e-mails sent concurrently within one batch.
    private const int MaxConcurrentEmailSends = 10;

    // Oracle rejects IN lists with more than 1000 expressions (ORA-01795).
    private const int MaxInListSize = 1000;

    private DateTime _lastMetricsLogUtc = DateTime.MinValue;
    private long _totalProcessed;
    private long _totalSuccess;
    private long _totalFailed;

    /// <summary>
    /// Creates the worker and reads its settings from the <c>Job</c> configuration section.
    /// </summary>
    /// <param name="logger">Logger used for progress, metrics and error reporting.</param>
    /// <param name="serviceProvider">Root provider; a new scope (and DbContext) is created per polling run.</param>
    /// <param name="emailService">Service that sends e-mails and exposes metrics/backoff calculation.</param>
    /// <param name="configuration">Source of <c>Job:IntervalSeconds</c> (default 10),
    /// <c>Job:MaxMessagesPerRun</c> (default 500) and <c>Job:MetricsLogIntervalSeconds</c> (default 60).</param>
    /// <exception cref="FormatException">A configured value is not a valid integer.</exception>
    public MessageQueueWorker(
        ILogger<MessageQueueWorker> logger,
        IServiceProvider serviceProvider,
        IEmailService emailService,
        IConfiguration configuration)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
        _emailService = emailService;

        // Parse with the invariant culture so the result does not depend on the host's locale.
        var culture = System.Globalization.CultureInfo.InvariantCulture;
        _intervalSeconds = int.Parse(configuration["Job:IntervalSeconds"] ?? "10", culture);
        _maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500", culture);
        _metricsLogIntervalSeconds = int.Parse(configuration["Job:MetricsLogIntervalSeconds"] ?? "60", culture);
    }

    /// <summary>
    /// Main loop: processes one batch, periodically logs metrics, then sleeps for the
    /// configured interval until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
            _intervalSeconds, _maxMessagesPerRun);

        try
        {
            // Wait a bit for initialization
            await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Stopped before the first run; fall through so the final log line is still written.
        }

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessMessagesAsync(stoppingToken);

                // Log metrics periodically (UTC so DST changes cannot skew the interval).
                var nowUtc = DateTime.UtcNow;
                if ((nowUtc - _lastMetricsLogUtc).TotalSeconds >= _metricsLogIntervalSeconds)
                {
                    LogMetrics();
                    _lastMetricsLogUtc = nowUtc;
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in message processing loop");
            }

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
        }

        _logger.LogInformation(
            "MessageQueueWorker stopped. Total processed: {Total}, Success: {Success}, Failed: {Failed}",
            Interlocked.Read(ref _totalProcessed),
            Interlocked.Read(ref _totalSuccess),
            Interlocked.Read(ref _totalFailed));
    }

    /// <summary>
    /// Writes the worker's counters together with the e-mail service metrics to the log.
    /// </summary>
    private void LogMetrics()
    {
        var emailMetrics = _emailService.GetMetrics();

        _logger.LogInformation(
            "Metrics: Processed={Processed} | Success={Success} | Failed={Failed} | " +
            "EmailRate={Rate}/min | AvgLatency={Latency:F0}ms",
            Interlocked.Read(ref _totalProcessed),
            Interlocked.Read(ref _totalSuccess),
            Interlocked.Read(ref _totalFailed),
            emailMetrics.EmailsLastMinute,
            emailMetrics.AverageLatencyMs);
    }

    /// <summary>
    /// Runs one polling cycle: claims a batch of pending messages, sends the e-mails,
    /// marks unsupported message types as failed, persists the results and moves
    /// finished rows to the history table. Exceptions are logged, not rethrown.
    /// </summary>
    /// <param name="stoppingToken">Cancels claiming and the start of new sends; results of work
    /// already done are still persisted.</param>
    private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
    {
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        // Create a new scope for DbContext
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();

        try
        {
            // Claim pending messages (marks them STATUS_PROCESSING in the database).
            var pendingMessages = await GetAndLockPendingMessagesAsync(dbContext, stoppingToken);

            if (pendingMessages.Count == 0)
            {
                _logger.LogDebug("No pending messages");
                return;
            }

            _logger.LogInformation("Processing {Count} messages", pendingMessages.Count);

            try
            {
                // Process by message type
                var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
                var otherMessages = pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL).ToList();

                if (emailMessages.Count > 0)
                {
                    await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
                }

                // Mark SMS/Push as not implemented
                foreach (var msg in otherMessages)
                {
                    msg.Status = STATUS_FAILED;
                    msg.ErrorMessage = "Message type not implemented";
                    msg.ProcessedAt = DateTime.Now;
                    Interlocked.Increment(ref _totalProcessed);
                    Interlocked.Increment(ref _totalFailed);
                }
            }
            finally
            {
                // Anything still claimed was never attempted (shutdown or unexpected error):
                // hand it back so a later run picks it up instead of leaving it stuck.
                foreach (var msg in pendingMessages.Where(m => m.Status == STATUS_PROCESSING))
                {
                    msg.Status = STATUS_PENDING;
                }

                // Deliberately not cancellable: e-mails may already have been sent, and
                // losing their status would leave rows in STATUS_PROCESSING forever.
                await dbContext.SaveChangesAsync(CancellationToken.None);
            }

            // Move completed messages to history
            var completedMessages = pendingMessages
                .Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED)
                .ToList();

            if (completedMessages.Count > 0)
            {
                // Not cancellable for the same reason as the save above: no other code in
                // this class revisits completed rows that were left in the queue.
                await MoveToHistoryAsync(dbContext, completedMessages, CancellationToken.None);
            }

            _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
                pendingMessages.Count, stopwatch.Elapsed.TotalMilliseconds);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Processing cancelled");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing messages");
        }
    }

    /// <summary>
    /// Selects up to the configured number of due, pending messages and marks them
    /// <c>STATUS_PROCESSING</c>. The primary path uses Oracle's <c>FOR UPDATE SKIP LOCKED</c>
    /// inside a transaction so concurrent workers cannot claim the same rows; if that
    /// fails, a plain LINQ query without row locking is used instead.
    /// </summary>
    /// <param name="dbContext">Scoped context; returned entities are tracked by it.</param>
    /// <param name="stoppingToken">Cancels the database calls.</param>
    /// <returns>The claimed messages (possibly empty), each with status <c>STATUS_PROCESSING</c>.</returns>
    private async Task<List<MessageQueue>> GetAndLockPendingMessagesAsync(
        ModelContext dbContext,
        CancellationToken stoppingToken)
    {
        // Only integer constants/configuration are interpolated here, never user input.
        // NOTE(review): some Oracle versions reject FETCH FIRST combined with FOR UPDATE
        // (ORA-02014); if so, every run takes the fallback path below - confirm against the target DB.
        var sql = $@"
            SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                   PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                   ERROR_MESSAGE, CREATED_BY, CREATED_DATE
            FROM MESSAGE_QUEUE
            WHERE STATUS = {STATUS_PENDING}
              AND (SCHEDULED_AT IS NULL OR SCHEDULED_AT <= SYSDATE)
              AND (RETRY_COUNT IS NULL OR RETRY_COUNT < NVL(MAX_RETRY, 3))
            ORDER BY PRIORITY, CREATED_DATE
            FETCH FIRST {_maxMessagesPerRun} ROWS ONLY
            FOR UPDATE SKIP LOCKED";

        try
        {
            // Row locks only live as long as the surrounding transaction, so the SELECT and
            // the status update must share one; otherwise the lock is gone before the update.
            await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

            var messages = await dbContext.MessageQueues
                .FromSqlRaw(sql)
                .ToListAsync(stoppingToken);

            // Mark as processing immediately
            if (messages.Count > 0)
            {
                foreach (var msg in messages)
                {
                    msg.Status = STATUS_PROCESSING;
                }

                await dbContext.SaveChangesAsync(stoppingToken);
            }

            await transaction.CommitAsync(stoppingToken);
            return messages;
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogWarning("Failed to get messages with row-locking, falling back: {Message}", ex.Message);

            // Drop entities (and pending modifications) left over from the failed attempt.
            dbContext.ChangeTracker.Clear();

            // Fallback to simple query if FOR UPDATE fails. Uses the same default of
            // 3 retries as the SQL above when MAX_RETRY is NULL.
            var messages = await dbContext.MessageQueues
                .Where(m => m.Status == STATUS_PENDING
                    && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
                    && (m.RetryCount == null || m.RetryCount < (m.MaxRetry ?? 3)))
                .OrderBy(m => m.Priority)
                .ThenBy(m => m.CreatedDate)
                .Take(_maxMessagesPerRun)
                .ToListAsync(stoppingToken);

            // Mark as processing through the tracked entities so the change tracker's
            // original values match the database; a raw UPDATE would leave the tracker
            // believing the rows are still pending and a later reset to pending would
            // never be written.
            if (messages.Count > 0)
            {
                foreach (var msg in messages)
                {
                    msg.Status = STATUS_PROCESSING;
                }

                await dbContext.SaveChangesAsync(stoppingToken);
            }

            return messages;
        }
    }

    /// <summary>
    /// Sends the given e-mail messages with at most <c>MaxConcurrentEmailSends</c> in flight
    /// and records the outcome on each entity. Nothing is saved here; the caller persists.
    /// </summary>
    /// <param name="dbContext">Context tracking <paramref name="messages"/> (not used directly).</param>
    /// <param name="messages">Claimed e-mail messages to send.</param>
    /// <param name="stoppingToken">When cancelled, messages not yet started are skipped and keep
    /// their current status.</param>
    private async Task ProcessEmailsAsync(
        ModelContext dbContext,
        List<MessageQueue> messages,
        CancellationToken stoppingToken)
    {
        _logger.LogInformation("Processing {Count} emails", messages.Count);

        // Semaphore bounds concurrency; disposed once every send has finished.
        using var throttle = new SemaphoreSlim(MaxConcurrentEmailSends);

        var sendTasks = messages
            .Select(message => SendSingleEmailAsync(message, throttle, stoppingToken))
            .ToList();

        await Task.WhenAll(sendTasks);
    }

    /// <summary>
    /// Sends one message once a throttle slot is available and updates its status,
    /// retry data and the worker counters. Never throws.
    /// </summary>
    /// <param name="message">The message entity to send and update.</param>
    /// <param name="throttle">Semaphore limiting concurrent sends.</param>
    /// <param name="stoppingToken">Cancels waiting for a slot; a send already started is not interrupted.</param>
    private async Task SendSingleEmailAsync(
        MessageQueue message,
        SemaphoreSlim throttle,
        CancellationToken stoppingToken)
    {
        try
        {
            await throttle.WaitAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Shutdown before this message got a slot: leave it untouched for the caller.
            return;
        }

        try
        {
            var sent = await _emailService.SendEmailAsync(
                message.Recipient ?? "",
                message.Subject ?? "No Subject",
                message.Content ?? "",
                true); // body is always sent as HTML

            if (sent)
            {
                message.Status = STATUS_SUCCESS;
                message.ProcessedAt = DateTime.Now;
                message.ErrorMessage = null;
                Interlocked.Increment(ref _totalSuccess);
            }
            else
            {
                HandleFailure(message);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send email {Id}", message.Id);
            message.ErrorMessage = ex.Message;
            HandleFailure(message);
        }
        finally
        {
            Interlocked.Increment(ref _totalProcessed);
            throttle.Release();
        }
    }

    /// <summary>
    /// Records a failed attempt: increments the retry count and either marks the message
    /// permanently failed (retry limit reached, default 3) or reschedules it as pending
    /// after the backoff delay returned by the e-mail service.
    /// </summary>
    /// <param name="message">The message whose send attempt failed.</param>
    private void HandleFailure(MessageQueue message)
    {
        message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
        message.ProcessedAt = DateTime.Now;

        var maxRetry = message.MaxRetry ?? 3;

        if (message.RetryCount >= maxRetry)
        {
            message.Status = STATUS_FAILED;
            Interlocked.Increment(ref _totalFailed);
            _logger.LogWarning("Message {Id} failed after {RetryCount} retries: {Error}",
                message.Id, message.RetryCount, message.ErrorMessage);
        }
        else
        {
            // Set back to pending for retry with exponential backoff delay.
            // DateTime.Now (not UTC) because the queue query compares SCHEDULED_AT with SYSDATE.
            message.Status = STATUS_PENDING;
            var backoffDelay = _emailService.CalculateBackoffDelay(message.RetryCount ?? 0);
            message.ScheduledAt = DateTime.Now.AddMilliseconds(backoffDelay);

            _logger.LogDebug("Message {Id} will retry in {DelayMs}ms ({RetryCount}/{MaxRetry})",
                message.Id, backoffDelay, message.RetryCount, maxRetry);
        }
    }

    /// <summary>
    /// Copies the given rows from MESSAGE_QUEUE to MESSAGE_QUEUE_HIS and deletes them from
    /// the queue, atomically. Failures are logged and swallowed; the rows then stay in the queue.
    /// </summary>
    /// <param name="dbContext">Context whose connection executes the statements.</param>
    /// <param name="completedMessages">Messages already saved with a final status.</param>
    /// <param name="stoppingToken">Cancels the database calls.</param>
    private async Task MoveToHistoryAsync(
        ModelContext dbContext,
        List<MessageQueue> completedMessages,
        CancellationToken stoppingToken)
    {
        if (completedMessages.Count == 0)
        {
            return;
        }

        try
        {
            // One transaction for insert + delete so a row can never end up in both
            // tables (or be copied twice) if the delete fails; disposal rolls back.
            await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

            // IDs come from the database (not user input); chunked to respect Oracle's IN-list limit.
            foreach (var idChunk in completedMessages.Select(m => m.Id).Chunk(MaxInListSize))
            {
                var ids = string.Join(",", idChunk);

                // Insert to history
                var insertSql = $@"
                    INSERT INTO MESSAGE_QUEUE_HIS
                        (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                         PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                         ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
                    SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                           PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                           ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
                    FROM MESSAGE_QUEUE
                    WHERE ID IN ({ids})";

                await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);

                // Delete from main queue
                var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({ids})";
                await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
            }

            await transaction.CommitAsync(stoppingToken);

            _logger.LogDebug("Moved {Count} messages to history", completedMessages.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to move messages to history");
        }
    }

    /// <summary>
    /// Logs the shutdown and delegates to the base implementation, which signals the
    /// stopping token and waits for <see cref="ExecuteAsync"/> to finish.
    /// </summary>
    /// <param name="cancellationToken">Indicates that shutdown should no longer be graceful.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("MessageQueueWorker stopping, waiting for in-flight messages...");
        await base.StopAsync(cancellationToken);
        _logger.LogInformation("MessageQueueWorker stopped gracefully");
    }
}