ExpiredMessageCleanupWorker.cs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. using Database.Database;
  2. using Microsoft.EntityFrameworkCore;
  3. using Microsoft.Extensions.Logging;
  4. namespace Esim.SendMail;
  5. /// <summary>
  6. /// Background worker to clean up expired messages from MESSAGE_QUEUE.
  7. /// Messages older than configured expiration days are moved to MESSAGE_QUEUE_HIS with failed status and deleted.
  8. /// </summary>
  9. public class ExpiredMessageCleanupWorker : BackgroundService
  10. {
  11. private readonly ILogger<ExpiredMessageCleanupWorker> _logger;
  12. private readonly IServiceProvider _serviceProvider;
  13. private readonly int _intervalMinutes;
  14. private readonly int _expirationDays;
  15. // Message statuses
  16. private const int STATUS_PENDING = 0;
  17. private const int STATUS_PROCESSING = 1;
  18. private const int STATUS_FAILED = 3;
  19. public ExpiredMessageCleanupWorker(
  20. ILogger<ExpiredMessageCleanupWorker> logger,
  21. IServiceProvider serviceProvider,
  22. IConfiguration configuration)
  23. {
  24. _logger = logger;
  25. _serviceProvider = serviceProvider;
  26. _intervalMinutes = int.Parse(configuration["Cleanup:IntervalMinutes"] ?? "60"); // Default: 1 hour
  27. _expirationDays = int.Parse(configuration["Cleanup:ExpirationDays"] ?? "1"); // Default: 1 day
  28. }
  29. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  30. {
  31. _logger.LogInformation("ExpiredMessageCleanupWorker started. Interval: {Interval} minutes, Expiration: {Days} day(s)",
  32. _intervalMinutes, _expirationDays);
  33. // Wait a bit for initialization
  34. await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
  35. while (!stoppingToken.IsCancellationRequested)
  36. {
  37. try
  38. {
  39. await CleanupExpiredMessagesAsync(stoppingToken);
  40. }
  41. catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
  42. {
  43. break;
  44. }
  45. catch (Exception ex)
  46. {
  47. _logger.LogError(ex, "Error in cleanup loop");
  48. }
  49. try
  50. {
  51. await Task.Delay(TimeSpan.FromMinutes(_intervalMinutes), stoppingToken);
  52. }
  53. catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
  54. {
  55. break;
  56. }
  57. }
  58. _logger.LogInformation("ExpiredMessageCleanupWorker stopped.");
  59. }
  60. private async Task CleanupExpiredMessagesAsync(CancellationToken stoppingToken)
  61. {
  62. var startTime = DateTime.Now;
  63. using var scope = _serviceProvider.CreateScope();
  64. var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();
  65. try
  66. {
  67. // Find messages older than expiration days that are still pending or processing
  68. var expirationDate = DateTime.Now.AddDays(-_expirationDays);
  69. var expiredMessages = await dbContext.MessageQueues
  70. .Where(m => m.CreatedDate <= expirationDate
  71. && (m.Status == STATUS_PENDING || m.Status == STATUS_PROCESSING))
  72. .OrderBy(m => m.CreatedDate)
  73. .ToListAsync(stoppingToken);
  74. if (!expiredMessages.Any())
  75. {
  76. _logger.LogDebug("No expired messages found");
  77. return;
  78. }
  79. _logger.LogWarning("Found {Count} expired messages (older than {Days} day(s)). Moving to history and deleting...",
  80. expiredMessages.Count, _expirationDays);
  81. // Log sample of expired messages for audit
  82. var sampleSize = Math.Min(5, expiredMessages.Count);
  83. for (int i = 0; i < sampleSize; i++)
  84. {
  85. var msg = expiredMessages[i];
  86. var age = DateTime.Now - (msg.CreatedDate ?? DateTime.Now);
  87. var subjectPreview = msg.Subject != null
  88. ? msg.Subject.Substring(0, Math.Min(50, msg.Subject.Length))
  89. : "(no subject)";
  90. _logger.LogInformation(
  91. "Expired message sample: ID={Id}, Recipient={Recipient}, Subject={Subject}, Age={Age:F1}h, Status={Status}, Retries={Retry}/{Max}",
  92. msg.Id, msg.Recipient, subjectPreview,
  93. age.TotalHours, msg.Status, msg.RetryCount ?? 0, msg.MaxRetry ?? 3);
  94. }
  95. if (expiredMessages.Count > sampleSize)
  96. {
  97. _logger.LogInformation("... and {More} more expired messages", expiredMessages.Count - sampleSize);
  98. }
  99. // Move to MESSAGE_QUEUE_HIS with failed status
  100. await MoveToHistoryAsync(dbContext, expiredMessages, stoppingToken);
  101. // Delete from MESSAGE_QUEUE
  102. var ids = expiredMessages.Select(m => m.Id).ToList();
  103. var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({string.Join(",", ids)})";
  104. await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
  105. var elapsed = DateTime.Now - startTime;
  106. _logger.LogInformation("Successfully cleaned up {Count} expired messages in {Elapsed:F0}ms. Messages archived to MESSAGE_QUEUE_HIS with FAILED status.",
  107. expiredMessages.Count, elapsed.TotalMilliseconds);
  108. }
  109. catch (Exception ex)
  110. {
  111. _logger.LogError(ex, "Error cleaning up expired messages");
  112. }
  113. }
  114. private async Task MoveToHistoryAsync(
  115. ModelContext dbContext,
  116. List<MessageQueue> messages,
  117. CancellationToken stoppingToken)
  118. {
  119. if (!messages.Any()) return;
  120. try
  121. {
  122. var ids = string.Join(",", messages.Select(m => m.Id));
  123. // Insert to history with STATUS = 3 (FAILED - EXPIRED)
  124. var insertSql = $@"
  125. INSERT INTO MESSAGE_QUEUE_HIS
  126. (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
  127. PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
  128. ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
  129. SELECT
  130. ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
  131. PRIORITY, {STATUS_FAILED} AS STATUS, SCHEDULED_AT, SYSDATE AS PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
  132. 'EXPIRED: Message older than {_expirationDays} day(s) - Auto-deleted to prevent spam and table bloat. Created: ' || TO_CHAR(CREATED_DATE, 'YYYY-MM-DD HH24:MI:SS') AS ERROR_MESSAGE,
  133. CREATED_BY, CREATED_DATE, SYSDATE
  134. FROM MESSAGE_QUEUE
  135. WHERE ID IN ({ids})";
  136. await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);
  137. _logger.LogDebug("Moved {Count} expired messages to MESSAGE_QUEUE_HIS with FAILED status", messages.Count);
  138. }
  139. catch (Exception ex)
  140. {
  141. _logger.LogError(ex, "Failed to move expired messages to history");
  142. throw; // Re-throw to prevent deletion if archival fails
  143. }
  144. }
  145. public override async Task StopAsync(CancellationToken cancellationToken)
  146. {
  147. _logger.LogInformation("ExpiredMessageCleanupWorker stopping...");
  148. await base.StopAsync(cancellationToken);
  149. _logger.LogInformation("ExpiredMessageCleanupWorker stopped gracefully");
  150. }
  151. }