|
@@ -0,0 +1,176 @@
|
|
|
|
|
+using Database.Database;
|
|
|
|
|
+using Microsoft.EntityFrameworkCore;
|
|
|
|
|
+using Microsoft.Extensions.Logging;
|
|
|
|
|
+
|
|
|
|
|
+namespace Esim.SendMail;
|
|
|
|
|
+
|
|
|
|
|
+/// <summary>
|
|
|
|
|
+/// Background worker to clean up expired messages from MESSAGE_QUEUE.
|
|
|
|
|
+/// Messages older than configured expiration days are moved to MESSAGE_QUEUE_HIS with failed status and deleted.
|
|
|
|
|
+/// </summary>
|
|
|
|
|
+public class ExpiredMessageCleanupWorker : BackgroundService
|
|
|
|
|
+{
|
|
|
|
|
+ private readonly ILogger<ExpiredMessageCleanupWorker> _logger;
|
|
|
|
|
+ private readonly IServiceProvider _serviceProvider;
|
|
|
|
|
+ private readonly int _intervalMinutes;
|
|
|
|
|
+ private readonly int _expirationDays;
|
|
|
|
|
+
|
|
|
|
|
+ // Message statuses
|
|
|
|
|
+ private const int STATUS_PENDING = 0;
|
|
|
|
|
+ private const int STATUS_PROCESSING = 1;
|
|
|
|
|
+ private const int STATUS_FAILED = 3;
|
|
|
|
|
+
|
|
|
|
|
+ public ExpiredMessageCleanupWorker(
|
|
|
|
|
+ ILogger<ExpiredMessageCleanupWorker> logger,
|
|
|
|
|
+ IServiceProvider serviceProvider,
|
|
|
|
|
+ IConfiguration configuration)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger = logger;
|
|
|
|
|
+ _serviceProvider = serviceProvider;
|
|
|
|
|
+ _intervalMinutes = int.Parse(configuration["Cleanup:IntervalMinutes"] ?? "60"); // Default: 1 hour
|
|
|
|
|
+ _expirationDays = int.Parse(configuration["Cleanup:ExpirationDays"] ?? "1"); // Default: 1 day
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogInformation("ExpiredMessageCleanupWorker started. Interval: {Interval} minutes, Expiration: {Days} day(s)",
|
|
|
|
|
+ _intervalMinutes, _expirationDays);
|
|
|
|
|
+
|
|
|
|
|
+ // Wait a bit for initialization
|
|
|
|
|
+ await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ while (!stoppingToken.IsCancellationRequested)
|
|
|
|
|
+ {
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ await CleanupExpiredMessagesAsync(stoppingToken);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
|
|
|
|
+ {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogError(ex, "Error in cleanup loop");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ await Task.Delay(TimeSpan.FromMinutes(_intervalMinutes), stoppingToken);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
|
|
|
|
+ {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ _logger.LogInformation("ExpiredMessageCleanupWorker stopped.");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private async Task CleanupExpiredMessagesAsync(CancellationToken stoppingToken)
|
|
|
|
|
+ {
|
|
|
|
|
+ var startTime = DateTime.Now;
|
|
|
|
|
+
|
|
|
|
|
+ using var scope = _serviceProvider.CreateScope();
|
|
|
|
|
+ var dbContext = scope.ServiceProvider.GetRequiredService<ModelContext>();
|
|
|
|
|
+
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ // Find messages older than expiration days that are still pending or processing
|
|
|
|
|
+ var expirationDate = DateTime.Now.AddDays(-_expirationDays);
|
|
|
|
|
+
|
|
|
|
|
+ var expiredMessages = await dbContext.MessageQueues
|
|
|
|
|
+ .Where(m => m.CreatedDate <= expirationDate
|
|
|
|
|
+ && (m.Status == STATUS_PENDING || m.Status == STATUS_PROCESSING))
|
|
|
|
|
+ .OrderBy(m => m.CreatedDate)
|
|
|
|
|
+ .ToListAsync(stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ if (!expiredMessages.Any())
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogDebug("No expired messages found");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ _logger.LogWarning("Found {Count} expired messages (older than {Days} day(s)). Moving to history and deleting...",
|
|
|
|
|
+ expiredMessages.Count, _expirationDays);
|
|
|
|
|
+
|
|
|
|
|
+ // Log sample of expired messages for audit
|
|
|
|
|
+ var sampleSize = Math.Min(5, expiredMessages.Count);
|
|
|
|
|
+ for (int i = 0; i < sampleSize; i++)
|
|
|
|
|
+ {
|
|
|
|
|
+ var msg = expiredMessages[i];
|
|
|
|
|
+ var age = DateTime.Now - (msg.CreatedDate ?? DateTime.Now);
|
|
|
|
|
+ var subjectPreview = msg.Subject != null
|
|
|
|
|
+ ? msg.Subject.Substring(0, Math.Min(50, msg.Subject.Length))
|
|
|
|
|
+ : "(no subject)";
|
|
|
|
|
+ _logger.LogInformation(
|
|
|
|
|
+ "Expired message sample: ID={Id}, Recipient={Recipient}, Subject={Subject}, Age={Age:F1}h, Status={Status}, Retries={Retry}/{Max}",
|
|
|
|
|
+ msg.Id, msg.Recipient, subjectPreview,
|
|
|
|
|
+ age.TotalHours, msg.Status, msg.RetryCount ?? 0, msg.MaxRetry ?? 3);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (expiredMessages.Count > sampleSize)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogInformation("... and {More} more expired messages", expiredMessages.Count - sampleSize);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Move to MESSAGE_QUEUE_HIS with failed status
|
|
|
|
|
+ await MoveToHistoryAsync(dbContext, expiredMessages, stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ // Delete from MESSAGE_QUEUE
|
|
|
|
|
+ var ids = expiredMessages.Select(m => m.Id).ToList();
|
|
|
|
|
+ var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({string.Join(",", ids)})";
|
|
|
|
|
+ await dbContext.Database.ExecuteSqlRawAsync(deleteSql, stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ var elapsed = DateTime.Now - startTime;
|
|
|
|
|
+ _logger.LogInformation("Successfully cleaned up {Count} expired messages in {Elapsed:F0}ms. Messages archived to MESSAGE_QUEUE_HIS with FAILED status.",
|
|
|
|
|
+ expiredMessages.Count, elapsed.TotalMilliseconds);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogError(ex, "Error cleaning up expired messages");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private async Task MoveToHistoryAsync(
|
|
|
|
|
+ ModelContext dbContext,
|
|
|
|
|
+ List<MessageQueue> messages,
|
|
|
|
|
+ CancellationToken stoppingToken)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (!messages.Any()) return;
|
|
|
|
|
+
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ var ids = string.Join(",", messages.Select(m => m.Id));
|
|
|
|
|
+
|
|
|
|
|
+ // Insert to history with STATUS = 3 (FAILED - EXPIRED)
|
|
|
|
|
+ var insertSql = $@"
|
|
|
|
|
+ INSERT INTO MESSAGE_QUEUE_HIS
|
|
|
|
|
+ (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
|
|
|
|
|
+ PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
|
|
|
|
|
+ ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
|
|
|
|
|
+ SELECT
|
|
|
|
|
+ ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
|
|
|
|
|
+ PRIORITY, {STATUS_FAILED} AS STATUS, SCHEDULED_AT, SYSDATE AS PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
|
|
|
|
|
+ 'EXPIRED: Message older than {_expirationDays} day(s) - Auto-deleted to prevent spam and table bloat. Created: ' || TO_CHAR(CREATED_DATE, 'YYYY-MM-DD HH24:MI:SS') AS ERROR_MESSAGE,
|
|
|
|
|
+ CREATED_BY, CREATED_DATE, SYSDATE
|
|
|
|
|
+ FROM MESSAGE_QUEUE
|
|
|
|
|
+ WHERE ID IN ({ids})";
|
|
|
|
|
+
|
|
|
|
|
+ await dbContext.Database.ExecuteSqlRawAsync(insertSql, stoppingToken);
|
|
|
|
|
+
|
|
|
|
|
+ _logger.LogDebug("Moved {Count} expired messages to MESSAGE_QUEUE_HIS with FAILED status", messages.Count);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogError(ex, "Failed to move expired messages to history");
|
|
|
|
|
+ throw; // Re-throw to prevent deletion if archival fails
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public override async Task StopAsync(CancellationToken cancellationToken)
|
|
|
|
|
+ {
|
|
|
|
|
+ _logger.LogInformation("ExpiredMessageCleanupWorker stopping...");
|
|
|
|
|
+ await base.StopAsync(cancellationToken);
|
|
|
|
|
+ _logger.LogInformation("ExpiredMessageCleanupWorker stopped gracefully");
|
|
|
|
|
+ }
|
|
|
|
|
+}
|