// MessageQueueWorker.cs (15 KB)


  1. using Database.Database;
  2. using Esim.SendMail.Services;
  3. using Microsoft.EntityFrameworkCore;
  4. using Microsoft.EntityFrameworkCore.Storage;
  5. using Microsoft.Extensions.Logging;
  6. namespace Esim.SendMail;
  7. /// <summary>
  8. /// High-performance background worker for processing message queue.
  9. /// Features:
  10. /// - Row-level locking to prevent duplicate processing
  11. /// - Configurable batch size and interval
  12. /// - Graceful shutdown with in-flight message handling
  13. /// - Automatic retry with exponential backoff
  14. /// - Metrics logging
  15. /// </summary>
  16. public class MessageQueueWorker : BackgroundService
  17. {
  // ----- Injected collaborators -----

  /// <summary>Logger used for lifecycle, per-run and metrics messages.</summary>
  private readonly ILogger<MessageQueueWorker> _logger;

  /// <summary>Root service provider; each processing run creates its own scope from it.</summary>
  private readonly IServiceProvider _serviceProvider;

  /// <summary>Service that sends e-mails and supplies send metrics and back-off delays.</summary>
  private readonly IEmailService _emailService;

  // ----- Settings read from configuration by the constructor -----

  /// <summary>Pause between two processing runs, in seconds ("Job:IntervalSeconds").</summary>
  private readonly int _intervalSeconds;

  /// <summary>Upper bound on messages picked up in one run ("Job:MaxMessagesPerRun").</summary>
  private readonly int _maxMessagesPerRun;

  /// <summary>Minimum time between two metrics log lines, in seconds ("Job:MetricsLogIntervalSeconds").</summary>
  private readonly int _metricsLogIntervalSeconds;

  // ----- Values of the queue row's message type -----
  // Only MESSAGE_TYPE_EMAIL is dispatched by this class; every other type is marked as failed.
  private const int MESSAGE_TYPE_EMAIL = 1;
  private const int MESSAGE_TYPE_SMS = 2;
  private const int MESSAGE_TYPE_PUSH = 3;

  // ----- Values of the queue row's status -----
  private const int STATUS_PENDING = 0;     // waiting to be picked up (or scheduled for a retry)
  private const int STATUS_PROCESSING = 1;  // claimed by a worker run
  private const int STATUS_SUCCESS = 2;     // delivered
  private const int STATUS_FAILED = 3;      // given up (retries exhausted or type unsupported)

  // ----- Running totals and metrics bookkeeping -----

  /// <summary>Time of the last metrics log line; the minimum value forces a log after the first run.</summary>
  private DateTime _lastMetricsLog = DateTime.MinValue;

  /// <summary>Number of messages handled since start (updated with <see cref="Interlocked"/>).</summary>
  private long _totalProcessed;

  /// <summary>Number of messages sent successfully since start.</summary>
  private long _totalSuccess;

  /// <summary>Number of messages that ended in the failed status since start.</summary>
  private long _totalFailed;
  /// <summary>
  /// Creates the worker and reads its tuning settings from configuration.
  /// </summary>
  /// <param name="logger">Logger for this worker.</param>
  /// <param name="serviceProvider">Provider used to create a scope per processing run.</param>
  /// <param name="emailService">Service used to send e-mails and to obtain metrics and back-off delays.</param>
  /// <param name="configuration">
  /// Source of "Job:IntervalSeconds" (default 10), "Job:MaxMessagesPerRun" (default 500) and
  /// "Job:MetricsLogIntervalSeconds" (default 60).
  /// </param>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public MessageQueueWorker(
      ILogger<MessageQueueWorker> logger,
      IServiceProvider serviceProvider,
      IEmailService emailService,
      IConfiguration configuration)
  {
      _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
      _emailService = emailService ?? throw new ArgumentNullException(nameof(emailService));
      if (configuration is null)
      {
          throw new ArgumentNullException(nameof(configuration));
      }

      _intervalSeconds = ReadPositiveInt(configuration, "Job:IntervalSeconds", 10);
      _maxMessagesPerRun = ReadPositiveInt(configuration, "Job:MaxMessagesPerRun", 500);
      _metricsLogIntervalSeconds = ReadPositiveInt(configuration, "Job:MetricsLogIntervalSeconds", 60);
  }

  /// <summary>
  /// Reads a strictly positive integer setting. A missing or blank value yields
  /// <paramref name="defaultValue"/> silently; a malformed, zero or negative value yields the
  /// default as well but is reported with a warning, because such values would otherwise
  /// crash start-up (parse error) or break the polling loop (negative delay, empty batch).
  /// </summary>
  private int ReadPositiveInt(IConfiguration configuration, string key, int defaultValue)
  {
      var raw = configuration[key];
      if (string.IsNullOrWhiteSpace(raw))
      {
          return defaultValue;
      }

      // Configuration values are machine-readable, so parse them culture-independently.
      var parsed = int.TryParse(
          raw,
          System.Globalization.NumberStyles.Integer,
          System.Globalization.CultureInfo.InvariantCulture,
          out var value);

      if (parsed && value > 0)
      {
          return value;
      }

      _logger.LogWarning(
          "Invalid value '{Value}' for setting {Key}; using default {Default}",
          raw, key, defaultValue);
      return defaultValue;
  }
  /// <summary>
  /// Main loop of the worker: after a short start-up delay it repeatedly processes a batch of
  /// queued messages, logs metrics when the metrics interval has elapsed, and then sleeps for
  /// the configured interval. The loop ends when <paramref name="stoppingToken"/> is cancelled.
  /// </summary>
  /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
  protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  {
      _logger.LogInformation("MessageQueueWorker started. Interval: {Interval}s, MaxPerRun: {Max}",
          _intervalSeconds, _maxMessagesPerRun);

      try
      {
          // Wait a bit for initialization. Inside the try so that a shutdown requested during
          // start-up still reaches the final summary log instead of escaping as an exception.
          await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

          while (!stoppingToken.IsCancellationRequested)
          {
              try
              {
                  await ProcessMessagesAsync(stoppingToken);

                  // Log metrics periodically. UTC is used so that daylight-saving jumps of
                  // the local clock cannot stretch or shrink the interval.
                  var now = DateTime.UtcNow;
                  if ((now - _lastMetricsLog).TotalSeconds >= _metricsLogIntervalSeconds)
                  {
                      LogMetrics();
                      _lastMetricsLog = now;
                  }
              }
              catch (Exception ex) when (!(ex is OperationCanceledException && stoppingToken.IsCancellationRequested))
              {
                  // Anything except a shutdown-driven cancellation: log and keep polling.
                  _logger.LogError(ex, "Error in message processing loop");
              }

              await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
          }
      }
      catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
      {
          // Normal shutdown path: fall through to the summary log.
      }

      _logger.LogInformation("MessageQueueWorker stopped. Total processed: {Total}, Success: {Success}, Failed: {Failed}",
          _totalProcessed, _totalSuccess, _totalFailed);
  }
  /// <summary>
  /// Writes a single informational log line that combines this worker's running totals with
  /// the metrics reported by the e-mail service.
  /// </summary>
  private void LogMetrics()
  {
      const string template =
          "Metrics: Processed={Processed} | Success={Success} | Failed={Failed} | " +
          "EmailRate={Rate}/min | AvgLatency={Latency:F0}ms";

      var serviceMetrics = _emailService.GetMetrics();

      _logger.LogInformation(
          template,
          _totalProcessed,
          _totalSuccess,
          _totalFailed,
          serviceMetrics.EmailsLastMinute,
          serviceMetrics.AverageLatencyMs);
  }
  /// <summary>
  /// Runs one processing pass: claims a batch of pending messages, sends the e-mails, marks
  /// every other message type as failed ("not implemented"), persists the outcome and moves
  /// finished rows to the history table.
  /// </summary>
  /// <remarks>
  /// Exceptions are logged and not rethrown. If the pass is interrupted (cancellation or error)
  /// the outcomes already known are still saved, and messages that were claimed but never
  /// handled are put back to pending; otherwise they would stay in the processing status
  /// forever, because only pending rows are ever picked up.
  /// </remarks>
  /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
  private async Task ProcessMessagesAsync(CancellationToken stoppingToken)
  {
      // Stopwatch is monotonic; wall-clock differences can be skewed by clock adjustments.
      var stopwatch = System.Diagnostics.Stopwatch.StartNew();

      // Create a new scope for DbContext
      using var scope = _serviceProvider.CreateScope();
      var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();

      var claimedMessages = new List<MessageQueue>();
      try
      {
          claimedMessages = await GetAndLockPendingMessagesAsync(dbContext, stoppingToken);
          if (claimedMessages.Count == 0)
          {
              _logger.LogDebug("No pending messages");
              return;
          }

          _logger.LogInformation("Processing {Count} messages", claimedMessages.Count);

          // Split by message type
          var emailMessages = claimedMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
          var otherMessages = claimedMessages.Where(m => m.MessageType != MESSAGE_TYPE_EMAIL).ToList();

          if (emailMessages.Count > 0)
          {
              await ProcessEmailsAsync(dbContext, emailMessages, stoppingToken);
          }

          // Mark SMS/Push as not implemented
          foreach (var msg in otherMessages)
          {
              msg.Status = STATUS_FAILED;
              msg.ErrorMessage = "Message type not implemented";
              msg.ProcessedAt = DateTime.Now;
              Interlocked.Increment(ref _totalFailed);
              Interlocked.Increment(ref _totalProcessed);
          }

          // The e-mails have already left; recording their outcome must not be abandoned
          // because a shutdown was requested in the meantime (that would cause re-sends).
          await dbContext.SaveChangesAsync(CancellationToken.None);

          // Move completed messages to history
          var completedMessages = claimedMessages
              .Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED)
              .ToList();
          if (completedMessages.Count > 0)
          {
              await MoveToHistoryAsync(dbContext, completedMessages, stoppingToken);
          }

          _logger.LogInformation("Processed {Count} messages in {Elapsed:F0}ms",
              claimedMessages.Count, stopwatch.Elapsed.TotalMilliseconds);
      }
      catch (OperationCanceledException)
      {
          _logger.LogInformation("Processing cancelled");
          await PersistInterruptedRunAsync(dbContext, claimedMessages);
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Error processing messages");
          await PersistInterruptedRunAsync(dbContext, claimedMessages);
      }
  }

  /// <summary>
  /// Best-effort clean-up after an interrupted pass: returns claimed-but-unhandled messages to
  /// pending and saves whatever state the tracked messages have. Never throws.
  /// </summary>
  private async Task PersistInterruptedRunAsync(ModelContext dbContext, List<MessageQueue> claimedMessages)
  {
      if (claimedMessages.Count == 0)
      {
          return;
      }

      foreach (var msg in claimedMessages.Where(m => m.Status == STATUS_PROCESSING))
      {
          msg.Status = STATUS_PENDING;
      }

      try
      {
          // Deliberately not cancellable: this runs precisely when the stop token has fired.
          await dbContext.SaveChangesAsync(CancellationToken.None);
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Failed to persist message state after an interrupted run");
      }
  }
  /// <summary>
  /// Claims up to the configured maximum of due, pending messages for this worker and marks
  /// them as processing.
  /// </summary>
  /// <remarks>
  /// Primary path: inside one transaction, rows are selected with
  /// <c>FOR UPDATE SKIP LOCKED</c>, loaded as entities, switched to the processing status and
  /// committed, so a concurrent worker either skips the locked rows or no longer sees them as
  /// pending. Oracle does not allow the row-limiting clause (<c>FETCH FIRST</c>) together with
  /// <c>FOR UPDATE</c>, so the batch size is enforced by reading only the first N rows of the
  /// cursor. NOTE(review): the driver may prefetch, and thereby lock, a few rows beyond N;
  /// they are released by the commit — confirm against the ODP.NET fetch-size settings.
  /// Fallback path: if the locking query fails, a plain EF query without locking is used
  /// (best effort; duplicates are possible when several workers run).
  /// </remarks>
  /// <param name="dbContext">Context of the current processing scope.</param>
  /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
  /// <returns>The claimed messages (tracked by <paramref name="dbContext"/>); empty if none are due.</returns>
  private async Task<List<MessageQueue>> GetAndLockPendingMessagesAsync(
      ModelContext dbContext,
      CancellationToken stoppingToken)
  {
      // Oracle rejects IN lists with more than 1000 expressions (ORA-01795).
      const int maxIdsPerQuery = 1000;

      var tableName = dbContext.Model.FindEntityType(typeof(MessageQueue))?.GetSchemaQualifiedTableName()
          ?? "MESSAGE_QUEUE";

      // Step 1 SQL: lock rows and get IDs. No FETCH FIRST here (incompatible with FOR UPDATE).
      var lockSql = $@"
          SELECT ID
          FROM {tableName}
          WHERE STATUS = {STATUS_PENDING}
          AND (SCHEDULED_AT IS NULL OR SCHEDULED_AT <= SYSDATE)
          AND (RETRY_COUNT IS NULL OR RETRY_COUNT < NVL(MAX_RETRY, 3))
          ORDER BY PRIORITY, CREATED_DATE
          FOR UPDATE SKIP LOCKED";

      try
      {
          // Row locks only live as long as a transaction; without one they are gone before
          // the status update below can run. Disposal without commit rolls back.
          await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);

          var lockedIds = new List<int>();
          using (var command = dbContext.Database.GetDbConnection().CreateCommand())
          {
              command.CommandText = lockSql;
              command.Transaction = transaction.GetDbTransaction();

              using (var reader = await command.ExecuteReaderAsync(stoppingToken))
              {
                  while (lockedIds.Count < _maxMessagesPerRun && await reader.ReadAsync(stoppingToken))
                  {
                      // Convert instead of GetInt32: the provider may surface NUMBER as decimal/long.
                      lockedIds.Add(Convert.ToInt32(
                          reader.GetValue(0),
                          System.Globalization.CultureInfo.InvariantCulture));
                  }
              }
          }

          if (lockedIds.Count == 0)
          {
              return new List<MessageQueue>();
          }

          // Step 2: load the full entities for the locked IDs (same connection and transaction).
          var messages = new List<MessageQueue>(lockedIds.Count);
          foreach (var idBatch in lockedIds.Chunk(maxIdsPerQuery))
          {
              messages.AddRange(await dbContext.MessageQueues
                  .Where(m => idBatch.Contains(m.Id))
                  .ToListAsync(stoppingToken));
          }

          // Step 3: mark as processing while the rows are still locked, then commit.
          foreach (var msg in messages)
          {
              msg.Status = STATUS_PROCESSING;
          }
          await dbContext.SaveChangesAsync(stoppingToken);

          // Not cancellable: once the update is written, an aborted commit would leave the
          // caller unsure whether the rows were claimed.
          await transaction.CommitAsync(CancellationToken.None);

          _logger.LogDebug("Locked and fetched {Count} messages for processing", messages.Count);
          return messages;
      }
      catch (Exception ex) when (ex is not OperationCanceledException)
      {
          // Cancellation is excluded: running another query during shutdown would just fail again.
          _logger.LogWarning(ex, "Failed to lock messages with FOR UPDATE, falling back to simple query");

          // Fallback: simple EF Core query without locking. The retry limit defaults to 3 when
          // MaxRetry is null, matching NVL(MAX_RETRY, 3) in the locking query.
          var messages = await dbContext.MessageQueues
              .Where(m => m.Status == STATUS_PENDING
                  && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
                  && (m.RetryCount == null || m.RetryCount < (m.MaxRetry ?? 3)))
              .OrderBy(m => m.Priority)
              .ThenBy(m => m.CreatedDate)
              .Take(_maxMessagesPerRun)
              .ToListAsync(stoppingToken);

          if (messages.Count > 0)
          {
              foreach (var msg in messages)
              {
                  msg.Status = STATUS_PROCESSING;
              }
              await dbContext.SaveChangesAsync(stoppingToken);
          }

          return messages;
      }
  }
  /// <summary>
  /// Sends the given e-mail messages with at most ten sends in flight at a time and records
  /// the outcome (status, timestamps, retry data) on each message. Nothing is saved here;
  /// persisting the changes is left to the caller.
  /// </summary>
  /// <param name="dbContext">Context tracking <paramref name="messages"/>; not used directly.</param>
  /// <param name="messages">Claimed e-mail messages to send.</param>
  /// <param name="stoppingToken">
  /// Cancels the wait for a free send slot; sends that have already started run to completion.
  /// </param>
  /// <exception cref="OperationCanceledException">
  /// The token fired while at least one message was still waiting for a slot. Such messages
  /// are left untouched.
  /// </exception>
  private async Task ProcessEmailsAsync(
      ModelContext dbContext,
      List<MessageQueue> messages,
      CancellationToken stoppingToken)
  {
      const int maxConcurrentSends = 10;

      _logger.LogInformation("Processing {Count} emails", messages.Count);

      // Task.WhenAll completes only after every task has finished, so the semaphore is no
      // longer in use when it is disposed at the end of this method.
      using var sendSlots = new SemaphoreSlim(maxConcurrentSends);

      var sendTasks = messages
          .Select(message => SendSingleEmailAsync(message, sendSlots, stoppingToken))
          .ToList();

      await Task.WhenAll(sendTasks);
  }

  /// <summary>
  /// Sends one message once a slot is free and updates the message and the counters
  /// according to the result. Send errors are logged and turned into a failure/retry.
  /// </summary>
  private async Task SendSingleEmailAsync(
      MessageQueue message,
      SemaphoreSlim sendSlots,
      CancellationToken stoppingToken)
  {
      // Outside the try: if the wait is cancelled no slot was taken, so none may be released.
      await sendSlots.WaitAsync(stoppingToken);
      try
      {
          var sent = await _emailService.SendEmailAsync(
              message.Recipient ?? "",
              message.Subject ?? "No Subject",
              message.Content ?? "",
              true); // bodies are always sent as HTML

          if (sent)
          {
              message.Status = STATUS_SUCCESS;
              message.ProcessedAt = DateTime.Now;
              message.ErrorMessage = null;
              Interlocked.Increment(ref _totalSuccess);
          }
          else
          {
              // The service gave no reason; keep an earlier error text if there is one,
              // otherwise record a generic one so the failure is not logged as empty.
              if (string.IsNullOrEmpty(message.ErrorMessage))
              {
                  message.ErrorMessage = "Email service reported a send failure";
              }
              HandleFailure(message);
          }
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Failed to send email {Id}", message.Id);
          message.ErrorMessage = ex.Message;
          HandleFailure(message);
      }
      finally
      {
          Interlocked.Increment(ref _totalProcessed);
          sendSlots.Release();
      }
  }
  /// <summary>
  /// Records a failed delivery attempt on <paramref name="message"/>: bumps the retry counter
  /// and the processed timestamp, then either gives up (status failed) when the retry limit
  /// (the message's MaxRetry, or 3 if unset) is reached, or re-queues the message as pending
  /// with a scheduled time pushed out by the e-mail service's back-off delay.
  /// </summary>
  /// <param name="message">The message whose attempt failed; modified in place.</param>
  private void HandleFailure(MessageQueue message)
  {
      message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
      message.ProcessedAt = DateTime.Now;

      var retryLimit = message.MaxRetry ?? 3;

      // Retries exhausted: final failure.
      if (message.RetryCount >= retryLimit)
      {
          message.Status = STATUS_FAILED;
          Interlocked.Increment(ref _totalFailed);
          _logger.LogWarning(
              "Message {Id} failed after {RetryCount} retries: {Error}",
              message.Id,
              message.RetryCount,
              message.ErrorMessage);
          return;
      }

      // Attempts left: back to pending, not due before the back-off delay has passed.
      message.Status = STATUS_PENDING;
      var delayMs = _emailService.CalculateBackoffDelay(message.RetryCount ?? 0);
      message.ScheduledAt = DateTime.Now.AddMilliseconds(delayMs);
      _logger.LogDebug(
          "Message {Id} will retry in {DelayMs}ms ({RetryCount}/{MaxRetry})",
          message.Id,
          delayMs,
          message.RetryCount,
          retryLimit);
  }
  /// <summary>
  /// Copies the given finished messages from MESSAGE_QUEUE to MESSAGE_QUEUE_HIS and deletes
  /// them from the queue, as one atomic unit.
  /// </summary>
  /// <remarks>
  /// Best effort: any error is logged and swallowed, and the transaction is rolled back so the
  /// rows are never present in both tables or only half moved.
  /// </remarks>
  /// <param name="dbContext">Context whose connection is used for the raw SQL.</param>
  /// <param name="completedMessages">Messages in a final status; only their IDs are used.</param>
  /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
  private async Task MoveToHistoryAsync(
      ModelContext dbContext,
      List<MessageQueue> completedMessages,
      CancellationToken stoppingToken)
  {
      // Oracle rejects IN lists with more than 1000 expressions (ORA-01795).
      const int maxIdsPerStatement = 1000;

      if (completedMessages.Count == 0)
      {
          return;
      }

      try
      {
          // Join a transaction the caller may already have opened; otherwise own one so that
          // INSERT and DELETE succeed or fail together. Disposal without commit rolls back.
          var ownsTransaction = dbContext.Database.CurrentTransaction is null;
          await using var transaction = ownsTransaction
              ? await dbContext.Database.BeginTransactionAsync(stoppingToken)
              : null;

          foreach (var idBatch in completedMessages.Select(m => m.Id).Chunk(maxIdsPerStatement))
          {
              // IDs are numeric entity keys (not user input); format them culture-independently.
              var ids = string.Join(
                  ",",
                  idBatch.Select(id => id.ToString(System.Globalization.CultureInfo.InvariantCulture)));

              // Insert to history
              var insertSql = $@"
                  INSERT INTO MESSAGE_QUEUE_HIS
                  (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                  PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                  ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
                  SELECT
                  ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
                  PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
                  ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
                  FROM MESSAGE_QUEUE
                  WHERE ID IN ({ids})";
              await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);

              // Delete from main queue
              var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({ids})";
              await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
          }

          if (transaction is not null)
          {
              await transaction.CommitAsync(stoppingToken);
          }

          _logger.LogDebug("Moved {Count} messages to history", completedMessages.Count);
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Failed to move messages to history");
      }
  }
  /// <summary>
  /// Stops the worker: logs that shutdown has begun, delegates to the base implementation
  /// and logs again once that call has returned.
  /// </summary>
  /// <param name="cancellationToken">Token passed through to the base <c>StopAsync</c>.</param>
  public override async Task StopAsync(CancellationToken cancellationToken)
  {
      const string stoppingMessage = "MessageQueueWorker stopping, waiting for in-flight messages...";
      const string stoppedMessage = "MessageQueueWorker stopped gracefully";

      _logger.LogInformation(stoppingMessage);

      var baseStop = base.StopAsync(cancellationToken);
      await baseStop;

      _logger.LogInformation(stoppedMessage);
  }
  343. }