1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-06-02 04:34:03 +02:00

Change instance log sender from dedicated thread to async task

This commit is contained in:
chylex 2022-10-11 20:51:06 +02:00
parent f880a46887
commit 0b51a4509e
Signed by: chylex
GPG Key ID: 4DE42C8F19A80548
2 changed files with 35 additions and 34 deletions
Agent/Phantom.Agent.Services/Instances

View File

@ -1,14 +1,16 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Phantom.Agent.Rpc; using Phantom.Agent.Rpc;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Phantom.Utils.Threading;
using Serilog; using Serilog;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;
sealed class InstanceLogSenderThread { sealed class InstanceLogSender {
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
private readonly Guid instanceGuid; private readonly Guid instanceGuid;
private readonly ILogger logger; private readonly ILogger logger;
private readonly CancellationTokenSource cancellationTokenSource; private readonly CancellationTokenSource cancellationTokenSource;
@ -16,51 +18,46 @@ sealed class InstanceLogSenderThread {
private readonly SemaphoreSlim semaphore = new (1, 1); private readonly SemaphoreSlim semaphore = new (1, 1);
private readonly RingBuffer<string> buffer = new (1000); private readonly RingBuffer<string> buffer = new (1000);
public InstanceLogSenderThread(Guid instanceGuid, string name) { public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string name) {
this.instanceGuid = instanceGuid; this.instanceGuid = instanceGuid;
this.logger = PhantomLogger.Create<InstanceLogSenderThread>(name); this.logger = PhantomLogger.Create<InstanceLogSender>(name);
this.cancellationTokenSource = new CancellationTokenSource(); this.cancellationTokenSource = new CancellationTokenSource();
this.cancellationToken = cancellationTokenSource.Token; this.cancellationToken = cancellationTokenSource.Token;
taskManager.Run(Run);
var thread = new Thread(Run) {
IsBackground = true,
Name = "Instance Log Sender (" + name + ")"
};
thread.Start();
} }
[SuppressMessage("ReSharper", "LocalVariableHidesMember")]
private async void Run() { private async void Run() {
logger.Verbose("Thread started."); logger.Verbose("Task started.");
try { try {
while (!cancellationToken.IsCancellationRequested) { while (!cancellationToken.IsCancellationRequested) {
await semaphore.WaitAsync(cancellationToken); var lines = await DequeueOrThrow();
if (!lines.IsEmpty) {
ImmutableArray<string> lines;
try {
lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
buffer.Clear();
} finally {
semaphore.Release();
}
if (lines.Length > 0) {
await ServerMessaging.SendMessage(new InstanceOutputMessage(instanceGuid, lines)); await ServerMessaging.SendMessage(new InstanceOutputMessage(instanceGuid, lines));
} }
await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); await Task.Delay(SendDelay, cancellationToken);
} }
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// Ignore. // Ignore.
} catch (Exception e) { } catch (Exception e) {
logger.Error(e, "Caught exception in thread."); logger.Error(e, "Caught exception in task.");
} finally { } finally {
cancellationTokenSource.Dispose(); cancellationTokenSource.Dispose();
logger.Verbose("Thread stopped."); logger.Verbose("Task stopped.");
}
}
private async Task<ImmutableArray<string>> DequeueOrThrow() {
await semaphore.WaitAsync(cancellationToken);
try {
ImmutableArray<string> lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
buffer.Clear();
return lines;
} finally {
semaphore.Release();
} }
} }
@ -79,6 +76,10 @@ sealed class InstanceLogSenderThread {
} }
public void Cancel() { public void Cancel() {
cancellationTokenSource.Cancel(); try {
cancellationTokenSource.Cancel();
} catch (ObjectDisposedException) {
// Ignore.
}
} }
} }

View File

@ -9,7 +9,7 @@ namespace Phantom.Agent.Services.Instances.States;
sealed class InstanceRunningState : IInstanceState { sealed class InstanceRunningState : IInstanceState {
private readonly InstanceContext context; private readonly InstanceContext context;
private readonly InstanceSession session; private readonly InstanceSession session;
private readonly InstanceLogSenderThread logSenderThread; private readonly InstanceLogSender logSender;
private readonly SessionObjects sessionObjects; private readonly SessionObjects sessionObjects;
private readonly CancellationTokenSource delayedStopCancellationTokenSource = new (); private readonly CancellationTokenSource delayedStopCancellationTokenSource = new ();
@ -19,7 +19,7 @@ sealed class InstanceRunningState : IInstanceState {
public InstanceRunningState(InstanceContext context, InstanceSession session) { public InstanceRunningState(InstanceContext context, InstanceSession session) {
this.context = context; this.context = context;
this.session = session; this.session = session;
this.logSenderThread = new InstanceLogSenderThread(context.Configuration.InstanceGuid, context.ShortName); this.logSender = new InstanceLogSender(context.LaunchServices.TaskManager, context.Configuration.InstanceGuid, context.ShortName);
this.sessionObjects = new SessionObjects(this); this.sessionObjects = new SessionObjects(this);
} }
@ -42,7 +42,7 @@ sealed class InstanceRunningState : IInstanceState {
private void SessionOutput(object? sender, string e) { private void SessionOutput(object? sender, string e) {
context.Logger.Verbose("[Server] {Line}", e); context.Logger.Verbose("[Server] {Line}", e);
logSenderThread.Enqueue(e); logSender.Enqueue(e);
} }
private void SessionEnded(object? sender, EventArgs e) { private void SessionEnded(object? sender, EventArgs e) {
@ -153,7 +153,7 @@ sealed class InstanceRunningState : IInstanceState {
state.CancelDelayedStop(); state.CancelDelayedStop();
} }
state.logSenderThread.Cancel(); state.logSender.Cancel();
state.session.Dispose(); state.session.Dispose();
state.context.PortManager.Release(state.context.Configuration); state.context.PortManager.Release(state.context.Configuration);
return true; return true;