using Database.Database; using Esim.SendMail.Services; using Microsoft.EntityFrameworkCore; using Quartz; namespace Esim.SendMail.Jobs; /// /// Optimized background job for high-volume email sending /// - Batch processing /// - Move sent messages to history table for optimal queue performance /// - Parallel execution with rate limiting /// [DisallowConcurrentExecution] // Prevent overlapping executions public class MessageQueueJob : IJob { private static readonly log4net.ILog log = log4net.LogManager.GetLogger(typeof(MessageQueueJob)); private readonly ModelContext _dbContext; private readonly IEmailService _emailService; private readonly int _maxMessagesPerRun; // Message types private const int MESSAGE_TYPE_EMAIL = 1; private const int MESSAGE_TYPE_SMS = 2; private const int MESSAGE_TYPE_PUSH = 3; // Message statuses private const int STATUS_PENDING = 0; private const int STATUS_PROCESSING = 1; private const int STATUS_SUCCESS = 2; private const int STATUS_FAILED = 3; public MessageQueueJob(ModelContext dbContext, IEmailService emailService, Microsoft.Extensions.Configuration.IConfiguration configuration) { _dbContext = dbContext; _emailService = emailService; _maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500"); } public async Task Execute(IJobExecutionContext context) { var startTime = DateTime.Now; log.Debug("MessageQueueJob started"); try { // Get pending messages with FOR UPDATE SKIP LOCKED pattern // This allows multiple instances to run without conflicts var pendingMessages = await _dbContext.MessageQueues .Where(m => m.Status == STATUS_PENDING && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now) && (m.RetryCount == null || m.RetryCount < m.MaxRetry)) .OrderBy(m => m.Priority) // true (high priority) first .ThenBy(m => m.CreatedDate) .Take(_maxMessagesPerRun) .ToListAsync(); if (!pendingMessages.Any()) { log.Debug("No pending messages"); return; } log.Info($"Processing {pendingMessages.Count} messages"); // Mark all as processing first (atomic update) var messageIds = pendingMessages.Select(m => m.Id).ToList(); await _dbContext.Database.ExecuteSqlRawAsync( $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})"); // Process by message type var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList(); var smsMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_SMS).ToList(); var pushMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_PUSH).ToList(); // Process emails in parallel batches if (emailMessages.Any()) { await ProcessEmailsAsync(emailMessages); } // Process SMS (placeholder) foreach (var msg in smsMessages) { msg.Status = STATUS_FAILED; msg.ErrorMessage = "SMS not implemented"; msg.ProcessedAt = DateTime.Now; } // Process Push (placeholder) foreach (var msg in pushMessages) { msg.Status = STATUS_FAILED; msg.ErrorMessage = "Push not implemented"; msg.ProcessedAt = DateTime.Now; } await _dbContext.SaveChangesAsync(); // Move completed messages to history table await MoveToHistoryAsync(pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList()); var elapsed = DateTime.Now - startTime; log.Info($"Job completed in {elapsed.TotalMilliseconds:F0}ms - Processed: {pendingMessages.Count}"); } catch (Exception ex) { log.Error("MessageQueueJob error", ex); } } private async Task ProcessEmailsAsync(List messages) { log.Info($"Processing {messages.Count} emails"); var emailBatch = messages.Select(m => new EmailMessage { Id = m.Id, To = m.Recipient, Subject = GetEmailSubject(m), Body = GetEmailContent(m), IsHtml = true }).ToList(); // Process in parallel with rate limiting var tasks = new List(); foreach (var msg in messages) { var emailMsg = emailBatch.First(e => e.Id == msg.Id); tasks.Add(Task.Run(async () => { bool success = await _emailService.SendEmailAsync( emailMsg.To, emailMsg.Subject, emailMsg.Body, emailMsg.IsHtml); if (success) { msg.Status = STATUS_SUCCESS; msg.ProcessedAt = DateTime.Now; msg.ErrorMessage = null; } else { HandleFailure(msg); } })); } await Task.WhenAll(tasks); } private string GetEmailSubject(MessageQueue message) { if (!string.IsNullOrEmpty(message.Subject)) return message.Subject; // Get from template if available if (!string.IsNullOrEmpty(message.TemplateCode)) { var template = _dbContext.MessageTemplates .FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true); if (template != null) return template.Subject ?? "No Subject"; } return "No Subject"; } private string GetEmailContent(MessageQueue message) { string content = message.Content ?? ""; // Use template if specified if (!string.IsNullOrEmpty(message.TemplateCode)) { var template = _dbContext.MessageTemplates .FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true); if (template != null) { content = template.Content ?? content; } } // Replace placeholders if (!string.IsNullOrEmpty(message.TemplateData)) { try { var data = System.Text.Json.JsonSerializer.Deserialize>(message.TemplateData); if (data != null) { foreach (var kvp in data) { content = content.Replace($"{{{{{kvp.Key}}}}}", kvp.Value); } } } catch (Exception ex) { log.Warn($"Failed to parse template data: {ex.Message}"); } } return content; } private void HandleFailure(MessageQueue message) { message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1); message.ProcessedAt = DateTime.Now; if (message.RetryCount >= message.MaxRetry) { message.Status = STATUS_FAILED; log.Warn($"Message {message.Id} failed after max retries"); } else { message.Status = STATUS_PENDING; // Will be retried log.Debug($"Message {message.Id} will retry ({message.RetryCount}/{message.MaxRetry})"); } } /// /// Move completed messages to MESSAGE_QUEUE_HIS for optimal queue performance /// private async Task MoveToHistoryAsync(List completedMessages) { if (!completedMessages.Any()) return; try { var ids = completedMessages.Select(m => m.Id).ToList(); var idsString = string.Join(",", ids); // Insert into history table var insertSql = $@" INSERT INTO MESSAGE_QUEUE_HIS (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA, PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY, ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE) SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA, PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY, ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})"; await _dbContext.Database.ExecuteSqlRawAsync(insertSql); // Delete from main queue var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})"; await _dbContext.Database.ExecuteSqlRawAsync(deleteSql); log.Info($"Moved {completedMessages.Count} messages to history"); } catch (Exception ex) { log.Error($"Failed to move messages to history: {ex.Message}", ex); // Don't throw - messages are already processed, just not moved to history } } }