mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-05-07 00:34:03 +02:00
Optimize sending instance logs to wait for output instead of constantly looping
This commit is contained in:
parent
6f11f65d91
commit
0ab165fd21
Agent/Phantom.Agent.Services
Utils/Phantom.Utils.Runtime
@ -27,6 +27,7 @@ sealed class BackupScheduler : CancellableBackgroundTask {
|
||||
this.process = process;
|
||||
this.serverPort = serverPort;
|
||||
this.serverStatusProtocol = new ServerStatusProtocol(loggerName);
|
||||
Start();
|
||||
}
|
||||
|
||||
protected override async Task RunTask() {
|
||||
|
@ -1,36 +1,62 @@
|
||||
using System.Collections.Immutable;
|
||||
using System.Threading.Channels;
|
||||
using Phantom.Agent.Rpc;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Collections;
|
||||
using Phantom.Utils.Runtime;
|
||||
|
||||
namespace Phantom.Agent.Services.Instances;
|
||||
|
||||
sealed class InstanceLogSender : CancellableBackgroundTask {
|
||||
private static readonly BoundedChannelOptions BufferOptions = new (capacity: 64) {
|
||||
SingleReader = true,
|
||||
SingleWriter = true,
|
||||
FullMode = BoundedChannelFullMode.DropNewest
|
||||
};
|
||||
|
||||
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
|
||||
|
||||
private readonly Guid instanceGuid;
|
||||
|
||||
private readonly SemaphoreSlim semaphore = new (1, 1);
|
||||
private readonly RingBuffer<string> buffer = new (1000);
|
||||
private readonly Channel<string> outputChannel;
|
||||
|
||||
private int droppedLinesSinceLastSend;
|
||||
|
||||
public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
|
||||
this.instanceGuid = instanceGuid;
|
||||
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
|
||||
Start();
|
||||
}
|
||||
|
||||
protected override async Task RunTask() {
|
||||
var lineReader = outputChannel.Reader;
|
||||
var lineBuilder = ImmutableArray.CreateBuilder<string>();
|
||||
|
||||
try {
|
||||
while (!CancellationToken.IsCancellationRequested) {
|
||||
await SendOutputToServer(await DequeueOrThrow());
|
||||
while (await lineReader.WaitToReadAsync(CancellationToken)) {
|
||||
await Task.Delay(SendDelay, CancellationToken);
|
||||
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
|
||||
}
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
}
|
||||
|
||||
// Flush remaining lines.
|
||||
await SendOutputToServer(DequeueWithoutSemaphore());
|
||||
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
|
||||
}
|
||||
|
||||
private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) {
|
||||
builder.Clear();
|
||||
|
||||
while (reader.TryRead(out string? line)) {
|
||||
builder.Add(line);
|
||||
}
|
||||
|
||||
int droppedLines = Interlocked.Exchange(ref droppedLinesSinceLastSend, 0);
|
||||
if (droppedLines > 0) {
|
||||
builder.Add($"Dropped {droppedLines} {(droppedLines == 1 ? "line" : "lines")} due to buffer overflow.");
|
||||
}
|
||||
|
||||
return builder.ToImmutable();
|
||||
}
|
||||
|
||||
private async Task SendOutputToServer(ImmutableArray<string> lines) {
|
||||
@ -39,33 +65,18 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
|
||||
}
|
||||
}
|
||||
|
||||
private ImmutableArray<string> DequeueWithoutSemaphore() {
|
||||
ImmutableArray<string> lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
|
||||
buffer.Clear();
|
||||
return lines;
|
||||
}
|
||||
|
||||
private async Task<ImmutableArray<string>> DequeueOrThrow() {
|
||||
await semaphore.WaitAsync(CancellationToken);
|
||||
|
||||
try {
|
||||
return DequeueWithoutSemaphore();
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
private void OnLineDropped(string line) {
|
||||
Logger.Warning("Buffer is full, dropped line: {Line}", line);
|
||||
Interlocked.Increment(ref droppedLinesSinceLastSend);
|
||||
}
|
||||
|
||||
public void Enqueue(string line) {
|
||||
try {
|
||||
semaphore.Wait(CancellationToken);
|
||||
} catch (Exception) {
|
||||
return;
|
||||
}
|
||||
outputChannel.Writer.TryWrite(line);
|
||||
}
|
||||
|
||||
try {
|
||||
buffer.Add(line);
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
protected override void Dispose() {
|
||||
if (!outputChannel.Writer.TryComplete()) {
|
||||
Logger.Error("Could not mark channel as completed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,9 +8,18 @@ public abstract class CancellableBackgroundTask {
|
||||
protected ILogger Logger { get; }
|
||||
protected CancellationToken CancellationToken { get; }
|
||||
|
||||
private readonly TaskManager taskManager;
|
||||
private readonly string taskName;
|
||||
|
||||
protected CancellableBackgroundTask(ILogger logger, TaskManager taskManager, string taskName) {
|
||||
this.Logger = logger;
|
||||
this.CancellationToken = cancellationTokenSource.Token;
|
||||
|
||||
this.taskManager = taskManager;
|
||||
this.taskName = taskName;
|
||||
}
|
||||
|
||||
protected void Start() {
|
||||
taskManager.Run(taskName, Run);
|
||||
}
|
||||
|
||||
@ -25,12 +34,15 @@ public abstract class CancellableBackgroundTask {
|
||||
Logger.Fatal(e, "Caught exception in task.");
|
||||
} finally {
|
||||
cancellationTokenSource.Dispose();
|
||||
Dispose();
|
||||
Logger.Debug("Task stopped.");
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Task RunTask();
|
||||
|
||||
protected virtual void Dispose() {}
|
||||
|
||||
public void Stop() {
|
||||
try {
|
||||
cancellationTokenSource.Cancel();
|
||||
|
Loading…
Reference in New Issue
Block a user