mirror of
https://github.com/SlothDpal/Relaunch-Process.git
synced 2026-02-22 17:27:38 +03:00
queue fixes
This commit is contained in:
@@ -12,16 +12,24 @@ namespace Discord.Webhook
|
||||
public class DiscordWebhook
|
||||
{
|
||||
public string Url { get; set; }
|
||||
public int queueRetryCount = 3;
|
||||
public int sendTimeoutSeconds = 5;
|
||||
|
||||
private UInt64 totalMessages = 0;
|
||||
private readonly ConcurrentQueue<(UInt64 num, DiscordMessage message, FileInfo[] files)> _queue = new ConcurrentQueue<(UInt64 num, DiscordMessage, FileInfo[])>();
|
||||
private ConcurrentQueue<(UInt64 num, DiscordMessage message, FileInfo[] files)> _queue = new ConcurrentQueue<(UInt64 num, DiscordMessage, FileInfo[])>();
|
||||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
|
||||
private bool _isProcessing;
|
||||
private CancellationTokenSource _cts = new CancellationTokenSource();
|
||||
private int queueErrorCounter = 0;
|
||||
private int queueSuppressedCounter = 0;
|
||||
private HttpRequestException lastHpptEx = null;
|
||||
|
||||
public UInt64 GetTotalMessages() => totalMessages;
|
||||
public int GetQueueSize() => _queue.Count;
|
||||
public UInt64 TotalMessages => totalMessages;
|
||||
public int QueueSize => _queue.Count;
|
||||
public void CancelProcessing() => _cts.Cancel();
|
||||
public int ErrorCount => queueErrorCounter;
|
||||
public bool IsProcessing => _isProcessing;
|
||||
public HttpRequestException LastHpptEx => lastHpptEx;
|
||||
|
||||
public async Task<bool> SendAsync(DiscordMessage message, params FileInfo[] files)
|
||||
{
|
||||
@@ -30,7 +38,7 @@ namespace Discord.Webhook
|
||||
|
||||
string boundary = "------------------------" + DateTime.Now.Ticks.ToString("x");
|
||||
|
||||
using (var client = new HttpClient() { Timeout = TimeSpan.FromSeconds(30) })
|
||||
using (var client = new HttpClient() { Timeout = TimeSpan.FromSeconds(sendTimeoutSeconds) })
|
||||
using (var content = new MultipartFormDataContent(boundary))
|
||||
{
|
||||
// Добавляем JSON payload
|
||||
@@ -64,12 +72,13 @@ namespace Discord.Webhook
|
||||
}
|
||||
catch (HttpRequestException ex)
|
||||
{
|
||||
Debug.WriteLine($"Discord webhook request failed: {ex.Message}");
|
||||
Debug.WriteLine($"SendAsync: Discord webhook request failed: {ex.Message}");
|
||||
lastHpptEx = ex;
|
||||
return false;
|
||||
}
|
||||
catch (TaskCanceledException ex)
|
||||
{
|
||||
Debug.WriteLine($"Discord webhook request cancelled: {ex.Message}");
|
||||
Debug.WriteLine($"SendAsync: Discord webhook request cancelled: {ex.Message}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -78,20 +87,34 @@ namespace Discord.Webhook
|
||||
|
||||
private async Task ProcessQueueAsync()
|
||||
{
|
||||
queueErrorCounter = 0;
|
||||
|
||||
while (_queue.TryPeek(out var item))
|
||||
{
|
||||
if (_cts.Token.IsCancellationRequested)
|
||||
{
|
||||
Debug.WriteLine("Discord queue processing cancelled.");
|
||||
Debug.WriteLine("ProcessQueueAsync: Discord queue processing cancelled.");
|
||||
break;
|
||||
}
|
||||
await _semaphore.WaitAsync();
|
||||
try
|
||||
{
|
||||
Debug.WriteLine($"Processing message {item.num}. Queue size: {_queue.Count}");
|
||||
Debug.WriteLine($"ProcessQueueAsync: Processing message {item.num}. Queue size: {_queue.Count}");
|
||||
if (await SendAsync(item.message, item.files))
|
||||
{
|
||||
_queue.TryDequeue(out var deqItem);
|
||||
queueErrorCounter = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
queueErrorCounter++;
|
||||
if (queueErrorCounter == queueRetryCount)
|
||||
{
|
||||
_queue.TryDequeue(out var deqItem);
|
||||
queueErrorCounter = 0;
|
||||
queueSuppressedCounter++;
|
||||
Debug.WriteLine($"ProcessQueueAsync: Message dropped. Total messages dropped:{queueSuppressedCounter}. Queue size: {_queue.Count}.");
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
@@ -104,10 +127,18 @@ namespace Discord.Webhook
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
Debug.WriteLine($"Discord queue processing cancelled during delay. Was {_queue.Count} messages in queue. {totalMessages} messages in session. ");
|
||||
Debug.WriteLine($"ProcessQueueAsync: Discord queue processing cancelled during delay. Was {_queue.Count} messages in queue. {totalMessages} messages in session. ");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ( _cts.IsCancellationRequested)
|
||||
{
|
||||
Debug.WriteLine($"ProcessQueueAsync: Discord queue processing cancelled. Was {_queue.Count} messages in queue.");
|
||||
Debug.WriteLine("Clearing queue.");
|
||||
var _newqueue = new ConcurrentQueue<(UInt64 num, DiscordMessage, FileInfo[])>();
|
||||
Interlocked.Exchange(ref _queue, _newqueue);
|
||||
}
|
||||
Debug.WriteLine($"ProcessQueueAsync: Discord queue processing finished.");
|
||||
_isProcessing = false;
|
||||
}
|
||||
|
||||
@@ -125,6 +156,7 @@ namespace Discord.Webhook
|
||||
_cts = new CancellationTokenSource();
|
||||
_isProcessing = true;
|
||||
Task.Run(ProcessQueueAsync);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user