using Database.Database;
using Esim.SendMail.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace Esim.SendMail;
///
/// High-performance background worker for processing message queue.
/// Uses native .NET BackgroundService for maximum .NET 9 compatibility.
/// Features:
/// - Configurable polling interval
/// - Batch processing
/// - Automatic retry handling
/// - Graceful shutdown
///
public class MessageQueueWorker : BackgroundService
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IEmailService _emailService;
private readonly int _intervalSeconds;
private readonly int _maxMessagesPerRun;
// Message types
private const int MESSAGE_TYPE_EMAIL = 1;
private const int MESSAGE_TYPE_SMS = 2;
private const int MESSAGE_TYPE_PUSH = 3;
// Message statuses
private const int STATUS_PENDING = 0;
private const int STATUS_PROCESSING = 1;
private const int STATUS_SUCCESS = 2;
private const int STATUS_FAILED = 3;
public MessageQueueWorker(
ILogger logger,
IServiceProvider serviceProvider,
IEmailService emailService,
IConfiguration configuration)
{
_logger = logger;
_serviceProvider = serviceProvider;
_emailService = emailService;
_intervalSeconds = int.Parse(configuration["Job:IntervalSeconds"] ?? "10");
_maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
_intervalSeconds, _maxMessagesPerRun);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessMessagesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in message processing loop");
}
await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
}
_logger.LogInformation("MessageQueueWorker stopped");
}
private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
{
var startTime = DateTime.Now;
// Create a new scope for DbContext (transient)
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService();
try
{
// Get pending messages
var pendingMessages = await dbContext.MessageQueues
.Where(m => m.Status == STATUS_PENDING
&& (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
&& (m.RetryCount == null || m.RetryCount < m.MaxRetry))
.OrderBy(m => m.Priority)
.ThenBy(m => m.CreatedDate)
.Take(_maxMessagesPerRun)
.ToListAsync(stoppingToken);
if (!pendingMessages.Any())
{
_logger.LogDebug("No pending messages");
return;
}
_logger.LogInformation("Processing {Count} messages", pendingMessages.Count);
// Mark all as processing
var messageIds = pendingMessages.Select(m => m.Id).ToList();
await dbContext.Database.ExecuteSqlRawAsync(
$"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})",
stoppingToken);
// Process by message type
var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
// Process emails
if (emailMessages.Any())
{
await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
}
// Mark SMS/Push as not implemented
foreach (var msg in pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
{
msg.Status = STATUS_FAILED;
msg.ErrorMessage = "Message type not implemented";
msg.ProcessedAt = DateTime.Now;
}
await dbContext.SaveChangesAsync(stoppingToken);
// Move completed messages to history
await MoveToHistoryAsync(dbContext,
pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList(),
stoppingToken);
var elapsed = DateTime.Now - startTime;
_logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
pendingMessages.Count, elapsed.TotalMilliseconds);
}
catch (OperationCanceledException)
{
_logger.LogInformation("Processing cancelled");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing messages");
}
}
private async Task ProcessEmailsAsync(ModelContext dbContext, List messages, CancellationToken stoppingToken)
{
_logger.LogInformation("Processing {Count} emails", messages.Count);
var tasks = messages.Select(async msg =>
{
if (stoppingToken.IsCancellationRequested) return;
try
{
// Subject and Content are pre-resolved at insert time
var subject = msg.Subject ?? "No Subject";
var content = msg.Content ?? "";
bool success = await _emailService.SendEmailAsync(
msg.Recipient ?? "",
subject,
content,
true);
_logger.LogDebug("Email sent to {Recipient}", msg.Recipient);
if (success)
{
msg.Status = STATUS_SUCCESS;
msg.ProcessedAt = DateTime.Now;
msg.ErrorMessage = null;
}
else
{
HandleFailure(msg);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send email {Id}", msg.Id);
msg.ErrorMessage = ex.Message;
HandleFailure(msg);
}
});
await Task.WhenAll(tasks);
}
private void HandleFailure(MessageQueue message)
{
message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
message.ProcessedAt = DateTime.Now;
if (message.RetryCount >= message.MaxRetry)
{
message.Status = STATUS_FAILED;
_logger.LogWarning("Message {Id} failed after max retries", message.Id);
}
else
{
message.Status = STATUS_PENDING;
_logger.LogDebug("Message {Id} will retry ({RetryCount}/{MaxRetry})",
message.Id, message.RetryCount, message.MaxRetry);
}
}
private async Task MoveToHistoryAsync(ModelContext dbContext, List completedMessages, CancellationToken stoppingToken)
{
if (!completedMessages.Any()) return;
try
{
var idsString = string.Join(",", completedMessages.Select(m => m.Id));
var insertSql = $@"
INSERT INTO MESSAGE_QUEUE_HIS
(ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
SELECT
ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
FROM MESSAGE_QUEUE
WHERE ID IN ({idsString})";
await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);
var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})";
await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
_logger.LogInformation("Moved {Count} messages to history", completedMessages.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to move messages to history");
}
}
}