using Database.Database;
using Esim.SendMail.Services;
using Microsoft.EntityFrameworkCore;
using Quartz;
namespace Esim.SendMail.Jobs;
///
/// Optimized background job for high-volume email sending
/// - Batch processing
/// - Move sent messages to history table for optimal queue performance
/// - Parallel execution with rate limiting
///
[DisallowConcurrentExecution] // Prevent overlapping executions
public class MessageQueueJob : IJob
{
private static readonly log4net.ILog log = log4net.LogManager.GetLogger(typeof(MessageQueueJob));
private readonly ModelContext _dbContext;
private readonly IEmailService _emailService;
private readonly int _maxMessagesPerRun;
// Message types
private const int MESSAGE_TYPE_EMAIL = 1;
private const int MESSAGE_TYPE_SMS = 2;
private const int MESSAGE_TYPE_PUSH = 3;
// Message statuses
private const int STATUS_PENDING = 0;
private const int STATUS_PROCESSING = 1;
private const int STATUS_SUCCESS = 2;
private const int STATUS_FAILED = 3;
public MessageQueueJob(ModelContext dbContext, IEmailService emailService, Microsoft.Extensions.Configuration.IConfiguration configuration)
{
_dbContext = dbContext;
_emailService = emailService;
_maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
}
public async Task Execute(IJobExecutionContext context)
{
var startTime = DateTime.Now;
log.Debug("MessageQueueJob started");
try
{
// Get pending messages with FOR UPDATE SKIP LOCKED pattern
// This allows multiple instances to run without conflicts
var pendingMessages = await _dbContext.MessageQueues
.Where(m => m.Status == STATUS_PENDING
&& (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
&& (m.RetryCount == null || m.RetryCount < m.MaxRetry))
.OrderBy(m => m.Priority) // true (high priority) first
.ThenBy(m => m.CreatedDate)
.Take(_maxMessagesPerRun)
.ToListAsync();
if (!pendingMessages.Any())
{
log.Debug("No pending messages");
return;
}
log.Info($"Processing {pendingMessages.Count} messages");
// Mark all as processing first (atomic update)
var messageIds = pendingMessages.Select(m => m.Id).ToList();
await _dbContext.Database.ExecuteSqlRawAsync(
$"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})");
// Process by message type
var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
var smsMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_SMS).ToList();
var pushMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_PUSH).ToList();
// Process emails in parallel batches
if (emailMessages.Any())
{
await ProcessEmailsAsync(emailMessages);
}
// Process SMS (placeholder)
foreach (var msg in smsMessages)
{
msg.Status = STATUS_FAILED;
msg.ErrorMessage = "SMS not implemented";
msg.ProcessedAt = DateTime.Now;
}
// Process Push (placeholder)
foreach (var msg in pushMessages)
{
msg.Status = STATUS_FAILED;
msg.ErrorMessage = "Push not implemented";
msg.ProcessedAt = DateTime.Now;
}
await _dbContext.SaveChangesAsync();
// Move completed messages to history table
await MoveToHistoryAsync(pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList());
var elapsed = DateTime.Now - startTime;
log.Info($"Job completed in {elapsed.TotalMilliseconds:F0}ms - Processed: {pendingMessages.Count}");
}
catch (Exception ex)
{
log.Error("MessageQueueJob error", ex);
}
}
private async Task ProcessEmailsAsync(List messages)
{
log.Info($"Processing {messages.Count} emails");
var emailBatch = messages.Select(m => new EmailMessage
{
Id = m.Id,
To = m.Recipient,
Subject = GetEmailSubject(m),
Body = GetEmailContent(m),
IsHtml = true
}).ToList();
// Process in parallel with rate limiting
var tasks = new List();
foreach (var msg in messages)
{
var emailMsg = emailBatch.First(e => e.Id == msg.Id);
tasks.Add(Task.Run(async () =>
{
bool success = await _emailService.SendEmailAsync(
emailMsg.To,
emailMsg.Subject,
emailMsg.Body,
emailMsg.IsHtml);
if (success)
{
msg.Status = STATUS_SUCCESS;
msg.ProcessedAt = DateTime.Now;
msg.ErrorMessage = null;
}
else
{
HandleFailure(msg);
}
}));
}
await Task.WhenAll(tasks);
}
private string GetEmailSubject(MessageQueue message)
{
if (!string.IsNullOrEmpty(message.Subject))
return message.Subject;
// Get from template if available
if (!string.IsNullOrEmpty(message.TemplateCode))
{
var template = _dbContext.MessageTemplates
.FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true);
if (template != null)
return template.Subject ?? "No Subject";
}
return "No Subject";
}
private string GetEmailContent(MessageQueue message)
{
string content = message.Content ?? "";
// Use template if specified
if (!string.IsNullOrEmpty(message.TemplateCode))
{
var template = _dbContext.MessageTemplates
.FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true);
if (template != null)
{
content = template.Content ?? content;
}
}
// Replace placeholders
if (!string.IsNullOrEmpty(message.TemplateData))
{
try
{
var data = System.Text.Json.JsonSerializer.Deserialize>(message.TemplateData);
if (data != null)
{
foreach (var kvp in data)
{
content = content.Replace($"{{{{{kvp.Key}}}}}", kvp.Value);
}
}
}
catch (Exception ex)
{
log.Warn($"Failed to parse template data: {ex.Message}");
}
}
return content;
}
private void HandleFailure(MessageQueue message)
{
message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
message.ProcessedAt = DateTime.Now;
if (message.RetryCount >= message.MaxRetry)
{
message.Status = STATUS_FAILED;
log.Warn($"Message {message.Id} failed after max retries");
}
else
{
message.Status = STATUS_PENDING; // Will be retried
log.Debug($"Message {message.Id} will retry ({message.RetryCount}/{message.MaxRetry})");
}
}
///
/// Move completed messages to MESSAGE_QUEUE_HIS for optimal queue performance
///
private async Task MoveToHistoryAsync(List completedMessages)
{
if (!completedMessages.Any()) return;
try
{
var ids = completedMessages.Select(m => m.Id).ToList();
var idsString = string.Join(",", ids);
// Insert into history table
var insertSql = $@"
INSERT INTO MESSAGE_QUEUE_HIS
(ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
SELECT
ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
FROM MESSAGE_QUEUE
WHERE ID IN ({idsString})";
await _dbContext.Database.ExecuteSqlRawAsync(insertSql);
// Delete from main queue
var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})";
await _dbContext.Database.ExecuteSqlRawAsync(deleteSql);
log.Info($"Moved {completedMessages.Count} messages to history");
}
catch (Exception ex)
{
log.Error($"Failed to move messages to history: {ex.Message}", ex);
// Don't throw - messages are already processed, just not moved to history
}
}
}