using Database.Database;
using Esim.SendMail.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace Esim.SendMail;
/// <summary>
/// Background worker that drains the <c>MESSAGE_QUEUE</c> table on a fixed interval.
/// Features:
/// - Claims rows with <c>FOR UPDATE SKIP LOCKED</c> inside a transaction so several
///   workers do not pick up the same message
/// - Configurable batch size and polling interval (<c>Job:*</c> configuration keys)
/// - On shutdown, persists the outcome of messages already sent and releases the
///   claim on messages that were not sent
/// - Retry with a backoff delay supplied by <see cref="IEmailService"/>
/// - Periodic metrics logging
/// </summary>
public class MessageQueueWorker : BackgroundService
{
    // Message types
    private const int MESSAGE_TYPE_EMAIL = 1;
    private const int MESSAGE_TYPE_SMS = 2;
    private const int MESSAGE_TYPE_PUSH = 3;

    // Message statuses
    private const int STATUS_PENDING = 0;
    private const int STATUS_PROCESSING = 1;
    private const int STATUS_SUCCESS = 2;
    private const int STATUS_FAILED = 3;

    // Retry limit used when a row has no MAX_RETRY value.
    private const int DefaultMaxRetry = 3;

    // Upper bound on e-mails sent concurrently within one batch.
    private const int MaxConcurrentSends = 10;

    // Oracle rejects literal IN-lists with more than 1000 expressions (ORA-01795).
    private const int MaxInListSize = 1000;

    private readonly ILogger<MessageQueueWorker> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly IEmailService _emailService;
    private readonly int _intervalSeconds;
    private readonly int _maxMessagesPerRun;
    private readonly int _metricsLogIntervalSeconds;

    private DateTime _lastMetricsLog = DateTime.MinValue;
    private long _totalProcessed = 0;
    private long _totalSuccess = 0;
    private long _totalFailed = 0;

    /// <summary>
    /// Creates the worker and reads its settings from configuration.
    /// </summary>
    /// <param name="logger">Logger used for all worker output.</param>
    /// <param name="serviceProvider">Root provider; a scope is created per polling run to resolve the DbContext.</param>
    /// <param name="emailService">Service used to send e-mails, compute backoff delays and report metrics.</param>
    /// <param name="configuration">
    /// Source of <c>Job:IntervalSeconds</c> (default 10), <c>Job:MaxMessagesPerRun</c> (default 500)
    /// and <c>Job:MetricsLogIntervalSeconds</c> (default 60).
    /// </param>
    /// <exception cref="FormatException">A configured value is present but is not an integer.</exception>
    public MessageQueueWorker(
        ILogger<MessageQueueWorker> logger,
        IServiceProvider serviceProvider,
        IEmailService emailService,
        IConfiguration configuration)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
        _emailService = emailService;

        // Task.Delay rejects negative delays and delays above int.MaxValue milliseconds,
        // so the interval is clamped to a range it accepts.
        _intervalSeconds = ReadInt(configuration, "Job:IntervalSeconds", 10, 0, int.MaxValue / 1000);
        _maxMessagesPerRun = ReadInt(configuration, "Job:MaxMessagesPerRun", 500, 1, int.MaxValue);
        _metricsLogIntervalSeconds = ReadInt(configuration, "Job:MetricsLogIntervalSeconds", 60, 0, int.MaxValue);
    }

    /// <summary>
    /// Reads an integer setting, using <paramref name="defaultValue"/> when the key is absent
    /// and clamping the result to [<paramref name="minimum"/>, <paramref name="maximum"/>].
    /// </summary>
    private static int ReadInt(IConfiguration configuration, string key, int defaultValue, int minimum, int maximum)
    {
        var raw = configuration[key];
        var value = raw is null
            ? defaultValue
            : int.Parse(raw, System.Globalization.CultureInfo.InvariantCulture);

        return Math.Clamp(value, minimum, maximum);
    }

    /// <summary>
    /// Main loop: processes one batch, logs metrics when due, then sleeps for the
    /// configured interval until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the application is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
            _intervalSeconds, _maxMessagesPerRun);

        try
        {
            // Wait a bit for initialization
            await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessMessagesAsync(stoppingToken);

                    // Log metrics periodically. UTC avoids jumps at daylight-saving changes.
                    if ((DateTime.UtcNow - _lastMetricsLog).TotalSeconds >= _metricsLogIntervalSeconds)
                    {
                        LogMetrics();
                        _lastMetricsLog = DateTime.UtcNow;
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    // One bad run must not stop the worker; try again after the interval.
                    _logger.LogError(ex, "Error in message processing loop");
                }

                await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown requested while sleeping; fall through to the final log line.
        }

        _logger.LogInformation("MessageQueueWorker stopped. Total processed: {Total}, Success: {Success}, Failed: {Failed}",
            Interlocked.Read(ref _totalProcessed), Interlocked.Read(ref _totalSuccess), Interlocked.Read(ref _totalFailed));
    }

    /// <summary>
    /// Writes the worker's running totals together with the e-mail service's metrics.
    /// </summary>
    private void LogMetrics()
    {
        var emailMetrics = _emailService.GetMetrics();
        _logger.LogInformation(
            "Metrics: Processed={Processed} | Success={Success} | Failed={Failed} | " +
            "EmailRate={Rate}/min | AvgLatency={Latency:F0}ms",
            Interlocked.Read(ref _totalProcessed), Interlocked.Read(ref _totalSuccess), Interlocked.Read(ref _totalFailed),
            emailMetrics.EmailsLastMinute, emailMetrics.AverageLatencyMs);
    }

    /// <summary>
    /// Runs one polling cycle: claims a batch, sends the e-mails, fails unsupported
    /// message types, persists the outcome and moves finished rows to history.
    /// </summary>
    /// <remarks>
    /// Exceptions are logged and not rethrown. Once a batch has been claimed its outcome is
    /// always persisted with a non-cancellable token, and messages that were claimed but not
    /// handled are returned to pending, so a shutdown or failure mid-batch does not leave rows
    /// in the processing state.
    /// </remarks>
    /// <param name="stoppingToken">Cancels claiming and sending; it does not cancel the final persistence.</param>
    private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
    {
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        // Create a new scope for DbContext
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();

        try
        {
            var claimed = await GetAndLockPendingMessagesAsync(dbContext, stoppingToken);
            if (claimed.Count == 0)
            {
                _logger.LogDebug("No pending messages");
                return;
            }

            _logger.LogInformation("Processing {Count} messages", claimed.Count);

            try
            {
                var emailMessages = claimed.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
                if (emailMessages.Count > 0)
                {
                    await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
                }

                // Mark SMS/Push as not implemented
                foreach (var msg in claimed.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
                {
                    msg.Status = STATUS_FAILED;
                    msg.ErrorMessage = "Message type not implemented";
                    msg.ProcessedAt = DateTime.Now;
                    Interlocked.Increment(ref _totalProcessed);
                    Interlocked.Increment(ref _totalFailed);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("Processing cancelled");
            }
            catch (Exception ex)
            {
                // Still fall through so that whatever was completed gets persisted.
                _logger.LogError(ex, "Error processing messages");
            }

            // Anything still marked as processing was claimed but never handled
            // (shutdown or an unexpected error): give it back to the queue.
            var released = 0;
            foreach (var msg in claimed)
            {
                if (msg.Status == STATUS_PROCESSING)
                {
                    msg.Status = STATUS_PENDING;
                    released++;
                }
            }

            if (released > 0)
            {
                _logger.LogInformation("Released {Count} unprocessed messages back to pending", released);
            }

            // Not cancellable on purpose: e-mails that were already sent must be recorded,
            // otherwise they would be stuck in processing or sent again later.
            await dbContext.SaveChangesAsync(CancellationToken.None);

            // Move completed messages to history
            var completedMessages = claimed
                .Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED)
                .ToList();
            if (completedMessages.Count > 0)
            {
                await MoveToHistoryAsync(dbContext, completedMessages, CancellationToken.None);
            }

            _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
                claimed.Count, stopwatch.Elapsed.TotalMilliseconds);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Processing cancelled");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing messages");
        }
    }

    /// <summary>
    /// Selects up to the configured batch size of due, pending messages and marks them
    /// as processing.
    /// </summary>
    /// <remarks>
    /// The primary path selects with <c>FOR UPDATE SKIP LOCKED</c> and updates the status in
    /// the same transaction, so the row locks are held until the claim is committed. The row
    /// limit is applied with a <c>ROWNUM</c> subquery because Oracle does not allow
    /// <c>FETCH FIRST</c> together with <c>FOR UPDATE</c>. If that path fails for any reason
    /// other than cancellation, a plain LINQ query without row locks is used instead.
    /// </remarks>
    /// <param name="dbContext">Context used to query and update the queue.</param>
    /// <param name="stoppingToken">Cancels the queries; the status update itself is not cancellable.</param>
    /// <returns>The claimed messages, tracked by <paramref name="dbContext"/>; empty when nothing is due.</returns>
    private async Task<List<MessageQueue>> GetAndLockPendingMessagesAsync(
        ModelContext dbContext,
        CancellationToken stoppingToken)
    {
        // Only compile-time constants and a validated integer setting are interpolated here.
        var sql = $@"
            SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                   PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                   ERROR_MESSAGE, CREATED_BY, CREATED_DATE
            FROM MESSAGE_QUEUE
            WHERE STATUS = {STATUS_PENDING}
              AND ID IN (
                  SELECT ID FROM (
                      SELECT ID
                      FROM MESSAGE_QUEUE
                      WHERE STATUS = {STATUS_PENDING}
                        AND (SCHEDULED_AT IS NULL OR SCHEDULED_AT <= SYSDATE)
                        AND (RETRY_COUNT IS NULL OR RETRY_COUNT < NVL(MAX_RETRY, {DefaultMaxRetry}))
                      ORDER BY PRIORITY, CREATED_DATE)
                  WHERE ROWNUM <= {_maxMessagesPerRun})
            FOR UPDATE SKIP LOCKED";

        try
        {
            // The transaction keeps the row locks alive until the status change is committed.
            await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

            var locked = await dbContext.MessageQueues
                .FromSqlRaw(sql)
                .ToListAsync(stoppingToken);

            await MarkAsProcessingAsync(dbContext, locked);
            await transaction.CommitAsync(CancellationToken.None);

            // The outer SELECT carries no ORDER BY, so restore the intended order in memory.
            return locked
                .OrderBy(m => m.Priority)
                .ThenBy(m => m.CreatedDate)
                .ToList();
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogWarning(ex, "Failed to get messages with row-locking, falling back: {Message}", ex.Message);

            // Drop any entities left over from the failed attempt so the fallback
            // query reads current database values.
            dbContext.ChangeTracker.Clear();

            // Fallback to simple query if FOR UPDATE fails
            var messages = await dbContext.MessageQueues
                .Where(m => m.Status == STATUS_PENDING
                    && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
                    && (m.RetryCount == null || m.RetryCount < (m.MaxRetry ?? DefaultMaxRetry)))
                .OrderBy(m => m.Priority)
                .ThenBy(m => m.CreatedDate)
                .Take(_maxMessagesPerRun)
                .ToListAsync(stoppingToken);

            // Update through the tracked entities so their in-memory status matches the
            // database; a later change back to pending is then detected and saved.
            await MarkAsProcessingAsync(dbContext, messages);
            return messages;
        }
    }

    /// <summary>
    /// Sets every message to the processing status and saves the change.
    /// The save is not cancellable so a claim is never left half-written.
    /// </summary>
    private static async Task MarkAsProcessingAsync(ModelContext dbContext, List<MessageQueue> messages)
    {
        if (messages.Count == 0)
        {
            return;
        }

        foreach (var msg in messages)
        {
            msg.Status = STATUS_PROCESSING;
        }

        await dbContext.SaveChangesAsync(CancellationToken.None);
    }

    /// <summary>
    /// Sends the given e-mail messages concurrently and records each outcome on the
    /// message entity. Nothing is saved here; the caller persists the entities.
    /// </summary>
    /// <param name="dbContext">Not used by this method; kept for signature compatibility.</param>
    /// <param name="messages">Claimed e-mail messages to send.</param>
    /// <param name="stoppingToken">
    /// When cancelled, messages that have not started sending are skipped and keep the
    /// processing status so the caller can release them.
    /// </param>
    private async Task ProcessEmailsAsync(
        ModelContext dbContext,
        List<MessageQueue> messages,
        CancellationToken stoppingToken)
    {
        _logger.LogInformation("Processing {Count} emails", messages.Count);

        using var throttle = new SemaphoreSlim(MaxConcurrentSends);

        var sends = messages
            .Select(message => SendEmailAsync(message, throttle, stoppingToken))
            .ToList();

        await Task.WhenAll(sends);
    }

    /// <summary>
    /// Sends one message once a throttle slot is free and updates its status,
    /// retry bookkeeping and the running totals.
    /// </summary>
    private async Task SendEmailAsync(
        MessageQueue message,
        SemaphoreSlim throttle,
        CancellationToken stoppingToken)
    {
        try
        {
            await throttle.WaitAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Shutdown before this message was sent: leave it untouched.
            return;
        }

        try
        {
            const bool isHtml = true;
            var success = await _emailService.SendEmailAsync(
                message.Recipient ?? "",
                message.Subject ?? "No Subject",
                message.Content ?? "",
                isHtml);

            if (success)
            {
                message.Status = STATUS_SUCCESS;
                message.ProcessedAt = DateTime.Now;
                message.ErrorMessage = null;
                Interlocked.Increment(ref _totalSuccess);
            }
            else
            {
                HandleFailure(message);
            }

            Interlocked.Increment(ref _totalProcessed);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send email {Id}", message.Id);
            message.ErrorMessage = ex.Message;
            HandleFailure(message);
            Interlocked.Increment(ref _totalProcessed);
        }
        finally
        {
            throttle.Release();
        }
    }

    /// <summary>
    /// Records a failed attempt: increments the retry count and either marks the message
    /// as failed (retry limit reached) or reschedules it as pending after a backoff delay.
    /// </summary>
    /// <param name="message">The message whose send attempt failed.</param>
    private void HandleFailure(MessageQueue message)
    {
        // Saturate instead of wrapping around when the counter is already at byte.MaxValue.
        message.RetryCount = (byte?)Math.Min((message.RetryCount ?? 0) + 1, byte.MaxValue);
        message.ProcessedAt = DateTime.Now;

        var maxRetry = message.MaxRetry ?? DefaultMaxRetry;
        if (message.RetryCount >= maxRetry)
        {
            message.Status = STATUS_FAILED;
            Interlocked.Increment(ref _totalFailed);
            _logger.LogWarning("Message {Id} failed after {RetryCount} retries: {Error}",
                message.Id, message.RetryCount, message.ErrorMessage);
        }
        else
        {
            // Set back to pending for retry with exponential backoff delay
            message.Status = STATUS_PENDING;
            var backoffDelay = _emailService.CalculateBackoffDelay(message.RetryCount ?? 0);
            message.ScheduledAt = DateTime.Now.AddMilliseconds(backoffDelay);
            _logger.LogDebug("Message {Id} will retry in {DelayMs}ms ({RetryCount}/{MaxRetry})",
                message.Id, backoffDelay, message.RetryCount, maxRetry);
        }
    }

    /// <summary>
    /// Copies the given messages into <c>MESSAGE_QUEUE_HIS</c> and deletes them from
    /// <c>MESSAGE_QUEUE</c>, all in one transaction.
    /// </summary>
    /// <remarks>
    /// Failures are logged and not rethrown; the transaction is then rolled back, so the
    /// rows stay in the main queue with their final status.
    /// </remarks>
    /// <param name="dbContext">Context whose connection runs the statements.</param>
    /// <param name="completedMessages">Messages whose status is success or failed.</param>
    /// <param name="stoppingToken">Cancels the database commands.</param>
    private async Task MoveToHistoryAsync(
        ModelContext dbContext,
        List<MessageQueue> completedMessages,
        CancellationToken stoppingToken)
    {
        if (completedMessages.Count == 0)
        {
            return;
        }

        try
        {
            // Insert and delete must succeed or fail together.
            await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

            // Chunked so each literal IN-list stays within Oracle's limit.
            foreach (var idChunk in completedMessages.Select(m => m.Id).Chunk(MaxInListSize))
            {
                // The ids are numeric keys read from the database, not user input.
                var ids = string.Join(",", idChunk);

                // Insert to history
                var insertSql = $@"
                    INSERT INTO MESSAGE_QUEUE_HIS
                    (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                     PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                     ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
                    SELECT
                     ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                     PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                     ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
                    FROM MESSAGE_QUEUE
                    WHERE ID IN ({ids})";
                await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);

                // Delete from main queue
                var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({ids})";
                await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
            }

            await transaction.CommitAsync(stoppingToken);

            _logger.LogDebug("Moved {Count} messages to history", completedMessages.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to move messages to history");
        }
    }

    /// <summary>
    /// Logs the shutdown and delegates to the base implementation, which signals the
    /// stopping token and waits for <see cref="ExecuteAsync"/> to finish.
    /// </summary>
    /// <param name="cancellationToken">Signalled when the host's shutdown should no longer be graceful.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("MessageQueueWorker stopping, waiting for in-flight messages...");
        await base.StopAsync(cancellationToken);
        _logger.LogInformation("MessageQueueWorker stopped gracefully");
    }
}