using Database.Database; using Esim.SendMail.Services; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; namespace Esim.SendMail; /// /// High-performance background worker for processing message queue. /// Uses native .NET BackgroundService for maximum .NET 9 compatibility. /// Features: /// - Configurable polling interval /// - Batch processing /// - Automatic retry handling /// - Graceful shutdown /// public class MessageQueueWorker : BackgroundService { private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; private readonly IEmailService _emailService; private readonly int _intervalSeconds; private readonly int _maxMessagesPerRun; // Message types private const int MESSAGE_TYPE_EMAIL = 1; private const int MESSAGE_TYPE_SMS = 2; private const int MESSAGE_TYPE_PUSH = 3; // Message statuses private const int STATUS_PENDING = 0; private const int STATUS_PROCESSING = 1; private const int STATUS_SUCCESS = 2; private const int STATUS_FAILED = 3; public MessageQueueWorker( ILogger logger, IServiceProvider serviceProvider, IEmailService emailService, IConfiguration configuration) { _logger = logger; _serviceProvider = serviceProvider; _emailService = emailService; _intervalSeconds = int.Parse(configuration["Job:IntervalSeconds"] ?? "10"); _maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500"); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}", _intervalSeconds, _maxMessagesPerRun); while (!stoppingToken.IsCancellationRequested) { try { await ProcessMessagesAsync(stoppingToken); } catch (Exception ex) { _logger.LogError(ex, "Error in message processing loop"); } await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken); } _logger.LogInformation("MessageQueueWorker stopped"); } private async Task ProcessMessagesAsync(CancellationToken stoppingToken) { var startTime = DateTime.Now; // Create a new scope for DbContext (transient) using var scope = _serviceProvider.CreateScope(); var dbContext = scope.ServiceProvider.GetRequiredService(); try { // Get pending messages var pendingMessages = await dbContext.MessageQueues .Where(m => m.Status == STATUS_PENDING && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now) && (m.RetryCount == null || m.RetryCount < m.MaxRetry)) .OrderBy(m => m.Priority) .ThenBy(m => m.CreatedDate) .Take(_maxMessagesPerRun) .ToListAsync(stoppingToken); if (!pendingMessages.Any()) { _logger.LogDebug("No pending messages"); return; } _logger.LogInformation("Processing {Count} messages", pendingMessages.Count); // Mark all as processing var messageIds = pendingMessages.Select(m => m.Id).ToList(); await dbContext.Database.ExecuteSqlRawAsync( $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})", stoppingToken); // Process by message type var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList(); // Process emails if (emailMessages.Any()) { await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken); } // Mark SMS/Push as not implemented foreach (var msg in pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL)) { msg.Status = STATUS_FAILED; msg.ErrorMessage = "Message type not implemented"; msg.ProcessedAt = DateTime.Now; } await dbContext.SaveChangesAsync(stoppingToken); // Move completed messages to history await MoveToHistoryAsync(dbContext, pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList(), stoppingToken); var elapsed = DateTime.Now - startTime; _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms", pendingMessages.Count, elapsed.TotalMilliseconds); } catch (OperationCanceledException) { _logger.LogInformation("Processing cancelled"); } catch (Exception ex) { _logger.LogError(ex, "Error processing messages"); } } private async Task ProcessEmailsAsync(ModelContext dbContext, List messages, CancellationToken stoppingToken) { _logger.LogInformation("Processing {Count} emails", messages.Count); var tasks = messages.Select(async msg => { if (stoppingToken.IsCancellationRequested) return; try { // Subject and Content are pre-resolved at insert time var subject = msg.Subject ?? "No Subject"; var content = msg.Content ?? ""; bool success = await _emailService.SendEmailAsync( msg.Recipient ?? "", subject, content, true); _logger.LogDebug("Email sent to {Recipient}", msg.Recipient); if (success) { msg.Status = STATUS_SUCCESS; msg.ProcessedAt = DateTime.Now; msg.ErrorMessage = null; } else { HandleFailure(msg); } } catch (Exception ex) { _logger.LogError(ex, "Failed to send email {Id}", msg.Id); msg.ErrorMessage = ex.Message; HandleFailure(msg); } }); await Task.WhenAll(tasks); } private void HandleFailure(MessageQueue message) { message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1); message.ProcessedAt = DateTime.Now; if (message.RetryCount >= message.MaxRetry) { message.Status = STATUS_FAILED; _logger.LogWarning("Message {Id} failed after max retries", message.Id); } else { message.Status = STATUS_PENDING; _logger.LogDebug("Message {Id} will retry ({RetryCount}/{MaxRetry})", message.Id, message.RetryCount, message.MaxRetry); } } private async Task MoveToHistoryAsync(ModelContext dbContext, List completedMessages, CancellationToken stoppingToken) { if (!completedMessages.Any()) return; try { var idsString = string.Join(",", completedMessages.Select(m => m.Id)); var insertSql = $@" INSERT INTO MESSAGE_QUEUE_HIS (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA, PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY, ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE) SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA, PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY, ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})"; await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken); var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})"; await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken); _logger.LogInformation("Moved {Count} messages to history", completedMessages.Count); } catch (Exception ex) { _logger.LogError(ex, "Failed to move messages to history"); } } }