1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-05-02 12:34:05 +02:00

Replace remaining uses of TaskManager

This commit is contained in:
chylex 2024-03-30 09:25:08 +01:00
parent 94148add2d
commit 137a2a53c3
Signed by: chylex
GPG Key ID: 4DE42C8F19A80548
11 changed files with 11 additions and 106 deletions

View File

@ -6,7 +6,6 @@ using Phantom.Agent.Services.Instances;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Agent.Services; namespace Phantom.Agent.Services;
@ -18,7 +17,6 @@ public sealed class AgentServices {
private AgentFolders AgentFolders { get; } private AgentFolders AgentFolders { get; }
private AgentState AgentState { get; } private AgentState AgentState { get; }
private TaskManager TaskManager { get; }
private BackupManager BackupManager { get; } private BackupManager BackupManager { get; }
internal JavaRuntimeRepository JavaRuntimeRepository { get; } internal JavaRuntimeRepository JavaRuntimeRepository { get; }
@ -30,13 +28,12 @@ public sealed class AgentServices {
this.AgentFolders = agentFolders; this.AgentFolders = agentFolders;
this.AgentState = new AgentState(); this.AgentState = new AgentState();
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks); this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
this.JavaRuntimeRepository = new JavaRuntimeRepository(); this.JavaRuntimeRepository = new JavaRuntimeRepository();
this.InstanceTicketManager = new InstanceTicketManager(agentInfo, controllerConnection); this.InstanceTicketManager = new InstanceTicketManager(agentInfo, controllerConnection);
var instanceManagerInit = new InstanceManagerActor.Init(controllerConnection, agentFolders, AgentState, JavaRuntimeRepository, InstanceTicketManager, TaskManager, BackupManager); var instanceManagerInit = new InstanceManagerActor.Init(controllerConnection, agentFolders, AgentState, JavaRuntimeRepository, InstanceTicketManager, BackupManager);
this.InstanceManager = ActorSystem.ActorOf(InstanceManagerActor.Factory(instanceManagerInit), "InstanceManager"); this.InstanceManager = ActorSystem.ActorOf(InstanceManagerActor.Factory(instanceManagerInit), "InstanceManager");
} }
@ -50,7 +47,6 @@ public sealed class AgentServices {
Logger.Information("Stopping services..."); Logger.Information("Stopping services...");
await InstanceManager.Stop(new InstanceManagerActor.ShutdownCommand()); await InstanceManager.Stop(new InstanceManagerActor.ShutdownCommand());
await TaskManager.Stop();
BackupManager.Dispose(); BackupManager.Dispose();

View File

@ -24,7 +24,7 @@ sealed class BackupScheduler : CancellableBackgroundTask {
public event EventHandler<BackupCreationResult>? BackupCompleted; public event EventHandler<BackupCreationResult>? BackupCompleted;
public BackupScheduler(InstanceContext context, InstanceProcess process, int serverPort) : base(PhantomLogger.Create<BackupScheduler>(context.ShortName), context.Services.TaskManager, "Backup scheduler for " + context.ShortName) { public BackupScheduler(InstanceContext context, InstanceProcess process, int serverPort) : base(PhantomLogger.Create<BackupScheduler>(context.ShortName)) {
this.backupManager = context.Services.BackupManager; this.backupManager = context.Services.BackupManager;
this.context = context; this.context = context;
this.process = process; this.process = process;

View File

@ -20,7 +20,7 @@ namespace Phantom.Agent.Services.Instances;
sealed class InstanceManagerActor : ReceiveActor<InstanceManagerActor.ICommand> { sealed class InstanceManagerActor : ReceiveActor<InstanceManagerActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceManagerActor>(); private static readonly ILogger Logger = PhantomLogger.Create<InstanceManagerActor>();
public readonly record struct Init(ControllerConnection ControllerConnection, AgentFolders AgentFolders, AgentState AgentState, JavaRuntimeRepository JavaRuntimeRepository, InstanceTicketManager InstanceTicketManager, TaskManager TaskManager, BackupManager BackupManager); public readonly record struct Init(ControllerConnection ControllerConnection, AgentFolders AgentFolders, AgentState AgentState, JavaRuntimeRepository JavaRuntimeRepository, InstanceTicketManager InstanceTicketManager, BackupManager BackupManager);
public static Props<ICommand> Factory(Init init) { public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceManagerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume }); return Props<ICommand>.Create(() => new InstanceManagerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
@ -47,7 +47,7 @@ sealed class InstanceManagerActor : ReceiveActor<InstanceManagerActor.ICommand>
var minecraftServerExecutables = new MinecraftServerExecutables(init.AgentFolders.ServerExecutableFolderPath); var minecraftServerExecutables = new MinecraftServerExecutables(init.AgentFolders.ServerExecutableFolderPath);
var launchServices = new LaunchServices(minecraftServerExecutables, init.JavaRuntimeRepository); var launchServices = new LaunchServices(minecraftServerExecutables, init.JavaRuntimeRepository);
this.instanceServices = new InstanceServices(init.ControllerConnection, init.TaskManager, init.BackupManager, launchServices); this.instanceServices = new InstanceServices(init.ControllerConnection, init.BackupManager, launchServices);
ReceiveAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance); ReceiveAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
ReceiveAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance); ReceiveAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);

View File

@ -1,8 +1,7 @@
using Phantom.Agent.Minecraft.Launcher; using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc; using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Utils.Tasks;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;
sealed record InstanceServices(ControllerConnection ControllerConnection, TaskManager TaskManager, BackupManager BackupManager, LaunchServices LaunchServices); sealed record InstanceServices(ControllerConnection ControllerConnection, BackupManager BackupManager, LaunchServices LaunchServices);

View File

@ -22,7 +22,7 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
private int droppedLinesSinceLastSend; private int droppedLinesSinceLastSend;
public InstanceLogSender(ControllerConnection controllerConnection, TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) { public InstanceLogSender(ControllerConnection controllerConnection, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName)) {
this.controllerConnection = controllerConnection; this.controllerConnection = controllerConnection;
this.instanceGuid = instanceGuid; this.instanceGuid = instanceGuid;
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped); this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);

View File

@ -31,7 +31,7 @@ sealed class InstanceRunningState : IDisposable {
this.Process = process; this.Process = process;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, context.InstanceGuid, context.ShortName); this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.InstanceGuid, context.ShortName);
this.backupScheduler = new BackupScheduler(context, process, configuration.ServerPort); this.backupScheduler = new BackupScheduler(context, process, configuration.ServerPort);
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted; this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;

View File

@ -10,7 +10,6 @@ using Phantom.Utils.Logging;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks;
var shutdownCancellationTokenSource = new CancellationTokenSource(); var shutdownCancellationTokenSource = new CancellationTokenSource();
var shutdownCancellationToken = shutdownCancellationTokenSource.Token; var shutdownCancellationToken = shutdownCancellationTokenSource.Token;
@ -64,14 +63,12 @@ try {
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate); return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
} }
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try { try {
await Task.WhenAll( await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.AgentRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken), RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.AgentRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.WebRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken) RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.WebRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken)
); );
} finally { } finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup(); NetMQConfig.Cleanup();
} }

View File

@ -8,19 +8,13 @@ public abstract class CancellableBackgroundTask {
protected ILogger Logger { get; } protected ILogger Logger { get; }
protected CancellationToken CancellationToken { get; } protected CancellationToken CancellationToken { get; }
private readonly TaskManager taskManager; protected CancellableBackgroundTask(ILogger logger) {
private readonly string taskName;
protected CancellableBackgroundTask(ILogger logger, TaskManager taskManager, string taskName) {
this.Logger = logger; this.Logger = logger;
this.CancellationToken = cancellationTokenSource.Token; this.CancellationToken = cancellationTokenSource.Token;
this.taskManager = taskManager;
this.taskName = taskName;
} }
protected void Start() { protected void Start() {
taskManager.Run(taskName, Run); Task.Run(Run, CancellationToken.None);
} }
private async Task Run() { private async Task Run() {

View File

@ -1,76 +0,0 @@
using System.Collections.Concurrent;
using Phantom.Utils.Collections;
using Serilog;
namespace Phantom.Utils.Tasks;
public sealed class TaskManager {
private readonly ILogger logger;
private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly CancellationToken cancellationToken;
private readonly ConcurrentDictionary<Task, string> runningTasks = new (ReferenceEqualityComparer<Task>.Instance);
public TaskManager(ILogger logger) {
this.logger = logger;
this.cancellationToken = cancellationTokenSource.Token;
}
private T Add<T>(string name, T task) where T : Task {
cancellationToken.ThrowIfCancellationRequested();
runningTasks.TryAdd(task, name);
task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default);
return task;
}
private void OnFinished(Task task) {
runningTasks.TryRemove(task, out _);
}
public Task Run(string name, Action action) {
return Add(name, Task.Run(action, cancellationToken));
}
public Task Run(string name, Func<Task> taskFunc) {
return Add(name, Task.Run(taskFunc, cancellationToken));
}
public Task<T> Run<T>(string name, Func<Task<T>> taskFunc) {
return Add(name, Task.Run(taskFunc, cancellationToken));
}
public async Task Stop() {
logger.Information("Stopping task manager...");
cancellationTokenSource.Cancel();
var remainingTasksAwaiterTask = WaitForRemainingTasks();
while (true) {
var logStateTimeoutTask = Task.Delay(TimeSpan.FromSeconds(10), CancellationToken.None);
var completedTask = await Task.WhenAny(remainingTasksAwaiterTask, logStateTimeoutTask);
if (completedTask == logStateTimeoutTask) {
var remainingTaskNames = runningTasks.Values.Order().ToList();
var remainingTaskNameList = string.Join('\n', remainingTaskNames.Select(static name => "- " + name));
logger.Warning("Waiting for {TaskCount} task(s) to finish:\n{TaskNames}", remainingTaskNames.Count, remainingTaskNameList);
}
else {
break;
}
}
runningTasks.Clear();
cancellationTokenSource.Dispose();
logger.Information("Task manager stopped.");
}
private async Task WaitForRemainingTasks() {
foreach (var task in runningTasks.Keys) {
try {
await task;
} catch (Exception) {
// ignored
}
}
}
}

View File

@ -9,7 +9,6 @@ using Phantom.Utils.Logging;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks;
using Phantom.Web; using Phantom.Web;
using Phantom.Web.Services; using Phantom.Web.Services;
using Phantom.Web.Services.Rpc; using Phantom.Web.Services.Rpc;
@ -59,8 +58,7 @@ try {
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken)); var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
var webConfiguration = new WebLauncher.Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken); var webConfiguration = new WebLauncher.Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web")); var webApplication = WebLauncher.CreateApplication(webConfiguration, applicationProperties, rpcSocket.Connection);
var webApplication = WebLauncher.CreateApplication(webConfiguration, taskManager, applicationProperties, rpcSocket.Connection);
using var actorSystem = ActorSystemFactory.Create("Web"); using var actorSystem = ActorSystemFactory.Create("Web");
@ -88,7 +86,6 @@ try {
await WebLauncher.Launch(webConfiguration, webApplication); await WebLauncher.Launch(webConfiguration, webApplication);
} finally { } finally {
shutdownCancellationTokenSource.Cancel(); shutdownCancellationTokenSource.Cancel();
await taskManager.Stop();
rpcDisconnectSemaphore.Release(); rpcDisconnectSemaphore.Release();
await rpcTask; await rpcTask;

View File

@ -1,7 +1,6 @@
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Phantom.Web.Services; using Phantom.Web.Services;
using Serilog; using Serilog;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
@ -13,7 +12,7 @@ static class WebLauncher {
public string HttpUrl => "http://" + Host + ":" + Port; public string HttpUrl => "http://" + Host + ":" + Port;
} }
internal static WebApplication CreateApplication(Configuration config, TaskManager taskManager, ApplicationProperties applicationProperties, RpcConnectionToServer<IMessageToController> controllerConnection) { internal static WebApplication CreateApplication(Configuration config, ApplicationProperties applicationProperties, RpcConnectionToServer<IMessageToController> controllerConnection) {
var assembly = typeof(WebLauncher).Assembly; var assembly = typeof(WebLauncher).Assembly;
var builder = WebApplication.CreateBuilder(new WebApplicationOptions { var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
ApplicationName = assembly.GetName().Name, ApplicationName = assembly.GetName().Name,
@ -29,7 +28,6 @@ static class WebLauncher {
builder.WebHost.UseStaticWebAssets(); builder.WebHost.UseStaticWebAssets();
} }
builder.Services.AddSingleton(taskManager);
builder.Services.AddSingleton(applicationProperties); builder.Services.AddSingleton(applicationProperties);
builder.Services.AddSingleton(controllerConnection); builder.Services.AddSingleton(controllerConnection);
builder.Services.AddPhantomServices(); builder.Services.AddPhantomServices();