MessageQueueWorker.cs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. using Database.Database;
  2. using Esim.SendMail.Services;
  3. using Microsoft.EntityFrameworkCore;
  4. using Microsoft.Extensions.Logging;
  5. namespace Esim.SendMail;
  6. /// <summary>
  7. /// High-performance background worker for processing message queue.
  8. /// Uses native .NET BackgroundService for maximum .NET 9 compatibility.
  9. /// Features:
  10. /// - Configurable polling interval
  11. /// - Batch processing
  12. /// - Automatic retry handling
  13. /// - Graceful shutdown
  14. /// </summary>
  15. public class MessageQueueWorker : BackgroundService
  16. {
  17. private readonly ILogger<MessageQueueWorker> _logger;
  18. private readonly IServiceProvider _serviceProvider;
  19. private readonly IEmailService _emailService;
  20. private readonly int _intervalSeconds;
  21. private readonly int _maxMessagesPerRun;
  22. // Message types
  23. private const int MESSAGE_TYPE_EMAIL = 1;
  24. private const int MESSAGE_TYPE_SMS = 2;
  25. private const int MESSAGE_TYPE_PUSH = 3;
  26. // Message statuses
  27. private const int STATUS_PENDING = 0;
  28. private const int STATUS_PROCESSING = 1;
  29. private const int STATUS_SUCCESS = 2;
  30. private const int STATUS_FAILED = 3;
  31. public MessageQueueWorker(
  32. ILogger<MessageQueueWorker> logger,
  33. IServiceProvider serviceProvider,
  34. IEmailService emailService,
  35. IConfiguration configuration)
  36. {
  37. _logger = logger;
  38. _serviceProvider = serviceProvider;
  39. _emailService = emailService;
  40. _intervalSeconds = int.Parse(configuration["Job:IntervalSeconds"] ?? "10");
  41. _maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
  42. }
  43. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  44. {
  45. _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
  46. _intervalSeconds, _maxMessagesPerRun);
  47. while (!stoppingToken.IsCancellationRequested)
  48. {
  49. try
  50. {
  51. await ProcessMessagesAsync(stoppingToken);
  52. }
  53. catch (Exception ex)
  54. {
  55. _logger.LogError(ex, "Error in message processing loop");
  56. }
  57. await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
  58. }
  59. _logger.LogInformation("MessageQueueWorker stopped");
  60. }
  61. private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
  62. {
  63. var startTime = DateTime.Now;
  64. // Create a new scope for DbContext (transient)
  65. using var scope = _serviceProvider.CreateScope();
  66. var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();
  67. try
  68. {
  69. // Get pending messages
  70. var pendingMessages = await dbContext.MessageQueues
  71. .Where(m => m.Status == STATUS_PENDING
  72. && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
  73. && (m.RetryCount == null || m.RetryCount < m.MaxRetry))
  74. .OrderBy(m => m.Priority)
  75. .ThenBy(m => m.CreatedDate)
  76. .Take(_maxMessagesPerRun)
  77. .ToListAsync(stoppingToken);
  78. if (!pendingMessages.Any())
  79. {
  80. _logger.LogDebug("No pending messages");
  81. return;
  82. }
  83. _logger.LogInformation("Processing {Count} messages", pendingMessages.Count);
  84. // Mark all as processing
  85. var messageIds = pendingMessages.Select(m => m.Id).ToList();
  86. await dbContext.Database.ExecuteSqlRawAsync(
  87. $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})",
  88. stoppingToken);
  89. // Process by message type
  90. var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
  91. // Process emails
  92. if (emailMessages.Any())
  93. {
  94. await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
  95. }
  96. // Mark SMS/Push as not implemented
  97. foreach (var msg in pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
  98. {
  99. msg.Status = STATUS_FAILED;
  100. msg.ErrorMessage = "Message type not implemented";
  101. msg.ProcessedAt = DateTime.Now;
  102. }
  103. await dbContext.SaveChangesAsync(stoppingToken);
  104. // Move completed messages to history
  105. await MoveToHistoryAsync(dbContext,
  106. pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList(),
  107. stoppingToken);
  108. var elapsed = DateTime.Now - startTime;
  109. _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
  110. pendingMessages.Count, elapsed.TotalMilliseconds);
  111. }
  112. catch (OperationCanceledException)
  113. {
  114. _logger.LogInformation("Processing cancelled");
  115. }
  116. catch (Exception ex)
  117. {
  118. _logger.LogError(ex, "Error processing messages");
  119. }
  120. }
  121. private async Task ProcessEmailsAsync(ModelContext dbContext, List<MessageQueue> messages, CancellationToken stoppingToken)
  122. {
  123. _logger.LogInformation("Processing {Count} emails", messages.Count);
  124. var tasks = messages.Select(async msg =>
  125. {
  126. if (stoppingToken.IsCancellationRequested) return;
  127. try
  128. {
  129. // Subject and Content are pre-resolved at insert time
  130. var subject = msg.Subject ?? "No Subject";
  131. var content = msg.Content ?? "";
  132. bool success = await _emailService.SendEmailAsync(
  133. msg.Recipient ?? "",
  134. subject,
  135. content,
  136. true);
  137. _logger.LogDebug("Email sent to {Recipient}", msg.Recipient);
  138. if (success)
  139. {
  140. msg.Status = STATUS_SUCCESS;
  141. msg.ProcessedAt = DateTime.Now;
  142. msg.ErrorMessage = null;
  143. }
  144. else
  145. {
  146. HandleFailure(msg);
  147. }
  148. }
  149. catch (Exception ex)
  150. {
  151. _logger.LogError(ex, "Failed to send email {Id}", msg.Id);
  152. msg.ErrorMessage = ex.Message;
  153. HandleFailure(msg);
  154. }
  155. });
  156. await Task.WhenAll(tasks);
  157. }
  158. private void HandleFailure(MessageQueue message)
  159. {
  160. message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
  161. message.ProcessedAt = DateTime.Now;
  162. if (message.RetryCount >= message.MaxRetry)
  163. {
  164. message.Status = STATUS_FAILED;
  165. _logger.LogWarning("Message {Id} failed after max retries", message.Id);
  166. }
  167. else
  168. {
  169. message.Status = STATUS_PENDING;
  170. _logger.LogDebug("Message {Id} will retry ({RetryCount}/{MaxRetry})",
  171. message.Id, message.RetryCount, message.MaxRetry);
  172. }
  173. }
  174. private async Task MoveToHistoryAsync(ModelContext dbContext, List<MessageQueue> completedMessages, CancellationToken stoppingToken)
  175. {
  176. if (!completedMessages.Any()) return;
  177. try
  178. {
  179. var idsString = string.Join(",", completedMessages.Select(m => m.Id));
  180. var insertSql = $@"
  181. INSERT INTO MESSAGE_QUEUE_HIS
  182. (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
  183. PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
  184. ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
  185. SELECT
  186. ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
  187. PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
  188. ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
  189. FROM MESSAGE_QUEUE
  190. WHERE ID IN ({idsString})";
  191. await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);
  192. var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})";
  193. await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
  194. _logger.LogInformation("Moved {Count} messages to history", completedMessages.Count);
  195. }
  196. catch (Exception ex)
  197. {
  198. _logger.LogError(ex, "Failed to move messages to history");
  199. }
  200. }
  201. }