MessageQueueWorker.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. using Database.Database;
  2. using Esim.SendMail.Services;
  3. using Microsoft.EntityFrameworkCore;
  4. using Microsoft.Extensions.Logging;
  5. namespace Esim.SendMail;
  6. /// <summary>
  7. /// High-performance background worker for processing message queue.
  8. /// Features:
  9. /// - Row-level locking to prevent duplicate processing
  10. /// - Configurable batch size and interval
  11. /// - Graceful shutdown with in-flight message handling
  12. /// - Automatic retry with exponential backoff
  13. /// - Metrics logging
  14. /// </summary>
  15. public class MessageQueueWorker : BackgroundService
  16. {
  // Injected collaborators.
  private readonly ILogger<MessageQueueWorker> _logger;
  private readonly IServiceProvider _serviceProvider;
  private readonly IEmailService _emailService;

  // Settings read once from the "Job:*" configuration keys in the constructor.
  private readonly int _intervalSeconds;
  private readonly int _maxMessagesPerRun;
  private readonly int _metricsLogIntervalSeconds;

  // Message types (compared against MessageQueue.MessageType).
  private const int MESSAGE_TYPE_EMAIL = 1;
  private const int MESSAGE_TYPE_SMS = 2;
  private const int MESSAGE_TYPE_PUSH = 3;

  // Message statuses (stored in MessageQueue.Status).
  private const int STATUS_PENDING = 0;
  private const int STATUS_PROCESSING = 1;
  private const int STATUS_SUCCESS = 2;
  private const int STATUS_FAILED = 3;

  // Timestamp of the last metrics line written by the processing loop.
  private DateTime _lastMetricsLog = DateTime.MinValue;

  // Running counters; incremented with Interlocked because e-mail sends run concurrently.
  private long _totalProcessed;
  private long _totalSuccess;
  private long _totalFailed;
  /// <summary>
  /// Creates the worker and reads its tuning settings from configuration.
  /// </summary>
  /// <param name="logger">Logger used for all worker output.</param>
  /// <param name="serviceProvider">Root provider; a new scope is created for every processing run.</param>
  /// <param name="emailService">Service used to send e-mails and to obtain metrics and backoff delays.</param>
  /// <param name="configuration">
  /// Source of <c>Job:IntervalSeconds</c> (default 10), <c>Job:MaxMessagesPerRun</c> (default 500)
  /// and <c>Job:MetricsLogIntervalSeconds</c> (default 60).
  /// </param>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public MessageQueueWorker(
      ILogger<MessageQueueWorker> logger,
      IServiceProvider serviceProvider,
      IEmailService emailService,
      IConfiguration configuration)
  {
      _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
      _emailService = emailService ?? throw new ArgumentNullException(nameof(emailService));
      if (configuration is null)
      {
          throw new ArgumentNullException(nameof(configuration));
      }

      // A zero interval is allowed (no pause between runs); a batch size must be at least 1.
      _intervalSeconds = ReadIntSetting(configuration, "Job:IntervalSeconds", 10, 0);
      _maxMessagesPerRun = ReadIntSetting(configuration, "Job:MaxMessagesPerRun", 500, 1);
      _metricsLogIntervalSeconds = ReadIntSetting(configuration, "Job:MetricsLogIntervalSeconds", 60, 0);
  }

  /// <summary>
  /// Reads an integer setting, falling back to <paramref name="defaultValue"/> (with a warning)
  /// when the value is missing, blank, not an integer, or below <paramref name="minValue"/>.
  /// </summary>
  private int ReadIntSetting(IConfiguration configuration, string key, int defaultValue, int minValue)
  {
      var raw = configuration[key];
      if (string.IsNullOrWhiteSpace(raw))
      {
          return defaultValue;
      }

      // Invariant culture: configuration values are machine-readable, not user-facing.
      var parsed = int.TryParse(
          raw,
          System.Globalization.NumberStyles.Integer,
          System.Globalization.CultureInfo.InvariantCulture,
          out var value);

      if (parsed && value >= minValue)
      {
          return value;
      }

      _logger.LogWarning(
          "Invalid value '{Value}' for configuration key {Key} (minimum {Min}); using default {Default}",
          raw, key, minValue, defaultValue);
      return defaultValue;
  }
  /// <summary>
  /// Main loop: after a short startup delay, repeatedly processes one batch of queued messages,
  /// periodically logs metrics, and sleeps for the configured interval until shutdown is requested.
  /// </summary>
  /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
  protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  {
      _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
          _intervalSeconds, _maxMessagesPerRun);

      // Task.Delay rejects negative delays; clamp so a bad setting cannot crash the loop.
      var pause = TimeSpan.FromSeconds(Math.Max(0, _intervalSeconds));

      try
      {
          // Wait a bit for initialization
          await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

          while (!stoppingToken.IsCancellationRequested)
          {
              try
              {
                  await ProcessMessagesAsync(stoppingToken);

                  // Log metrics periodically. UTC is used so the interval is not distorted
                  // by daylight-saving or time-zone changes of the local clock.
                  var now = DateTime.UtcNow;
                  if ((now - _lastMetricsLog).TotalSeconds >= _metricsLogIntervalSeconds)
                  {
                      LogMetrics();
                      _lastMetricsLog = now;
                  }
              }
              catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
              {
                  break;
              }
              catch (Exception ex)
              {
                  // Keep the worker alive: one failed run must not stop future runs.
                  _logger.LogError(ex, "Error in message processing loop");
              }

              await Task.Delay(pause, stoppingToken);
          }
      }
      catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
      {
          // Shutdown requested while waiting (startup delay or between runs): fall through
          // so the final summary below is still written.
      }

      _logger.LogInformation("MessageQueueWorker stopped. Total processed: {Total}, Success: {Success}, Failed: {Failed}",
          Interlocked.Read(ref _totalProcessed),
          Interlocked.Read(ref _totalSuccess),
          Interlocked.Read(ref _totalFailed));
  }
  /// <summary>
  /// Writes one informational log line with the worker's running counters and the
  /// rate/latency figures reported by the email service.
  /// </summary>
  private void LogMetrics()
  {
      const string template =
          "Metrics: Processed={Processed} | Success={Success} | Failed={Failed} | " +
          "EmailRate={Rate}/min | AvgLatency={Latency:F0}ms";

      var serviceMetrics = _emailService.GetMetrics();

      _logger.LogInformation(
          template,
          _totalProcessed,
          _totalSuccess,
          _totalFailed,
          serviceMetrics.EmailsLastMinute,
          serviceMetrics.AverageLatencyMs);
  }
  /// <summary>
  /// Runs one processing pass: claims a batch of pending messages, sends the e-mails,
  /// fails message types that are not implemented, saves the outcomes and moves
  /// finished messages to the history table.
  /// </summary>
  /// <remarks>
  /// Once a batch has been claimed its outcome is always persisted, even when shutdown
  /// is requested mid-run, so that no message is left in the PROCESSING state.
  /// Exceptions are logged and not rethrown.
  /// </remarks>
  /// <param name="stoppingToken">Signalled when the service is shutting down.</param>
  private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
  {
      // Stopwatch is monotonic; wall-clock differences can jump when the clock is adjusted.
      var stopwatch = System.Diagnostics.Stopwatch.StartNew();

      // Create a new scope for DbContext
      using var scope = _serviceProvider.CreateScope();
      var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();

      try
      {
          var pendingMessages = await GetAndLockPendingMessagesAsync(dbContext, stoppingToken);
          if (pendingMessages.Count == 0)
          {
              _logger.LogDebug("No pending messages");
              return;
          }

          _logger.LogInformation("Processing {Count} messages", pendingMessages.Count);

          try
          {
              // Process by message type
              var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
              if (emailMessages.Count > 0)
              {
                  await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
              }

              // Mark SMS/Push as not implemented
              foreach (var msg in pendingMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL))
              {
                  msg.Status = STATUS_FAILED;
                  msg.ErrorMessage = "Message type not implemented";
                  msg.ProcessedAt = DateTime.Now;
                  Interlocked.Increment(ref _totalFailed);
                  // Keep Processed consistent with Success + Failed.
                  Interlocked.Increment(ref _totalProcessed);
              }
          }
          catch (OperationCanceledException)
          {
              // Some messages may already have been sent; fall through and persist them.
              _logger.LogInformation("Processing cancelled");
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Error processing messages");
          }

          await PersistOutcomesAsync(dbContext, pendingMessages);

          _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
              pendingMessages.Count, stopwatch.Elapsed.TotalMilliseconds);
      }
      catch (OperationCanceledException)
      {
          _logger.LogInformation("Processing cancelled");
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Error processing messages");
      }
  }

  /// <summary>
  /// Saves the state of every claimed message and moves finished ones to history.
  /// Messages that were claimed but never handled are returned to PENDING.
  /// </summary>
  private async Task PersistOutcomesAsync(ModelContext dbContext, List<MessageQueue> claimed)
  {
      foreach (var msg in claimed)
      {
          if (msg.Status == STATUS_PROCESSING)
          {
              // Claimed but not handled (e.g. shutdown): make it eligible for the next run.
              msg.Status = STATUS_PENDING;
          }

          // Force the status to be written. If the row was claimed with raw SQL the tracked
          // original value is still PENDING, and a retry's "back to PENDING" would otherwise
          // be seen as "no change" and leave the row in PROCESSING.
          dbContext.Entry(msg).Property(nameof(MessageQueue.Status)).IsModified = true;
      }

      // Deliberately not cancellable: e-mails already sent must be recorded as such.
      await dbContext.SaveChangesAsync(CancellationToken.None);

      // Move completed messages to history
      var completedMessages = claimed
          .Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED)
          .ToList();

      if (completedMessages.Count > 0)
      {
          await MoveToHistoryAsync(dbContext, completedMessages, CancellationToken.None);
      }
  }
  /// <summary>
  /// Claims up to the configured number of due, pending messages and marks them PROCESSING.
  /// </summary>
  /// <remarks>
  /// The primary path selects with <c>FOR UPDATE SKIP LOCKED</c> inside a transaction, so rows
  /// stay locked until their PROCESSING status is committed and concurrent workers skip them.
  /// Oracle does not accept the row-limiting clause (<c>FETCH FIRST</c>) together with
  /// <c>FOR UPDATE</c>, so the batch is limited in an inner query and the lock is taken by the
  /// outer query on the base table.
  /// If the locking path fails for any reason other than cancellation, a plain query without
  /// row locks is used; that fallback is NOT safe against concurrent workers.
  /// </remarks>
  /// <param name="dbContext">Context used for the query; returned entities are tracked by it.</param>
  /// <param name="stoppingToken">Signalled when the service is shutting down.</param>
  /// <returns>The claimed messages ordered by priority, then creation date; possibly empty.</returns>
  private async Task<List<MessageQueue>> GetAndLockPendingMessagesAsync(
      ModelContext dbContext,
      CancellationToken stoppingToken)
  {
      // Retry limit applied when MAX_RETRY is NULL; used by both query paths.
      const int defaultMaxRetry = 3;

      // Only integer constants/settings are interpolated here, never user input.
      var sql = $@"
          SELECT ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                 PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                 ERROR_MESSAGE, CREATED_BY, CREATED_DATE
          FROM MESSAGE_QUEUE
          WHERE STATUS = {STATUS_PENDING}
          AND ID IN (
              SELECT ID FROM (
                  SELECT ID
                  FROM MESSAGE_QUEUE
                  WHERE STATUS = {STATUS_PENDING}
                  AND (SCHEDULED_AT IS NULL OR SCHEDULED_AT <= SYSDATE)
                  AND (RETRY_COUNT IS NULL OR RETRY_COUNT < NVL(MAX_RETRY, {defaultMaxRetry}))
                  ORDER BY PRIORITY, CREATED_DATE
              )
              WHERE ROWNUM <= {_maxMessagesPerRun}
          )
          FOR UPDATE SKIP LOCKED";

      try
      {
          // Row locks live only as long as the transaction; without one they would be
          // released before the status update below.
          await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

          var locked = await dbContext.MessageQueues
              .FromSqlRaw(sql)
              .ToListAsync(stoppingToken);

          // Mark as processing immediately
          foreach (var msg in locked)
          {
              msg.Status = STATUS_PROCESSING;
          }

          if (locked.Count > 0)
          {
              await dbContext.SaveChangesAsync(stoppingToken);
          }

          await transaction.CommitAsync(stoppingToken);

          // The outer locking query carries no ORDER BY; restore the intended order.
          return locked
              .OrderBy(m => m.Priority)
              .ThenBy(m => m.CreatedDate)
              .ToList();
      }
      catch (Exception ex) when (ex is not OperationCanceledException)
      {
          _logger.LogWarning(ex, "Failed to get messages with row-locking, falling back: {Message}", ex.Message);

          // Fallback to simple query if FOR UPDATE fails
          var fallback = await dbContext.MessageQueues
              .Where(m => m.Status == STATUS_PENDING
                  && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
                  && (m.RetryCount == null || m.RetryCount < (m.MaxRetry ?? defaultMaxRetry)))
              .OrderBy(m => m.Priority)
              .ThenBy(m => m.CreatedDate)
              .Take(_maxMessagesPerRun)
              .ToListAsync(stoppingToken);

          // Mark as processing through the tracked entities so the change tracker's view
          // matches the database; later status changes are then detected correctly.
          foreach (var msg in fallback)
          {
              msg.Status = STATUS_PROCESSING;
          }

          if (fallback.Count > 0)
          {
              await dbContext.SaveChangesAsync(stoppingToken);
          }

          return fallback;
      }
  }
  /// <summary>
  /// Sends the given e-mail messages concurrently (bounded) and records the outcome on each
  /// <see cref="MessageQueue"/> entity in memory. Nothing is saved here; the caller persists.
  /// </summary>
  /// <param name="dbContext">Not used by this method; kept for signature compatibility.</param>
  /// <param name="messages">Messages to send; each entity is updated in place.</param>
  /// <param name="stoppingToken">
  /// Cancels messages still waiting for a send slot. Sends already started run to completion.
  /// </param>
  /// <exception cref="OperationCanceledException">
  /// Shutdown was requested while some messages were still waiting; those are left untouched.
  /// </exception>
  private async Task ProcessEmailsAsync(
      ModelContext dbContext,
      List<MessageQueue> messages,
      CancellationToken stoppingToken)
  {
      const int maxConcurrentSends = 10;

      _logger.LogInformation("Processing {Count} emails", messages.Count);

      // Disposed when the method exits; Task.WhenAll below only completes once every
      // send task has finished, so no task can touch the semaphore after disposal.
      using var throttle = new SemaphoreSlim(maxConcurrentSends);

      // Each task works directly on its own entity: no id lookup and no narrowing cast,
      // so a message can never be sent without its outcome being recorded.
      var sendTasks = messages.Select(async msg =>
      {
          await throttle.WaitAsync(stoppingToken);
          try
          {
              var success = await _emailService.SendEmailAsync(
                  msg.Recipient ?? "",
                  msg.Subject ?? "No Subject",
                  msg.Content ?? "",
                  true);

              if (success)
              {
                  msg.Status = STATUS_SUCCESS;
                  msg.ProcessedAt = DateTime.Now;
                  msg.ErrorMessage = null;
                  Interlocked.Increment(ref _totalSuccess);
              }
              else
              {
                  // Record a reason so failed rows are not left without any error text.
                  msg.ErrorMessage ??= "Email service reported failure";
                  HandleFailure(msg);
              }
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Failed to send email {Id}", msg.Id);
              msg.ErrorMessage = ex.Message;
              HandleFailure(msg);
          }
          finally
          {
              Interlocked.Increment(ref _totalProcessed);
              throttle.Release();
          }
      }).ToList();

      await Task.WhenAll(sendTasks);
  }
  /// <summary>
  /// Records a failed delivery attempt on <paramref name="message"/>: increments its retry
  /// count and either schedules another attempt or marks it as permanently failed.
  /// </summary>
  /// <param name="message">The message whose attempt failed; updated in place.</param>
  private void HandleFailure(MessageQueue message)
  {
      message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
      message.ProcessedAt = DateTime.Now;

      // A missing MaxRetry means three attempts.
      var retryLimit = message.MaxRetry ?? 3;
      var retriesExhausted = message.RetryCount >= retryLimit;

      if (!retriesExhausted)
      {
          // Back to pending, but not before the delay supplied by the email service has passed.
          message.Status = STATUS_PENDING;
          var delayMs = _emailService.CalculateBackoffDelay(message.RetryCount ?? 0);
          message.ScheduledAt = DateTime.Now.AddMilliseconds(delayMs);

          _logger.LogDebug("Message {Id} will retry in {DelayMs}ms ({RetryCount}/{MaxRetry})",
              message.Id, delayMs, message.RetryCount, retryLimit);
          return;
      }

      // No attempts left: final failure.
      message.Status = STATUS_FAILED;
      Interlocked.Increment(ref _totalFailed);

      _logger.LogWarning("Message {Id} failed after {RetryCount} retries: {Error}",
          message.Id, message.RetryCount, message.ErrorMessage);
  }
  /// <summary>
  /// Copies the given messages from MESSAGE_QUEUE to MESSAGE_QUEUE_HIS and deletes them from
  /// the queue. Best effort: failures are logged and not rethrown.
  /// </summary>
  /// <remarks>
  /// The copy and the delete run in one transaction so a failed delete cannot leave duplicate
  /// history rows behind. Ids are sent in batches of at most 1000 because Oracle limits the
  /// number of expressions in an <c>IN (...)</c> list.
  /// </remarks>
  /// <param name="dbContext">Context whose connection executes the statements.</param>
  /// <param name="completedMessages">Messages to move; their ids are read from the entities.</param>
  /// <param name="stoppingToken">Cancels the database calls.</param>
  private async Task MoveToHistoryAsync(
      ModelContext dbContext,
      List<MessageQueue> completedMessages,
      CancellationToken stoppingToken)
  {
      if (completedMessages.Count == 0) return;

      const int maxIdsPerStatement = 1000;

      try
      {
          var ids = completedMessages.Select(m => m.Id).ToList();

          // Join a transaction the caller already started; otherwise open (and own) one.
          var ownTransaction = dbContext.Database.CurrentTransaction is null
              ? await dbContext.Database.BeginTransactionAsync(stoppingToken)
              : null;

          try
          {
              for (var offset = 0; offset < ids.Count; offset += maxIdsPerStatement)
              {
                  var batchSize = Math.Min(maxIdsPerStatement, ids.Count - offset);
                  // Ids come from entities loaded from the database, not from user input.
                  var idList = string.Join(",", ids.GetRange(offset, batchSize));

                  // Insert to history
                  var insertSql = $@"
                      INSERT INTO MESSAGE_QUEUE_HIS
                      (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                       PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                       ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
                      SELECT
                      ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                      PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                      ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
                      FROM MESSAGE_QUEUE
                      WHERE ID IN ({idList})";
                  await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);

                  // Delete from main queue
                  var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idList})";
                  await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
              }

              if (ownTransaction is not null)
              {
                  await ownTransaction.CommitAsync(stoppingToken);
              }
          }
          finally
          {
              // Disposing an uncommitted transaction rolls it back.
              if (ownTransaction is not null)
              {
                  await ownTransaction.DisposeAsync();
              }
          }

          _logger.LogDebug("Moved {Count} messages to history", completedMessages.Count);
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Failed to move messages to history");
      }
  }
  /// <summary>
  /// Logs that shutdown has begun, delegates to the base implementation, and logs again
  /// once the base stop has completed.
  /// </summary>
  /// <param name="cancellationToken">Passed through unchanged to the base implementation.</param>
  public override async Task StopAsync(CancellationToken cancellationToken)
  {
      _logger.LogInformation("MessageQueueWorker stopping, waiting for in-flight messages...");

      Task baseStop = base.StopAsync(cancellationToken);
      await baseStop;

      _logger.LogInformation("MessageQueueWorker stopped gracefully");
  }
  325. }