||
- using Database.Database;
- using Esim.SendMail.Services;
- using Microsoft.EntityFrameworkCore;
- using Quartz;
- namespace Esim.SendMail.Jobs;
- /// <summary>
- /// Optimized background job for high-volume email sending
- /// - Batch processing
- /// - Move sent messages to history table for optimal queue performance
- /// - Parallel execution with rate limiting
- /// </summary>
- [DisallowConcurrentExecution] // Prevent overlapping executions
- public class MessageQueueJob : IJob
- {
- private static readonly log4net.ILog log = log4net.LogManager.GetLogger(typeof(MessageQueueJob));
- private readonly ModelContext _dbContext;
- private readonly IEmailService _emailService;
- private readonly int _maxMessagesPerRun;
- // Message types
- private const int MESSAGE_TYPE_EMAIL = 1;
- private const int MESSAGE_TYPE_SMS = 2;
- private const int MESSAGE_TYPE_PUSH = 3;
- // Message statuses
- private const int STATUS_PENDING = 0;
- private const int STATUS_PROCESSING = 1;
- private const int STATUS_SUCCESS = 2;
- private const int STATUS_FAILED = 3;
- public MessageQueueJob(ModelContext dbContext, IEmailService emailService, Microsoft.Extensions.Configuration.IConfiguration configuration)
- {
- _dbContext = dbContext;
- _emailService = emailService;
- _maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
- }
- public async Task Execute(IJobExecutionContext context)
- {
- var startTime = DateTime.Now;
- log.Debug("MessageQueueJob started");
- try
- {
- // Get pending messages with FOR UPDATE SKIP LOCKED pattern
- // This allows multiple instances to run without conflicts
- var pendingMessages = await _dbContext.MessageQueues
- .Where(m => m.Status == STATUS_PENDING
- && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
- && (m.RetryCount == null || m.RetryCount < m.MaxRetry))
- .OrderBy(m => m.Priority) // true (high priority) first
- .ThenBy(m => m.CreatedDate)
- .Take(_maxMessagesPerRun)
- .ToListAsync();
- if (!pendingMessages.Any())
- {
- log.Debug("No pending messages");
- return;
- }
- log.Info($"Processing {pendingMessages.Count} messages");
- // Mark all as processing first (atomic update)
- var messageIds = pendingMessages.Select(m => m.Id).ToList();
- await _dbContext.Database.ExecuteSqlRawAsync(
- $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})");
- // Process by message type
- var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
- var smsMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_SMS).ToList();
- var pushMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_PUSH).ToList();
- // Process emails in parallel batches
- if (emailMessages.Any())
- {
- await ProcessEmailsAsync(emailMessages);
- }
- // Process SMS (placeholder)
- foreach (var msg in smsMessages)
- {
- msg.Status = STATUS_FAILED;
- msg.ErrorMessage = "SMS not implemented";
- msg.ProcessedAt = DateTime.Now;
- }
- // Process Push (placeholder)
- foreach (var msg in pushMessages)
- {
- msg.Status = STATUS_FAILED;
- msg.ErrorMessage = "Push not implemented";
- msg.ProcessedAt = DateTime.Now;
- }
- await _dbContext.SaveChangesAsync();
- // Move completed messages to history table
- await MoveToHistoryAsync(pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList());
- var elapsed = DateTime.Now - startTime;
- log.Info($"Job completed in {elapsed.TotalMilliseconds:F0}ms - Processed: {pendingMessages.Count}");
- }
- catch (Exception ex)
- {
- log.Error("MessageQueueJob error", ex);
- }
- }
- private async Task ProcessEmailsAsync(List<MessageQueue> messages)
- {
- log.Info($"Processing {messages.Count} emails");
- var emailBatch = messages.Select(m => new EmailMessage
- {
- Id = m.Id,
- To = m.Recipient,
- Subject = GetEmailSubject(m),
- Body = GetEmailContent(m),
- IsHtml = true
- }).ToList();
- // Process in parallel with rate limiting
- var tasks = new List<Task>();
- foreach (var msg in messages)
- {
- var emailMsg = emailBatch.First(e => e.Id == msg.Id);
-
- tasks.Add(Task.Run(async () =>
- {
- bool success = await _emailService.SendEmailAsync(
- emailMsg.To,
- emailMsg.Subject,
- emailMsg.Body,
- emailMsg.IsHtml);
- if (success)
- {
- msg.Status = STATUS_SUCCESS;
- msg.ProcessedAt = DateTime.Now;
- msg.ErrorMessage = null;
- }
- else
- {
- HandleFailure(msg);
- }
- }));
- }
- await Task.WhenAll(tasks);
- }
- private string GetEmailSubject(MessageQueue message)
- {
- if (!string.IsNullOrEmpty(message.Subject))
- return message.Subject;
- // Get from template if available
- if (!string.IsNullOrEmpty(message.TemplateCode))
- {
- var template = _dbContext.MessageTemplates
- .FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true);
- if (template != null)
- return template.Subject ?? "No Subject";
- }
- return "No Subject";
- }
- private string GetEmailContent(MessageQueue message)
- {
- string content = message.Content ?? "";
- // Use template if specified
- if (!string.IsNullOrEmpty(message.TemplateCode))
- {
- var template = _dbContext.MessageTemplates
- .FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true);
- if (template != null)
- {
- content = template.Content ?? content;
- }
- }
- // Replace placeholders
- if (!string.IsNullOrEmpty(message.TemplateData))
- {
- try
- {
- var data = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(message.TemplateData);
- if (data != null)
- {
- foreach (var kvp in data)
- {
- content = content.Replace($"{{{{{kvp.Key}}}}}", kvp.Value);
- }
- }
- }
- catch (Exception ex)
- {
- log.Warn($"Failed to parse template data: {ex.Message}");
- }
- }
- return content;
- }
- private void HandleFailure(MessageQueue message)
- {
- message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
- message.ProcessedAt = DateTime.Now;
- if (message.RetryCount >= message.MaxRetry)
- {
- message.Status = STATUS_FAILED;
- log.Warn($"Message {message.Id} failed after max retries");
- }
- else
- {
- message.Status = STATUS_PENDING; // Will be retried
- log.Debug($"Message {message.Id} will retry ({message.RetryCount}/{message.MaxRetry})");
- }
- }
- /// <summary>
- /// Move completed messages to MESSAGE_QUEUE_HIS for optimal queue performance
- /// </summary>
- private async Task MoveToHistoryAsync(List<MessageQueue> completedMessages)
- {
- if (!completedMessages.Any()) return;
- try
- {
- var ids = completedMessages.Select(m => m.Id).ToList();
- var idsString = string.Join(",", ids);
- // Insert into history table
- var insertSql = $@"
- INSERT INTO MESSAGE_QUEUE_HIS
- (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
- PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
- ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
- SELECT
- ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
- PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
- ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
- FROM MESSAGE_QUEUE
- WHERE ID IN ({idsString})";
- await _dbContext.Database.ExecuteSqlRawAsync(insertSql);
- // Delete from main queue
- var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})";
- await _dbContext.Database.ExecuteSqlRawAsync(deleteSql);
- log.Info($"Moved {completedMessages.Count} messages to history");
- }
- catch (Exception ex)
- {
- log.Error($"Failed to move messages to history: {ex.Message}", ex);
- // Don't throw - messages are already processed, just not moved to history
- }
- }
- }
|