From 0b51a4509ed653a248d71fa64389d59d420e8c85 Mon Sep 17 00:00:00 2001
From: chylex <contact@chylex.com>
Date: Tue, 11 Oct 2022 20:51:06 +0200
Subject: [PATCH] Change instance log sender from dedicated thread to async
 task

---
 ...ogSenderThread.cs => InstanceLogSender.cs} | 61 ++++++++++---------
 .../Instances/States/InstanceRunningState.cs  |  8 +--
 2 files changed, 35 insertions(+), 34 deletions(-)
 rename Agent/Phantom.Agent.Services/Instances/{InstanceLogSenderThread.cs => InstanceLogSender.cs} (55%)

diff --git a/Agent/Phantom.Agent.Services/Instances/InstanceLogSenderThread.cs b/Agent/Phantom.Agent.Services/Instances/InstanceLogSender.cs
similarity index 55%
rename from Agent/Phantom.Agent.Services/Instances/InstanceLogSenderThread.cs
rename to Agent/Phantom.Agent.Services/Instances/InstanceLogSender.cs
index 2419a5a..a3da2bb 100644
--- a/Agent/Phantom.Agent.Services/Instances/InstanceLogSenderThread.cs
+++ b/Agent/Phantom.Agent.Services/Instances/InstanceLogSender.cs
@@ -1,14 +1,16 @@
 using System.Collections.Immutable;
-using System.Diagnostics.CodeAnalysis;
 using Phantom.Agent.Rpc;
 using Phantom.Common.Logging;
 using Phantom.Common.Messages.ToServer;
 using Phantom.Utils.Collections;
+using Phantom.Utils.Threading;
 using Serilog;
 
 namespace Phantom.Agent.Services.Instances; 
 
-sealed class InstanceLogSenderThread {
+sealed class InstanceLogSender {
+	private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
+	
 	private readonly Guid instanceGuid;
 	private readonly ILogger logger;
 	private readonly CancellationTokenSource cancellationTokenSource;
@@ -16,51 +18,46 @@ sealed class InstanceLogSenderThread {
 	
 	private readonly SemaphoreSlim semaphore = new (1, 1);
 	private readonly RingBuffer<string> buffer = new (1000);
-	
-	public InstanceLogSenderThread(Guid instanceGuid, string name) {
+
+	public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string name) {
 		this.instanceGuid = instanceGuid;
-		this.logger = PhantomLogger.Create<InstanceLogSenderThread>(name);
+		this.logger = PhantomLogger.Create<InstanceLogSender>(name);
 		this.cancellationTokenSource = new CancellationTokenSource();
 		this.cancellationToken = cancellationTokenSource.Token;
-		
-		var thread = new Thread(Run) {
-			IsBackground = true,
-			Name = "Instance Log Sender (" + name + ")"
-		};
-		
-		thread.Start();
+		taskManager.Run(Run);
 	}
 
-	[SuppressMessage("ReSharper", "LocalVariableHidesMember")]
 	private async void Run() {
-		logger.Verbose("Thread started.");
+		logger.Verbose("Task started.");
 		
 		try {
 			while (!cancellationToken.IsCancellationRequested) {
-				await semaphore.WaitAsync(cancellationToken);
-
-				ImmutableArray<string> lines;
-				
-				try {
-					lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
-					buffer.Clear();
-				} finally {
-					semaphore.Release();
-				}
-
-				if (lines.Length > 0) {
+				var lines = await DequeueOrThrow();
+				if (!lines.IsEmpty) {
 					await ServerMessaging.SendMessage(new InstanceOutputMessage(instanceGuid, lines));
 				}
 
-				await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken);
+				await Task.Delay(SendDelay, cancellationToken);
 			}
 		} catch (OperationCanceledException) {
 			// Ignore.
 		} catch (Exception e) {
-			logger.Error(e, "Caught exception in thread.");
+			logger.Error(e, "Caught exception in task.");
 		} finally {
 			cancellationTokenSource.Dispose();
-			logger.Verbose("Thread stopped.");
+			logger.Verbose("Task stopped.");
+		}
+	}
+
+	private async Task<ImmutableArray<string>> DequeueOrThrow() {
+		await semaphore.WaitAsync(cancellationToken);
+
+		try {
+			ImmutableArray<string> lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
+			buffer.Clear();
+			return lines;
+		} finally {
+			semaphore.Release();
 		}
 	}
 
@@ -79,6 +76,10 @@ sealed class InstanceLogSenderThread {
 	}
 
 	public void Cancel() {
-		cancellationTokenSource.Cancel();
+		try {
+			cancellationTokenSource.Cancel();
+		} catch (ObjectDisposedException) {
+			// Ignore.
+		}
 	}
 }
diff --git a/Agent/Phantom.Agent.Services/Instances/States/InstanceRunningState.cs b/Agent/Phantom.Agent.Services/Instances/States/InstanceRunningState.cs
index fc28e52..0c34576 100644
--- a/Agent/Phantom.Agent.Services/Instances/States/InstanceRunningState.cs
+++ b/Agent/Phantom.Agent.Services/Instances/States/InstanceRunningState.cs
@@ -9,7 +9,7 @@ namespace Phantom.Agent.Services.Instances.States;
 sealed class InstanceRunningState : IInstanceState {
 	private readonly InstanceContext context;
 	private readonly InstanceSession session;
-	private readonly InstanceLogSenderThread logSenderThread;
+	private readonly InstanceLogSender logSender;
 	private readonly SessionObjects sessionObjects;
 	
 	private readonly CancellationTokenSource delayedStopCancellationTokenSource = new ();
@@ -19,7 +19,7 @@ sealed class InstanceRunningState : IInstanceState {
 	public InstanceRunningState(InstanceContext context, InstanceSession session) {
 		this.context = context;
 		this.session = session;
-		this.logSenderThread = new InstanceLogSenderThread(context.Configuration.InstanceGuid, context.ShortName);
+		this.logSender = new InstanceLogSender(context.LaunchServices.TaskManager, context.Configuration.InstanceGuid, context.ShortName);
 		this.sessionObjects = new SessionObjects(this);
 	}
 
@@ -42,7 +42,7 @@ sealed class InstanceRunningState : IInstanceState {
 
 	private void SessionOutput(object? sender, string e) {
 		context.Logger.Verbose("[Server] {Line}", e);
-		logSenderThread.Enqueue(e);
+		logSender.Enqueue(e);
 	}
 
 	private void SessionEnded(object? sender, EventArgs e) {
@@ -153,7 +153,7 @@ sealed class InstanceRunningState : IInstanceState {
 				state.CancelDelayedStop();
 			}
 			
-			state.logSenderThread.Cancel();
+			state.logSender.Cancel();
 			state.session.Dispose();
 			state.context.PortManager.Release(state.context.Configuration);
 			return true;