MessageQueueJob.cs 9.3 KB


  1. using Database.Database;
  2. using Esim.SendMail.Services;
  3. using Microsoft.EntityFrameworkCore;
  4. using Quartz;
  5. namespace Esim.SendMail.Jobs;
  6. /// <summary>
  7. /// Optimized background job for high-volume email sending
  8. /// - Batch processing
  9. /// - Move sent messages to history table for optimal queue performance
  10. /// - Parallel execution with rate limiting
  11. /// </summary>
  12. [DisallowConcurrentExecution] // Prevent overlapping executions
  13. public class MessageQueueJob : IJob
  14. {
  15. private static readonly log4net.ILog log = log4net.LogManager.GetLogger(typeof(MessageQueueJob));
  16. private readonly ModelContext _dbContext;
  17. private readonly IEmailService _emailService;
  18. private readonly int _maxMessagesPerRun;
  19. // Message types
  20. private const int MESSAGE_TYPE_EMAIL = 1;
  21. private const int MESSAGE_TYPE_SMS = 2;
  22. private const int MESSAGE_TYPE_PUSH = 3;
  23. // Message statuses
  24. private const int STATUS_PENDING = 0;
  25. private const int STATUS_PROCESSING = 1;
  26. private const int STATUS_SUCCESS = 2;
  27. private const int STATUS_FAILED = 3;
  28. public MessageQueueJob(ModelContext dbContext, IEmailService emailService, Microsoft.Extensions.Configuration.IConfiguration configuration)
  29. {
  30. _dbContext = dbContext;
  31. _emailService = emailService;
  32. _maxMessagesPerRun = int.Parse(configuration["Job:MaxMessagesPerRun"] ?? "500");
  33. }
  34. public async Task Execute(IJobExecutionContext context)
  35. {
  36. var startTime = DateTime.Now;
  37. log.Debug("MessageQueueJob started");
  38. try
  39. {
  40. // Get pending messages with FOR UPDATE SKIP LOCKED pattern
  41. // This allows multiple instances to run without conflicts
  42. var pendingMessages = await _dbContext.MessageQueues
  43. .Where(m => m.Status == STATUS_PENDING
  44. && (m.ScheduledAt == null || m.ScheduledAt <= DateTime.Now)
  45. && (m.RetryCount == null || m.RetryCount < m.MaxRetry))
  46. .OrderBy(m => m.Priority) // true (high priority) first
  47. .ThenBy(m => m.CreatedDate)
  48. .Take(_maxMessagesPerRun)
  49. .ToListAsync();
  50. if (!pendingMessages.Any())
  51. {
  52. log.Debug("No pending messages");
  53. return;
  54. }
  55. log.Info($"Processing {pendingMessages.Count} messages");
  56. // Mark all as processing first (atomic update)
  57. var messageIds = pendingMessages.Select(m => m.Id).ToList();
  58. await _dbContext.Database.ExecuteSqlRawAsync(
  59. $"UPDATE MESSAGE_QUEUE SET STATUS = {STATUS_PROCESSING} WHERE ID IN ({string.Join(",", messageIds)})");
  60. // Process by message type
  61. var emailMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_EMAIL).ToList();
  62. var smsMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_SMS).ToList();
  63. var pushMessages = pendingMessages.Where(m => m.MessageType == MESSAGE_TYPE_PUSH).ToList();
  64. // Process emails in parallel batches
  65. if (emailMessages.Any())
  66. {
  67. await ProcessEmailsAsync(emailMessages);
  68. }
  69. // Process SMS (placeholder)
  70. foreach (var msg in smsMessages)
  71. {
  72. msg.Status = STATUS_FAILED;
  73. msg.ErrorMessage = "SMS not implemented";
  74. msg.ProcessedAt = DateTime.Now;
  75. }
  76. // Process Push (placeholder)
  77. foreach (var msg in pushMessages)
  78. {
  79. msg.Status = STATUS_FAILED;
  80. msg.ErrorMessage = "Push not implemented";
  81. msg.ProcessedAt = DateTime.Now;
  82. }
  83. await _dbContext.SaveChangesAsync();
  84. // Move completed messages to history table
  85. await MoveToHistoryAsync(pendingMessages.Where(m => m.Status == STATUS_SUCCESS || m.Status == STATUS_FAILED).ToList());
  86. var elapsed = DateTime.Now - startTime;
  87. log.Info($"Job completed in {elapsed.TotalMilliseconds:F0}ms - Processed: {pendingMessages.Count}");
  88. }
  89. catch (Exception ex)
  90. {
  91. log.Error("MessageQueueJob error", ex);
  92. }
  93. }
  94. private async Task ProcessEmailsAsync(List<MessageQueue> messages)
  95. {
  96. log.Info($"Processing {messages.Count} emails");
  97. var emailBatch = messages.Select(m => new EmailMessage
  98. {
  99. Id = m.Id,
  100. To = m.Recipient,
  101. Subject = GetEmailSubject(m),
  102. Body = GetEmailContent(m),
  103. IsHtml = true
  104. }).ToList();
  105. // Process in parallel with rate limiting
  106. var tasks = new List<Task>();
  107. foreach (var msg in messages)
  108. {
  109. var emailMsg = emailBatch.First(e => e.Id == msg.Id);
  110. tasks.Add(Task.Run(async () =>
  111. {
  112. bool success = await _emailService.SendEmailAsync(
  113. emailMsg.To,
  114. emailMsg.Subject,
  115. emailMsg.Body,
  116. emailMsg.IsHtml);
  117. if (success)
  118. {
  119. msg.Status = STATUS_SUCCESS;
  120. msg.ProcessedAt = DateTime.Now;
  121. msg.ErrorMessage = null;
  122. }
  123. else
  124. {
  125. HandleFailure(msg);
  126. }
  127. }));
  128. }
  129. await Task.WhenAll(tasks);
  130. }
  131. private string GetEmailSubject(MessageQueue message)
  132. {
  133. if (!string.IsNullOrEmpty(message.Subject))
  134. return message.Subject;
  135. // Get from template if available
  136. if (!string.IsNullOrEmpty(message.TemplateCode))
  137. {
  138. var template = _dbContext.MessageTemplates
  139. .FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true);
  140. if (template != null)
  141. return template.Subject ?? "No Subject";
  142. }
  143. return "No Subject";
  144. }
  145. private string GetEmailContent(MessageQueue message)
  146. {
  147. string content = message.Content ?? "";
  148. // Use template if specified
  149. if (!string.IsNullOrEmpty(message.TemplateCode))
  150. {
  151. var template = _dbContext.MessageTemplates
  152. .FirstOrDefault(t => t.TemplateCode == message.TemplateCode && t.Status == true);
  153. if (template != null)
  154. {
  155. content = template.Content ?? content;
  156. }
  157. }
  158. // Replace placeholders
  159. if (!string.IsNullOrEmpty(message.TemplateData))
  160. {
  161. try
  162. {
  163. var data = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(message.TemplateData);
  164. if (data != null)
  165. {
  166. foreach (var kvp in data)
  167. {
  168. content = content.Replace($"{{{{{kvp.Key}}}}}", kvp.Value);
  169. }
  170. }
  171. }
  172. catch (Exception ex)
  173. {
  174. log.Warn($"Failed to parse template data: {ex.Message}");
  175. }
  176. }
  177. return content;
  178. }
  179. private void HandleFailure(MessageQueue message)
  180. {
  181. message.RetryCount = (byte?)((message.RetryCount ?? 0) + 1);
  182. message.ProcessedAt = DateTime.Now;
  183. if (message.RetryCount >= message.MaxRetry)
  184. {
  185. message.Status = STATUS_FAILED;
  186. log.Warn($"Message {message.Id} failed after max retries");
  187. }
  188. else
  189. {
  190. message.Status = STATUS_PENDING; // Will be retried
  191. log.Debug($"Message {message.Id} will retry ({message.RetryCount}/{message.MaxRetry})");
  192. }
  193. }
  194. /// <summary>
  195. /// Move completed messages to MESSAGE_QUEUE_HIS for optimal queue performance
  196. /// </summary>
  197. private async Task MoveToHistoryAsync(List<MessageQueue> completedMessages)
  198. {
  199. if (!completedMessages.Any()) return;
  200. try
  201. {
  202. var ids = completedMessages.Select(m => m.Id).ToList();
  203. var idsString = string.Join(",", ids);
  204. // Insert into history table
  205. var insertSql = $@"
  206. INSERT INTO MESSAGE_QUEUE_HIS
  207. (ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
  208. PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
  209. ERROR_MESSAGE, CREATED_BY, CREATED_DATE, MOVED_DATE)
  210. SELECT
  211. ID, MESSAGE_TYPE, RECIPIENT, SUBJECT, CONTENT, TEMPLATE_CODE, TEMPLATE_DATA,
  212. PRIORITY, STATUS, SCHEDULED_AT, PROCESSED_AT, RETRY_COUNT, MAX_RETRY,
  213. ERROR_MESSAGE, CREATED_BY, CREATED_DATE, SYSDATE
  214. FROM MESSAGE_QUEUE
  215. WHERE ID IN ({idsString})";
  216. await _dbContext.Database.ExecuteSqlRawAsync(insertSql);
  217. // Delete from main queue
  218. var deleteSql = $"DELETE FROM MESSAGE_QUEUE WHERE ID IN ({idsString})";
  219. await _dbContext.Database.ExecuteSqlRawAsync(deleteSql);
  220. log.Info($"Moved {completedMessages.Count} messages to history");
  221. }
  222. catch (Exception ex)
  223. {
  224. log.Error($"Failed to move messages to history: {ex.Message}", ex);
  225. // Don't throw - messages are already processed, just not moved to history
  226. }
  227. }
  228. }