| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- using Database.Database;
- using Esim.SendMail.Services;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.Extensions.Logging;
- namespace Esim.SendMail;
/// <summary>
/// Background worker that polls the MESSAGE_QUEUE table and dispatches pending messages.
/// Built on the native .NET <see cref="BackgroundService"/>.
/// Features:
/// - Configurable polling interval (Job:IntervalSeconds, default 10)
/// - Batch processing (Job:MaxMessagesPerRun, default 500)
/// - Automatic retry handling (failed sends return to the pending state until MaxRetry is reached)
/// - Graceful shutdown (outcomes of messages already handled are persisted even when stopping)
/// </summary>
public class MessageQueueWorker : BackgroundService
{
    private readonly ILogger<MessageQueueWorker> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly IEmailService _emailService;
    private readonly int _intervalSeconds;
    private readonly int _maxMessagesPerRun;

    // Message types
    private const int MESSAGE_TYPE_EMAIL = 1;
    private const int MESSAGE_TYPE_SMS = 2;
    private const int MESSAGE_TYPE_PUSH = 3;

    // Message statuses
    private const int STATUS_PENDING = 0;
    private const int STATUS_PROCESSING = 1;
    private const int STATUS_SUCCESS = 2;
    private const int STATUS_FAILED = 3;

    // Configuration defaults used when a setting is missing or invalid
    private const int DEFAULT_INTERVAL_SECONDS = 10;
    private const int DEFAULT_MAX_MESSAGES_PER_RUN = 500;

    // Upper bound of IDs per "IN (...)" list; keeps statements under database
    // expression-list limits (e.g. Oracle allows at most 1000 items).
    private const int MAX_IDS_PER_STATEMENT = 1000;

    /// <summary>
    /// Creates the worker and reads its polling settings from configuration.
    /// </summary>
    /// <param name="logger">Logger for this worker.</param>
    /// <param name="serviceProvider">Root provider used to create a scope (and DbContext) per run.</param>
    /// <param name="emailService">Service used to send e-mail messages.</param>
    /// <param name="configuration">Source of "Job:IntervalSeconds" and "Job:MaxMessagesPerRun".</param>
    public MessageQueueWorker(
        ILogger<MessageQueueWorker> logger,
        IServiceProvider serviceProvider,
        IEmailService emailService,
        IConfiguration configuration)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
        _emailService = emailService;
        _intervalSeconds = ReadPositiveInt(configuration, "Job:IntervalSeconds", DEFAULT_INTERVAL_SECONDS);
        _maxMessagesPerRun = ReadPositiveInt(configuration, "Job:MaxMessagesPerRun", DEFAULT_MAX_MESSAGES_PER_RUN);
    }

    /// <summary>
    /// Reads a strictly positive integer setting; falls back to <paramref name="fallback"/>
    /// (with a warning) when the value is missing, malformed, zero or negative.
    /// </summary>
    private int ReadPositiveInt(IConfiguration configuration, string key, int fallback)
    {
        var raw = configuration[key];
        if (string.IsNullOrWhiteSpace(raw))
        {
            return fallback;
        }

        if (int.TryParse(
                raw,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var value)
            && value > 0)
        {
            return value;
        }

        _logger.LogWarning("Invalid value '{Value}' for {Key}; using default {Default}", raw, key, fallback);
        return fallback;
    }

    /// <summary>
    /// Main loop: processes one batch, waits for the configured interval, and repeats
    /// until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
            _intervalSeconds, _maxMessagesPerRun);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessMessagesAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    // Keep the loop alive; one failed run must not stop the worker.
                    _logger.LogError(ex, "Error in message processing loop");
                }

                await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Task.Delay throws when the host shuts down; this is the normal exit path.
        }

        _logger.LogInformation("MessageQueueWorker stopped");
    }

    /// <summary>
    /// Runs a single batch: loads due pending messages, claims them, sends e-mails,
    /// fails unsupported message types, persists the outcome and archives finished rows.
    /// Exceptions are logged and not propagated.
    /// </summary>
    private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
    {
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        // Create a new scope so each run gets its own DbContext instance.
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();

        try
        {
            // Get pending messages that are due and still have retries left
            var pendingMessages = await dbContext.MessageQueues
                .Where(m => m.Status == STATUS_PENDING
                    && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
                    && (m.RetryCount == null || m.RetryCount < m.MaxRetry))
                .OrderBy(m => m.Priority)
                .ThenBy(m => m.CreatedDate)
                .Take(_maxMessagesPerRun)
                .ToListAsync(stoppingToken);

            if (pendingMessages.Count == 0)
            {
                _logger.LogDebug("No pending messages");
                return;
            }

            _logger.LogInformation("Processing {Count} messages", pendingMessages.Count);

            // Claim the batch through the change tracker (not raw SQL) so the tracked
            // entities agree with the database. Otherwise a later reset to PENDING is
            // seen as "unchanged" and the row stays in PROCESSING forever.
            foreach (var msg in pendingMessages)
            {
                msg.Status = STATUS_PROCESSING;
            }
            await dbContext.SaveChangesAsync(stoppingToken);

            // Process emails
            var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
            if (emailMessages.Count > 0)
            {
                await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
            }

            // Mark SMS/Push as not implemented
            foreach (var msg in pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
            {
                msg.Status = STATUS_FAILED;
                msg.ErrorMessage = "Message type not implemented";
                msg.ProcessedAt = DateTime.Now;
            }

            // Anything still PROCESSING was skipped (e.g. shutdown requested): release it
            // so the next run picks it up again.
            foreach (var msg in pendingMessages.Where(m => m.Status == STATUS_PROCESSING))
            {
                msg.Status = STATUS_PENDING;
            }

            // Persist outcomes even during shutdown; e-mails already sent must not be
            // re-sent or left in limbo because the save was cancelled.
            await dbContext.SaveChangesAsync(CancellationToken.None);

            // Move completed messages to history
            await MoveToHistoryAsync(
                dbContext,
                pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList(),
                CancellationToken.None);

            _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
                pendingMessages.Count, stopwatch.Elapsed.TotalMilliseconds);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Processing cancelled");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing messages");
        }
    }

    /// <summary>
    /// Sends all given e-mail messages concurrently and records the outcome on each entity.
    /// Messages not started because shutdown was requested are left untouched.
    /// Does not save changes; the caller persists the entities.
    /// </summary>
    private async Task ProcessEmailsAsync(ModelContext dbContext, List<MessageQueue> messages, CancellationToken stoppingToken)
    {
        _logger.LogInformation("Processing {Count} emails", messages.Count);

        var tasks = messages.Select(async msg =>
        {
            if (stoppingToken.IsCancellationRequested) return;

            try
            {
                // Subject and Content are pre-resolved at insert time
                var subject = msg.Subject ?? "No Subject";
                var content = msg.Content ?? "";

                bool success = await _emailService.SendEmailAsync(
                    msg.Recipient ?? "",
                    subject,
                    content,
                    true);

                if (success)
                {
                    _logger.LogDebug("Email sent to {Recipient}", msg.Recipient);
                    msg.Status = STATUS_SUCCESS;
                    msg.ProcessedAt = DateTime.Now;
                    msg.ErrorMessage = null;
                }
                else
                {
                    msg.ErrorMessage = "Email service reported failure";
                    HandleFailure(msg);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to send email {Id}", msg.Id);
                msg.ErrorMessage = ex.Message;
                HandleFailure(msg);
            }
        });

        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Records a failed attempt: increments the retry counter and either re-queues the
    /// message (PENDING) or marks it FAILED when no retries remain.
    /// </summary>
    private void HandleFailure(MessageQueue message)
    {
        // Saturate instead of wrapping around to 0 when the counter is already at its maximum.
        message.RetryCount = (byte?)Math.Min((message.RetryCount ?? 0) + 1, byte.MaxValue);
        message.ProcessedAt = DateTime.Now;

        // Mirror the selection query: retry only if RetryCount < MaxRetry holds, so a
        // message is never re-queued in a state the query would not pick up again
        // (this also covers a missing MaxRetry).
        bool canRetry = message.RetryCount < message.MaxRetry;
        if (!canRetry)
        {
            message.Status = STATUS_FAILED;
            _logger.LogWarning("Message {Id} failed after max retries", message.Id);
        }
        else
        {
            message.Status = STATUS_PENDING;
            _logger.LogDebug("Message {Id} will retry ({RetryCount}/{MaxRetry})",
                message.Id, message.RetryCount, message.MaxRetry);
        }
    }

    /// <summary>
    /// Copies the given completed messages into MESSAGE_QUEUE_HIS and deletes them from
    /// MESSAGE_QUEUE. Both steps run in one transaction so a failure cannot leave a row
    /// in both tables. Failures are logged and not propagated (best effort).
    /// </summary>
    private async Task MoveToHistoryAsync(ModelContext dbContext, List<MessageQueue> completedMessages, CancellationToken stoppingToken)
    {
        if (completedMessages.Count == 0) return;

        try
        {
            await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

            // Chunk the IDs so no single IN-list exceeds the database limit.
            foreach (var chunk in completedMessages.Chunk(MAX_IDS_PER_STATEMENT))
            {
                // IDs are passed as parameters ({0}, {1}, ...) instead of being spliced into the SQL text.
                List<object> ids = chunk.Select(m => (object)m.Id).ToList();
                var placeholders = string.Join(",", Enumerable.Range(0, ids.Count).Select(i => "{" + i + "}"));

                var insertSql = @"
INSERT INTO MESSAGE_QUEUE_HIS
(ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
SELECT
ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
FROM MESSAGE_QUEUE
WHERE ID IN (" + placeholders + ")";
                await dbContext.Database.ExecuteSqlRawAsync(insertSql, ids, stoppingToken);

                var deleteSql = "DELETE FROM MESSAGE_QUEUE WHERE ID IN (" + placeholders + ")";
                await dbContext.Database.ExecuteSqlRawAsync(deleteSql, ids, stoppingToken);
            }

            await transaction.CommitAsync(stoppingToken);
            _logger.LogInformation("Moved {Count} messages to history", completedMessages.Count);
        }
        catch (Exception ex)
        {
            // The uncommitted transaction is rolled back when it is disposed.
            _logger.LogError(ex, "Failed to move messages to history");
        }
    }
}
|