using System.Collections.Concurrent;
using System.Threading.Channels;
using MailKit.Net.Smtp;
using MailKit.Security;
using MimeKit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Esim.SendMail.Services;
///
/// High-performance email service optimized for millions of messages
/// Features: Connection pooling, rate limiting, exponential backoff
///
public interface IEmailService
{
Task SendEmailAsync(string to, string subject, string body, bool isHtml = true);
Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default);
int CalculateBackoffDelay(int retryCount);
EmailMetrics GetMetrics();
void Dispose();
}
public class EmailMessage
{
public decimal Id { get; set; }
public string To { get; set; } = null!;
public string Subject { get; set; } = null!;
public string Body { get; set; } = null!;
public bool IsHtml { get; set; } = true;
}
public class EmailResult
{
public decimal MessageId { get; set; }
public bool Success { get; set; }
public string? ErrorMessage { get; set; }
public bool IsRetryable { get; set; }
}
public class EmailMetrics
{
public long TotalSent { get; set; }
public long TotalSuccess { get; set; }
public long TotalFailed { get; set; }
public long TotalRetried { get; set; }
public double AverageLatencyMs { get; set; }
public int EmailsLastMinute { get; set; }
public DateTime LastSentAt { get; set; }
}
public class HighPerformanceEmailService : IEmailService, IDisposable
{
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
private readonly string _smtpServer;
private readonly int _smtpPort;
private readonly string _senderEmail;
private readonly string _senderName;
private readonly string _senderPassword;
private readonly bool _enableSsl;
private readonly int _connectionPoolSize;
private readonly int _maxConcurrentSends;
private readonly int _maxEmailsPerMinute;
private readonly int _baseRetryDelayMs;
private readonly int _maxRetryDelayMs;
// Connection pool for SMTP clients
private readonly ConcurrentBag _connectionPool;
private readonly SemaphoreSlim _poolSemaphore;
// Rate limiting
private readonly SemaphoreSlim _rateLimitSemaphore;
private readonly ConcurrentQueue _sentTimestamps;
private readonly object _rateLimitLock = new();
// Metrics tracking
private long _totalSent;
private long _totalSuccess;
private long _totalFailed;
private long _totalRetried;
private long _totalLatencyMs;
private DateTime _lastSentAt;
private bool _disposed = false;
public HighPerformanceEmailService(ILogger logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
// SMTP configuration
_smtpServer = _configuration["Email:SmtpServer"] ?? "smtp.gmail.com";
_smtpPort = int.Parse(_configuration["Email:SmtpPort"] ?? "587");
_senderEmail = _configuration["Email:SenderEmail"] ?? "";
_senderName = _configuration["Email:SenderName"] ?? "EsimLao";
_senderPassword = _configuration["Email:SenderPassword"] ?? "";
_enableSsl = bool.Parse(_configuration["Email:EnableSsl"] ?? "true");
// Performance configuration
_connectionPoolSize = int.Parse(_configuration["Email:ConnectionPoolSize"] ?? "5");
_maxConcurrentSends = int.Parse(_configuration["Email:MaxConcurrentSends"] ?? "10");
_maxEmailsPerMinute = int.Parse(_configuration["Email:MaxEmailsPerMinute"] ?? "30");
_baseRetryDelayMs = int.Parse(_configuration["Email:BaseRetryDelayMs"] ?? "2000");
_maxRetryDelayMs = int.Parse(_configuration["Email:MaxRetryDelayMs"] ?? "60000");
// Initialize pools
_connectionPool = new ConcurrentBag();
_poolSemaphore = new SemaphoreSlim(_connectionPoolSize, _connectionPoolSize);
// Initialize rate limiting
_rateLimitSemaphore = new SemaphoreSlim(_maxConcurrentSends, _maxConcurrentSends);
_sentTimestamps = new ConcurrentQueue();
// Pre-initialize connection pool
InitializeConnectionPool();
_logger.LogInformation(
"EmailService initialized: Pool={PoolSize}, Concurrent={Concurrent}, RateLimit={RateLimit}/min",
_connectionPoolSize, _maxConcurrentSends, _maxEmailsPerMinute);
}
private void InitializeConnectionPool()
{
_logger.LogInformation("Initializing SMTP connection pool with {PoolSize} connections", _connectionPoolSize);
for (int i = 0; i < _connectionPoolSize; i++)
{
try
{
var client = CreateConnectedClient();
if (client != null)
{
_connectionPool.Add(client);
}
}
catch (Exception ex)
{
_logger.LogWarning("Failed to create initial SMTP connection {Index}: {Message}", i + 1, ex.Message);
}
}
_logger.LogInformation("Connection pool initialized with {Count} connections", _connectionPool.Count);
}
private SmtpClient? CreateConnectedClient()
{
try
{
var client = new SmtpClient();
client.Timeout = 30000; // 30 seconds
client.Connect(_smtpServer, _smtpPort, _enableSsl ? SecureSocketOptions.StartTls : SecureSocketOptions.None);
client.Authenticate(_senderEmail, _senderPassword);
return client;
}
catch (Exception ex)
{
_logger.LogError("Failed to create SMTP connection: {Message}", ex.Message);
return null;
}
}
private async Task GetClientFromPoolAsync(CancellationToken cancellationToken = default)
{
await _poolSemaphore.WaitAsync(cancellationToken);
if (_connectionPool.TryTake(out var client))
{
if (client.IsConnected)
{
return client;
}
try { client.Dispose(); } catch { }
}
return CreateConnectedClient();
}
private void ReturnClientToPool(SmtpClient? client)
{
if (client != null && client.IsConnected)
{
_connectionPool.Add(client);
}
else
{
try
{
var newClient = CreateConnectedClient();
if (newClient != null)
{
_connectionPool.Add(newClient);
}
}
catch { }
}
_poolSemaphore.Release();
}
///
/// Apply rate limiting - waits if we've exceeded emails per minute
///
private async Task ApplyRateLimitAsync(CancellationToken cancellationToken = default)
{
var now = DateTime.UtcNow;
var oneMinuteAgo = now.AddMinutes(-1);
// Clean up old timestamps
while (_sentTimestamps.TryPeek(out var oldestTime) && oldestTime < oneMinuteAgo)
{
_sentTimestamps.TryDequeue(out _);
}
// Check if we need to wait
while (_sentTimestamps.Count >= _maxEmailsPerMinute)
{
if (_sentTimestamps.TryPeek(out var oldestTime))
{
var waitTime = oldestTime.AddMinutes(1) - DateTime.UtcNow;
if (waitTime > TimeSpan.Zero)
{
_logger.LogDebug("Rate limit reached, waiting {WaitMs}ms", waitTime.TotalMilliseconds);
await Task.Delay(waitTime, cancellationToken);
}
}
// Clean up again
now = DateTime.UtcNow;
oneMinuteAgo = now.AddMinutes(-1);
while (_sentTimestamps.TryPeek(out var time) && time < oneMinuteAgo)
{
_sentTimestamps.TryDequeue(out _);
}
}
// Record this send
_sentTimestamps.Enqueue(DateTime.UtcNow);
}
///
/// Calculate exponential backoff delay
///
public int CalculateBackoffDelay(int retryCount)
{
var delay = _baseRetryDelayMs * (int)Math.Pow(2, retryCount);
return Math.Min(delay, _maxRetryDelayMs);
}
///
/// Determine if an error is retryable
///
private static bool IsRetryableError(Exception ex)
{
// Network errors are retryable
if (ex is System.Net.Sockets.SocketException) return true;
if (ex is TimeoutException) return true;
if (ex is OperationCanceledException) return false;
// SMTP errors
if (ex is SmtpCommandException smtpEx)
{
// 4xx errors are temporary, 5xx are permanent
return smtpEx.StatusCode < MailKit.Net.Smtp.SmtpStatusCode.MailboxNameNotAllowed;
}
return true; // Default to retryable
}
public async Task SendEmailAsync(string to, string subject, string body, bool isHtml = true)
{
var startTime = DateTime.UtcNow;
SmtpClient? client = null;
try
{
// Apply rate limiting
await ApplyRateLimitAsync();
// Wait for concurrent send slot
await _rateLimitSemaphore.WaitAsync();
try
{
client = await GetClientFromPoolAsync();
if (client == null)
{
_logger.LogError("Failed to get SMTP client from pool for {To}", to);
Interlocked.Increment(ref _totalFailed);
return false;
}
var message = CreateMimeMessage(to, subject, body, isHtml);
await client.SendAsync(message);
// Update metrics
Interlocked.Increment(ref _totalSent);
Interlocked.Increment(ref _totalSuccess);
Interlocked.Add(ref _totalLatencyMs, (long)(DateTime.UtcNow - startTime).TotalMilliseconds);
_lastSentAt = DateTime.UtcNow;
_logger.LogDebug("Email sent to {To} in {ElapsedMs}ms", to, (DateTime.UtcNow - startTime).TotalMilliseconds);
return true;
}
finally
{
_rateLimitSemaphore.Release();
}
}
catch (Exception ex)
{
Interlocked.Increment(ref _totalSent);
Interlocked.Increment(ref _totalFailed);
_logger.LogError("Failed to send email to {To}: {Message}", to, ex.Message);
// Force reconnect on error
if (client != null)
{
try { client.Disconnect(true); } catch { }
try { client.Dispose(); } catch { }
client = null;
}
return false;
}
finally
{
ReturnClientToPool(client);
}
}
///
/// Send batch of emails with parallel processing and rate limiting
/// Returns number of successfully sent emails
///
public async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default)
{
var messageList = messages.ToList();
if (!messageList.Any()) return 0;
_logger.LogInformation("Sending batch of {Count} emails", messageList.Count);
var successCount = 0;
var channel = Channel.CreateBounded(new BoundedChannelOptions(_maxConcurrentSends * 2)
{
FullMode = BoundedChannelFullMode.Wait
});
// Producer task - writes messages to channel
var producerTask = Task.Run(async () =>
{
foreach (var msg in messageList)
{
if (cancellationToken.IsCancellationRequested) break;
await channel.Writer.WriteAsync(msg, cancellationToken);
}
channel.Writer.Complete();
}, cancellationToken);
// Consumer tasks - reads from channel and sends emails
var consumerTasks = Enumerable.Range(0, _maxConcurrentSends).Select(async _ =>
{
var localSuccess = 0;
await foreach (var msg in channel.Reader.ReadAllAsync(cancellationToken))
{
try
{
var result = await SendEmailAsync(msg.To, msg.Subject, msg.Body, msg.IsHtml);
if (result) localSuccess++;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send email {Id}", msg.Id);
}
}
return localSuccess;
}).ToList();
await producerTask;
var results = await Task.WhenAll(consumerTasks);
successCount = results.Sum();
_logger.LogInformation("Batch complete: {SuccessCount}/{TotalCount} successful", successCount, messageList.Count);
return successCount;
}
private MimeMessage CreateMimeMessage(string to, string subject, string body, bool isHtml)
{
var message = new MimeMessage();
message.From.Add(new MailboxAddress(_senderName, _senderEmail));
message.To.Add(MailboxAddress.Parse(to));
message.Subject = subject;
var bodyBuilder = new BodyBuilder();
if (isHtml)
{
bodyBuilder.HtmlBody = body;
}
else
{
bodyBuilder.TextBody = body;
}
message.Body = bodyBuilder.ToMessageBody();
return message;
}
public EmailMetrics GetMetrics()
{
var emailsLastMinute = 0;
var oneMinuteAgo = DateTime.UtcNow.AddMinutes(-1);
foreach (var timestamp in _sentTimestamps)
{
if (timestamp >= oneMinuteAgo) emailsLastMinute++;
}
return new EmailMetrics
{
TotalSent = Interlocked.Read(ref _totalSent),
TotalSuccess = Interlocked.Read(ref _totalSuccess),
TotalFailed = Interlocked.Read(ref _totalFailed),
TotalRetried = Interlocked.Read(ref _totalRetried),
AverageLatencyMs = _totalSent > 0 ? (double)Interlocked.Read(ref _totalLatencyMs) / _totalSent : 0,
EmailsLastMinute = emailsLastMinute,
LastSentAt = _lastSentAt
};
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_logger.LogInformation("Disposing email service...");
while (_connectionPool.TryTake(out var client))
{
try
{
if (client.IsConnected)
{
client.Disconnect(true);
}
client.Dispose();
}
catch { }
}
_poolSemaphore.Dispose();
_rateLimitSemaphore.Dispose();
_logger.LogInformation("Email service disposed. Final metrics: Sent={Sent}, Success={Success}, Failed={Failed}",
_totalSent, _totalSuccess, _totalFailed);
}
}