Compare commits

...

2 Commits

Author SHA1 Message Date
chylex dcfe66a337
Move RPC message receiving to Akka.NET 2024-03-15 22:00:37 +01:00
chylex 0d039b69de
Implement actors in Controller via Akka.NET 2024-03-11 00:01:54 +01:00
73 changed files with 1761 additions and 812 deletions

View File

@ -21,9 +21,11 @@ sealed class KeepAliveLoop {
private async Task Run() {
var cancellationToken = cancellationTokenSource.Token;
Logger.Information("Started keep-alive loop.");
try {
await connection.IsReady.WaitAsync(cancellationToken);
Logger.Information("Started keep-alive loop.");
while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken);
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);

View File

@ -17,6 +17,7 @@ sealed class Instance : IAsyncDisposable {
private IServerLauncher Launcher { get; set; }
private readonly SemaphoreSlim configurationSemaphore = new (1, 1);
private readonly Guid instanceGuid;
private readonly string shortName;
private readonly ILogger logger;
@ -29,7 +30,8 @@ sealed class Instance : IAsyncDisposable {
private readonly InstanceProcedureManager procedureManager;
public Instance(string shortName, InstanceServices services, InstanceConfiguration configuration, IServerLauncher launcher) {
public Instance(Guid instanceGuid, string shortName, InstanceServices services, InstanceConfiguration configuration, IServerLauncher launcher) {
this.instanceGuid = instanceGuid;
this.shortName = shortName;
this.logger = PhantomLogger.Create<Instance>(shortName);
@ -44,16 +46,16 @@ sealed class Instance : IAsyncDisposable {
}
public void ReportLastStatus() {
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, currentStatus));
}
private void ReportAndSetStatus(IInstanceStatus status) {
currentStatus = status;
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, status));
}
private void ReportEvent(IInstanceEvent instanceEvent) {
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent));
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, instanceGuid, instanceEvent));
}
internal void TransitionState(IInstanceState newState) {
@ -99,7 +101,7 @@ sealed class Instance : IAsyncDisposable {
await configurationSemaphore.WaitAsync(cancellationToken);
try {
procedure = new LaunchInstanceProcedure(Configuration, Launcher);
procedure = new LaunchInstanceProcedure(instanceGuid, Configuration, Launcher);
} finally {
configurationSemaphore.Release();
}

View File

@ -75,9 +75,8 @@ sealed class InstanceSessionManager : IAsyncDisposable {
});
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration, InstanceLaunchProperties launchProperties, bool launchNow, bool alwaysReportStatus) {
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(Guid instanceGuid, InstanceConfiguration configuration, InstanceLaunchProperties launchProperties, bool launchNow, bool alwaysReportStatus) {
return await AcquireSemaphoreAndRun(async () => {
var instanceGuid = configuration.InstanceGuid;
var instanceFolder = Path.Combine(basePath, instanceGuid.ToString());
Directories.Create(instanceFolder, Chmod.URWX_GRX);
@ -106,15 +105,15 @@ sealed class InstanceSessionManager : IAsyncDisposable {
if (instances.TryGetValue(instanceGuid, out var instance)) {
await instance.Reconfigure(configuration, launcher, shutdownCancellationToken);
Logger.Information("Reconfigured instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, configuration.InstanceGuid);
Logger.Information("Reconfigured instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
if (alwaysReportStatus) {
instance.ReportLastStatus();
}
}
else {
instances[instanceGuid] = instance = new Instance(GetInstanceLoggerName(instanceGuid), instanceServices, configuration, launcher);
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, configuration.InstanceGuid);
instances[instanceGuid] = instance = new Instance(instanceGuid, GetInstanceLoggerName(instanceGuid), instanceServices, configuration, launcher);
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
instance.ReportLastStatus();
instance.IsRunningChanged += OnInstanceIsRunningChanged;

View File

@ -6,7 +6,7 @@ using Phantom.Common.Data.Instance;
namespace Phantom.Agent.Services.Instances.Procedures;
sealed record LaunchInstanceProcedure(InstanceConfiguration Configuration, IServerLauncher Launcher, bool IsRestarting = false) : IInstanceProcedure {
sealed record LaunchInstanceProcedure(Guid InstanceGuid, InstanceConfiguration Configuration, IServerLauncher Launcher, bool IsRestarting = false) : IInstanceProcedure {
public async Task<IInstanceState?> Run(IInstanceContext context, CancellationToken cancellationToken) {
if (!IsRestarting && context.CurrentState is InstanceRunningState) {
return null;
@ -30,7 +30,7 @@ sealed record LaunchInstanceProcedure(InstanceConfiguration Configuration, IServ
context.Logger.Information("Session starting...");
try {
InstanceProcess process = await DoLaunch(context, cancellationToken);
return new InstanceRunningState(Configuration, Launcher, process, context);
return new InstanceRunningState(InstanceGuid, Configuration, Launcher, process, context);
} catch (OperationCanceledException) {
context.SetStatus(InstanceStatus.NotRunning);
} catch (LaunchFailureException e) {

View File

@ -12,6 +12,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
internal bool IsStopping { get; set; }
private readonly Guid instanceGuid;
private readonly InstanceConfiguration configuration;
private readonly IServerLauncher launcher;
private readonly IInstanceContext context;
@ -21,13 +22,14 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
private bool isDisposed;
public InstanceRunningState(InstanceConfiguration configuration, IServerLauncher launcher, InstanceProcess process, IInstanceContext context) {
public InstanceRunningState(Guid instanceGuid, InstanceConfiguration configuration, IServerLauncher launcher, InstanceProcess process, IInstanceContext context) {
this.instanceGuid = instanceGuid;
this.configuration = configuration;
this.launcher = launcher;
this.context = context;
this.Process = process;
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, instanceGuid, context.ShortName);
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;
@ -64,7 +66,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
else {
context.Logger.Information("Session ended unexpectedly, restarting...");
context.ReportEvent(InstanceEvent.Crashed);
context.EnqueueProcedure(new LaunchInstanceProcedure(configuration, launcher, IsRestarting: true));
context.EnqueueProcedure(new LaunchInstanceProcedure(instanceGuid, configuration, launcher, IsRestarting: true));
}
}

View File

@ -27,19 +27,21 @@ public sealed class MessageListener : IMessageToAgentListener {
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
Logger.Information("Agent authentication successful.");
void ShutdownAfterConfigurationFailed(InstanceConfiguration configuration) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, configuration.InstanceGuid);
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, instanceGuid);
shutdownTokenSource.Cancel();
}
foreach (var configureInstanceMessage in message.InitialInstanceConfigurations) {
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
if (!result.Is(ConfigureInstanceResult.Success)) {
ShutdownAfterConfigurationFailed(configureInstanceMessage.Configuration);
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
return NoReply.Instance;
}
}
connection.SetIsReady();
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
await agent.InstanceSessionManager.RefreshAgentStatus();
@ -62,7 +64,7 @@ public sealed class MessageListener : IMessageToAgentListener {
}
private Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
return agent.InstanceSessionManager.Configure(message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
return agent.InstanceSessionManager.Configure(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {

View File

@ -0,0 +1,15 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Agent(
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] AgentConfiguration Configuration,
[property: MemoryPackOrder(2)] AgentStats? Stats,
[property: MemoryPackOrder(3)] IAgentConnectionStatus ConnectionStatus
) {
[MemoryPackIgnore]
public RamAllocationUnits? AvailableMemory => Configuration.MaxMemory - Stats?.RunningInstanceMemory;
}

View File

@ -0,0 +1,19 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentConfiguration(
[property: MemoryPackOrder(0)] string AgentName,
[property: MemoryPackOrder(1)] ushort ProtocolVersion,
[property: MemoryPackOrder(2)] string BuildVersion,
[property: MemoryPackOrder(3)] ushort MaxInstances,
[property: MemoryPackOrder(4)] RamAllocationUnits MaxMemory,
[property: MemoryPackOrder(5)] AllowedPorts? AllowedServerPorts = null,
[property: MemoryPackOrder(6)] AllowedPorts? AllowedRconPorts = null
) {
public static AgentConfiguration From(AgentInfo agentInfo) {
return new AgentConfiguration(agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
}
}

View File

@ -1,22 +0,0 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentWithStats(
[property: MemoryPackOrder(0)] Guid Guid,
[property: MemoryPackOrder(1)] string Name,
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
[property: MemoryPackOrder(3)] string BuildVersion,
[property: MemoryPackOrder(4)] ushort MaxInstances,
[property: MemoryPackOrder(5)] RamAllocationUnits MaxMemory,
[property: MemoryPackOrder(6)] AllowedPorts? AllowedServerPorts,
[property: MemoryPackOrder(7)] AllowedPorts? AllowedRconPorts,
[property: MemoryPackOrder(8)] AgentStats? Stats,
[property: MemoryPackOrder(9)] DateTimeOffset? LastPing,
[property: MemoryPackOrder(10)] bool IsOnline
) {
[MemoryPackIgnore]
public RamAllocationUnits? AvailableMemory => MaxMemory - Stats?.RunningInstanceMemory;
}

View File

@ -0,0 +1,27 @@
using MemoryPack;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable]
[MemoryPackUnion(0, typeof(AgentIsOffline))]
[MemoryPackUnion(1, typeof(AgentIsDisconnected))]
[MemoryPackUnion(2, typeof(AgentIsOnline))]
public partial interface IAgentConnectionStatus {}
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsOffline : IAgentConnectionStatus;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsDisconnected([property: MemoryPackOrder(0)] DateTimeOffset LastPingTime) : IAgentConnectionStatus;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsOnline : IAgentConnectionStatus;
public static class AgentConnectionStatus {
public static readonly IAgentConnectionStatus Offline = new AgentIsOffline();
public static readonly IAgentConnectionStatus Online = new AgentIsOnline();
public static IAgentConnectionStatus Disconnected(DateTimeOffset lastPingTime) {
return new AgentIsDisconnected(lastPingTime);
}
}

View File

@ -5,11 +5,12 @@ namespace Phantom.Common.Data.Web.Instance;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Instance(
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(1)] IInstanceStatus Status,
[property: MemoryPackOrder(2)] bool LaunchAutomatically
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(2)] IInstanceStatus Status,
[property: MemoryPackOrder(3)] bool LaunchAutomatically
) {
public static Instance Offline(InstanceConfiguration configuration, bool launchAutomatically = false) {
return new Instance(configuration, InstanceStatus.Offline, launchAutomatically);
public static Instance Offline(Guid instanceGuid, InstanceConfiguration configuration, bool launchAutomatically = false) {
return new Instance(instanceGuid, configuration, InstanceStatus.Offline, launchAutomatically);
}
}

View File

@ -4,8 +4,8 @@ namespace Phantom.Common.Data.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentInfo(
[property: MemoryPackOrder(0)] Guid Guid,
[property: MemoryPackOrder(1)] string Name,
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] string AgentName,
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
[property: MemoryPackOrder(3)] string BuildVersion,
[property: MemoryPackOrder(4)] ushort MaxInstances,

View File

@ -7,13 +7,12 @@ namespace Phantom.Common.Data.Instance;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record InstanceConfiguration(
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] string InstanceName,
[property: MemoryPackOrder(3)] ushort ServerPort,
[property: MemoryPackOrder(4)] ushort RconPort,
[property: MemoryPackOrder(5)] string MinecraftVersion,
[property: MemoryPackOrder(6)] MinecraftServerKind MinecraftServerKind,
[property: MemoryPackOrder(7)] RamAllocationUnits MemoryAllocation,
[property: MemoryPackOrder(8)] Guid JavaRuntimeGuid,
[property: MemoryPackOrder(9)] ImmutableArray<string> JvmArguments
[property: MemoryPackOrder(1)] string InstanceName,
[property: MemoryPackOrder(2)] ushort ServerPort,
[property: MemoryPackOrder(3)] ushort RconPort,
[property: MemoryPackOrder(4)] string MinecraftVersion,
[property: MemoryPackOrder(5)] MinecraftServerKind MinecraftServerKind,
[property: MemoryPackOrder(6)] RamAllocationUnits MemoryAllocation,
[property: MemoryPackOrder(7)] Guid JavaRuntimeGuid,
[property: MemoryPackOrder(8)] ImmutableArray<string> JvmArguments
);

View File

@ -3,3 +3,12 @@
public enum ConfigureInstanceResult : byte {
Success
}
public static class ConfigureInstanceResultExtensions {
public static string ToSentence(this ConfigureInstanceResult reason) {
return reason switch {
ConfigureInstanceResult.Success => "Success.",
_ => "Unknown error."
};
}
}

View File

@ -2,6 +2,7 @@
public enum InstanceActionGeneralResult : byte {
None,
AgentDoesNotExist,
AgentShuttingDown,
AgentIsNotResponding,
InstanceDoesNotExist

View File

@ -18,6 +18,7 @@ public sealed partial record InstanceActionResult<T>(
public string ToSentence(Func<T, string> concreteResultToSentence) {
return GeneralResult switch {
InstanceActionGeneralResult.None => concreteResultToSentence(ConcreteResult!),
InstanceActionGeneralResult.AgentDoesNotExist => "Agent does not exist.",
InstanceActionGeneralResult.AgentShuttingDown => "Agent is shutting down.",
InstanceActionGeneralResult.AgentIsNotResponding => "Agent is not responding.",
InstanceActionGeneralResult.InstanceDoesNotExist => "Instance does not exist.",

View File

@ -6,9 +6,10 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ConfigureInstanceMessage(
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(1)] InstanceLaunchProperties LaunchProperties,
[property: MemoryPackOrder(2)] bool LaunchNow = false
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(2)] InstanceLaunchProperties LaunchProperties,
[property: MemoryPackOrder(3)] bool LaunchNow = false
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleConfigureInstance(this);

View File

@ -8,7 +8,8 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record CreateOrUpdateInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] InstanceConfiguration Configuration
) : IMessageToController<InstanceActionResult<CreateOrUpdateInstanceResult>> {
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateOrUpdateInstance(this);

View File

@ -6,7 +6,8 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid
) : IMessageToController<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleLaunchInstance(this);

View File

@ -6,8 +6,9 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] string Command
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] string Command
) : IMessageToController<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleSendCommandToInstance(this);

View File

@ -7,8 +7,9 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] MinecraftStopStrategy StopStrategy
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] MinecraftStopStrategy StopStrategy
) : IMessageToController<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleStopInstance(this);

View File

@ -7,7 +7,7 @@ namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RefreshAgentsMessage(
[property: MemoryPackOrder(0)] ImmutableArray<AgentWithStats> Agents
[property: MemoryPackOrder(0)] ImmutableArray<Agent> Agents
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRefreshAgents(this);

View File

@ -1,39 +0,0 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
namespace Phantom.Controller.Services.Agents;
public sealed record Agent(
Guid Guid,
string Name,
ushort ProtocolVersion,
string BuildVersion,
ushort MaxInstances,
RamAllocationUnits MaxMemory,
AllowedPorts? AllowedServerPorts = null,
AllowedPorts? AllowedRconPorts = null,
AgentStats? Stats = null,
DateTimeOffset? LastPing = null
) {
internal AgentConnection? Connection { get; init; }
public bool IsOnline { get; internal init; }
public bool IsOffline => !IsOnline;
internal Agent(AgentInfo info) : this(info.Guid, info.Name, info.ProtocolVersion, info.BuildVersion, info.MaxInstances, info.MaxMemory, info.AllowedServerPorts, info.AllowedRconPorts) {}
internal Agent AsOnline(DateTimeOffset lastPing) => this with {
LastPing = lastPing,
IsOnline = Connection != null
};
internal Agent AsDisconnected() => this with {
IsOnline = false
};
internal Agent AsOffline() => this with {
Connection = null,
Stats = null,
IsOnline = false
};
}

View File

@ -0,0 +1,348 @@
using System.Collections.Immutable;
using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Actor.Tasks;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentActor>();
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
public readonly record struct Init(Guid AgentGuid, AgentConfiguration AgentConfiguration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
}
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly Guid agentGuid;
private AgentConfiguration configuration;
private AgentStats? stats;
private ImmutableArray<TaggedJavaRuntime> javaRuntimes = ImmutableArray<TaggedJavaRuntime>.Empty;
private readonly AgentConnection connection;
private DateTimeOffset? lastPingTime;
private bool isOnline;
private IAgentConnectionStatus ConnectionStatus {
get {
if (isOnline) {
return AgentConnectionStatus.Online;
}
else if (lastPingTime == null) {
return AgentConnectionStatus.Offline;
}
else {
return AgentConnectionStatus.Disconnected(lastPingTime.Value);
}
}
}
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
private AgentActor(Init init) {
this.controllerState = init.ControllerState;
this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
this.agentGuid = init.AgentGuid;
this.configuration = init.AgentConfiguration;
this.connection = new AgentConnection(agentGuid, configuration.AgentName);
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(agentGuid, init.DbProvider, init.CancellationToken)), "DatabaseStorage");
NotifyAgentUpdated();
ReceiveAsync<InitializeCommand>(Initialize);
ReceiveAsyncAndReply<RegisterCommand, ImmutableArray<ConfigureInstanceMessage>>(Register);
Receive<UnregisterCommand>(Unregister);
Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus);
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
Receive<UpdateStatsCommand>(UpdateStats);
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
}
private void NotifyAgentUpdated() {
controllerState.UpdateAgent(new Agent(agentGuid, configuration, stats, ConnectionStatus));
}
protected override void PreStart() {
Self.Tell(new InitializeCommand());
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
}
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
UpdateInstanceData(instance);
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instance.InstanceGuid, instanceActor);
return instanceActor;
}
private void UpdateInstanceData(Instance instance) {
instanceDataByGuid[instance.InstanceGuid] = instance;
controllerState.UpdateInstance(instance);
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
var name = "Instance:" + instance.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Tell(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
}
private void TellAllInstances(InstanceActor.ICommand command) {
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(command);
}
}
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
return instance.Request(command, cancellationToken);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
}
}
private async Task<ImmutableArray<ConfigureInstanceMessage>> PrepareInitialConfigurationMessages() {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
foreach (var (instanceGuid, instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(instanceGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
public interface ICommand {}
private sealed record InitializeCommand : ICommand;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
private sealed record RefreshConnectionStatusCommand : ICommand;
public sealed record NotifyIsAliveCommand : ICommand;
public sealed record UpdateStatsCommand(int RunningInstanceCount, RamAllocationUnits RunningInstanceMemory) : ICommand;
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, Guid InstanceGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
private async Task Initialize(InitializeCommand command) {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == agentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var instanceConfiguration = new InstanceConfiguration(
entity.AgentGuid,
entity.InstanceName,
entity.ServerPort,
entity.RconPort,
entity.MinecraftVersion,
entity.MinecraftServerKind,
entity.MemoryAllocation,
entity.JavaRuntimeGuid,
JvmArgumentsHelper.Split(entity.JvmArguments)
);
CreateNewInstance(Instance.Offline(entity.InstanceGuid, instanceConfiguration, entity.LaunchAutomatically));
}
}
private async Task<ImmutableArray<ConfigureInstanceMessage>> Register(RegisterCommand command) {
var configurationMessages = await PrepareInitialConfigurationMessages();
configuration = command.Configuration;
connection.UpdateConnection(command.Connection, configuration.AgentName);
lastPingTime = DateTimeOffset.Now;
isOnline = true;
NotifyAgentUpdated();
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentConfigurationCommand(configuration));
return configurationMessages;
}
private void Unregister(UnregisterCommand command) {
if (connection.CloseIfSame(command.Connection)) {
stats = null;
lastPingTime = null;
isOnline = false;
NotifyAgentUpdated();
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
}
}
private void RefreshConnectionStatus(RefreshConnectionStatusCommand command) {
if (isOnline && lastPingTime != null && DateTimeOffset.Now - lastPingTime >= DisconnectionThreshold) {
isOnline = false;
NotifyAgentUpdated();
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
}
}
private void NotifyIsAlive(NotifyIsAliveCommand command) {
lastPingTime = DateTimeOffset.Now;
if (!isOnline) {
isOnline = true;
NotifyAgentUpdated();
}
}
private void UpdateStats(UpdateStatsCommand command) {
stats = new AgentStats(command.RunningInstanceCount, command.RunningInstanceMemory);
NotifyAgentUpdated();
}
private void UpdateJavaRuntimes(UpdateJavaRuntimesCommand command) {
javaRuntimes = command.JavaRuntimes;
controllerState.UpdateAgentJavaRuntimes(agentGuid, javaRuntimes);
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
var instanceConfiguration = command.Configuration;
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
}
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance1, command)
.Unwrap();
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
if (serverExecutableInfo == null) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
}
var instanceConfiguration = command.Configuration;
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(command.InstanceGuid, out var instanceActorRef);
if (isCreatingInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(command.InstanceGuid, instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, command.InstanceGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
}
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
var instanceGuid = command.InstanceGuid;
var instanceName = command.Configuration.InstanceName;
var isCreating = command.IsCreatingInstance;
if (result.Is(ConfigureInstanceResult.Success)) {
string action = isCreating ? "Added" : "Edited";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
}
else {
string action = isCreating ? "adding" : "editing";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
return result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
}
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
UpdateInstanceData(command.Instance);
}
}

View File

@ -1,28 +1,65 @@
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentConnection {
private readonly RpcConnectionToClient<IMessageToAgentListener> connection;
internal AgentConnection(RpcConnectionToClient<IMessageToAgentListener> connection) {
this.connection = connection;
private static readonly ILogger Logger = PhantomLogger.Create<AgentConnection>();
private readonly Guid agentGuid;
private string agentName;
private RpcConnectionToClient<IMessageToAgentListener>? connection;
public AgentConnection(Guid agentGuid, string agentName) {
this.agentName = agentName;
this.agentGuid = agentGuid;
}
public bool IsSame(RpcConnectionToClient<IMessageToAgentListener> connection) {
return this.connection.IsSame(connection);
public void UpdateConnection(RpcConnectionToClient<IMessageToAgentListener> newConnection, string newAgentName) {
lock (this) {
connection?.Close();
connection = newConnection;
agentName = newAgentName;
}
}
public void Close() {
connection.Close();
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgentListener> expected) {
lock (this) {
if (connection != null && connection.IsSame(expected)) {
connection.Close();
return true;
}
}
return false;
}
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
return connection.Send(message);
lock (this) {
if (connection == null) {
LogAgentOffline();
return Task.CompletedTask;
}
return connection.Send(message);
}
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
lock (this) {
if (connection == null) {
LogAgentOffline();
return Task.FromResult<TReply?>(default);
}
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken)!;
}
}
private void LogAgentOffline() {
Logger.Error("Could not send message to offline agent \"{Name}\" (GUID {Guid}).", agentName, agentGuid);
}
}

View File

@ -0,0 +1,82 @@
using Phantom.Common.Data.Web.Agent;
using Phantom.Controller.Database;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentDatabaseStorageActor>();
public readonly record struct Init(Guid AgentGuid, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly Guid agentGuid;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private AgentConfiguration? configurationToStore;
private bool hasScheduledFlush;
private AgentDatabaseStorageActor(Init init) {
this.agentGuid = init.AgentGuid;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<StoreAgentConfigurationCommand>(StoreAgentConfiguration);
ReceiveAsync<FlushChangesCommand>(FlushChanges);
}
public interface ICommand {}
public sealed record StoreAgentConfigurationCommand(AgentConfiguration Configuration) : ICommand;
private sealed record FlushChangesCommand : ICommand;
private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) {
this.configurationToStore = command.Configuration;
ScheduleFlush(TimeSpan.FromSeconds(2));
}
private async Task FlushChanges(FlushChangesCommand command) {
hasScheduledFlush = false;
if (configurationToStore == null) {
return;
}
try {
await using var ctx = dbProvider.Eager();
var entity = ctx.AgentUpsert.Fetch(agentGuid);
entity.Name = configurationToStore.AgentName;
entity.ProtocolVersion = configurationToStore.ProtocolVersion;
entity.BuildVersion = configurationToStore.BuildVersion;
entity.MaxInstances = configurationToStore.MaxInstances;
entity.MaxMemory = configurationToStore.MaxMemory;
await ctx.SaveChangesAsync(cancellationToken);
} catch (Exception e) {
ScheduleFlush(TimeSpan.FromSeconds(10));
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) in database.", configurationToStore.AgentName, agentGuid);
return;
}
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) in database.", configurationToStore.AgentName, agentGuid);
configurationToStore = null;
}
private void ScheduleFlush(TimeSpan delay) {
if (hasScheduledFlush) {
return;
}
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}

View File

@ -1,15 +0,0 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Utils.Collections;
namespace Phantom.Controller.Services.Agents;
sealed class AgentJavaRuntimesManager {
private readonly RwLockedDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> runtimes = new (LockRecursionPolicy.NoRecursion);
public ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> All => runtimes.ToImmutable();
internal void Update(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
this.runtimes[agentGuid] = runtimes;
}
}

View File

@ -1,153 +1,94 @@
using System.Collections.Immutable;
using System.Collections.Concurrent;
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Collections;
using Phantom.Utils.Events;
using Phantom.Controller.Minecraft;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentManager {
private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>();
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
private readonly ObservableAgents agents = new (PhantomLogger.Create<AgentManager, ObservableAgents>());
public EventSubscribers<ImmutableArray<Agent>> AgentsChanged => agents.Subs;
private readonly CancellationToken cancellationToken;
private readonly ActorSystem actorSystem;
private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
public AgentManager(AuthToken authToken, IDbContextProvider dbProvider, TaskManager taskManager, CancellationToken cancellationToken) {
private readonly CancellationToken cancellationToken;
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
public AgentManager(ActorSystem actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.actorSystem = actorSystem;
this.authToken = authToken;
this.controllerState = controllerState;
this.minecraftVersions = minecraftVersions;
this.dbProvider = dbProvider;
this.cancellationToken = cancellationToken;
taskManager.Run("Refresh agent status loop", RefreshAgentStatus);
this.addAgentActorFactory = CreateAgentActor;
}
internal async Task Initialize() {
private ActorRef<AgentActor.ICommand> CreateAgentActor(Guid agentGuid, AgentConfiguration agentConfiguration) {
var init = new AgentActor.Init(agentGuid, agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
var name = "Agent:" + agentGuid;
return actorSystem.ActorOf(AgentActor.Factory(init), name);
}
public async Task Initialize() {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Agents.AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var agent = new Agent(entity.AgentGuid, entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
if (!agents.ByGuid.AddOrReplaceIf(agent.Guid, agent, static oldAgent => oldAgent.IsOffline)) {
// TODO
throw new InvalidOperationException("Unable to register agent from database: " + agent.Guid);
var agentGuid = entity.AgentGuid;
var agentConfiguration = new AgentConfiguration(entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
if (agentsByGuid.TryAdd(agentGuid, CreateAgentActor(agentGuid, agentConfiguration))) {
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentConfiguration.AgentName, agentGuid);
}
}
}
public ImmutableDictionary<Guid, Agent> GetAgents() {
return agents.ByGuid.ToImmutable();
}
internal async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, InstanceManager instanceManager, RpcConnectionToClient<IMessageToAgentListener> connection) {
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgentListener> connection) {
if (!this.authToken.FixedTimeEquals(authToken)) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
return false;
}
var agent = new Agent(agentInfo) {
LastPing = DateTimeOffset.Now,
IsOnline = true,
Connection = new AgentConnection(connection)
};
if (agents.ByGuid.AddOrReplace(agent.Guid, agent, out var oldAgent)) {
oldAgent.Connection?.Close();
}
await using (var ctx = dbProvider.Eager()) {
var entity = ctx.AgentUpsert.Fetch(agent.Guid);
entity.Name = agent.Name;
entity.ProtocolVersion = agent.ProtocolVersion;
entity.BuildVersion = agent.BuildVersion;
entity.MaxInstances = agent.MaxInstances;
entity.MaxMemory = agent.MaxMemory;
await ctx.SaveChangesAsync(cancellationToken);
}
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
var instanceConfigurations = await instanceManager.GetInstanceConfigurationsForAgent(agent.Guid);
await connection.Send(new RegisterAgentSuccessMessage(instanceConfigurations));
var agentProperties = AgentConfiguration.From(agentInfo);
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var configureInstanceMessages = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
return true;
}
internal bool UnregisterAgent(Guid agentGuid, RpcConnectionToClient<IMessageToAgentListener> connection) {
if (agents.ByGuid.TryReplaceIf(agentGuid, static oldAgent => oldAgent.AsOffline(), oldAgent => oldAgent.Connection?.IsSame(connection) == true)) {
Logger.Information("Unregistered agent with GUID {Guid}.", agentGuid);
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
agent.Tell(command);
return true;
}
else {
Logger.Warning("Could not deliver command {CommandType} to agent {AgentGuid}, agent not registered.", command.GetType().Name, agentGuid);
return false;
}
}
internal Agent? GetAgent(Guid guid) {
return agents.ByGuid.TryGetValue(guid, out var agent) ? agent : null;
}
internal void NotifyAgentIsAlive(Guid agentGuid) {
agents.ByGuid.TryReplace(agentGuid, static agent => agent.AsOnline(DateTimeOffset.Now));
}
internal void SetAgentStats(Guid agentGuid, int runningInstanceCount, RamAllocationUnits runningInstanceMemory) {
agents.ByGuid.TryReplace(agentGuid, agent => agent with { Stats = new AgentStats(runningInstanceCount, runningInstanceMemory) });
}
private async Task RefreshAgentStatus() {
static Agent MarkAgentAsOffline(Agent agent) {
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
return agent.AsDisconnected();
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
return await agent.Request(command, cancellationToken);
}
while (!cancellationToken.IsCancellationRequested) {
await Task.Delay(DisconnectionRecheckInterval, cancellationToken);
var now = DateTimeOffset.Now;
agents.ByGuid.ReplaceAllIf(MarkAgentAsOffline, agent => agent.IsOnline && agent.LastPing is {} lastPing && now - lastPing >= DisconnectionThreshold);
}
}
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
if (connection == null || agent == null) {
// TODO handle missing agent?
return null;
}
try {
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
} catch (Exception e) {
Logger.Error(e, "Could not send message to agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
return null;
}
}
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
public RwLockedObservableDictionary<Guid, Agent> ByGuid { get; } = new (LockRecursionPolicy.NoRecursion);
public ObservableAgents(ILogger logger) : base(logger) {
ByGuid.CollectionChanged += Update;
}
protected override ImmutableArray<Agent> GetData() {
return ByGuid.ValuesCopy;
else {
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);
}
}
}

View File

@ -1,4 +1,5 @@
using Phantom.Common.Data;
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web;
using Phantom.Controller.Database;
@ -8,19 +9,21 @@ using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services;
public sealed class ControllerServices {
public sealed class ControllerServices : IAsyncDisposable {
public ActorSystem ActorSystem { get; }
private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; }
private AgentManager AgentManager { get; }
private AgentJavaRuntimesManager AgentJavaRuntimesManager { get; }
private InstanceManager InstanceManager { get; }
private InstanceLogManager InstanceLogManager { get; }
private EventLogManager EventLogManager { get; }
@ -31,18 +34,23 @@ public sealed class ControllerServices {
private UserRoleManager UserRoleManager { get; }
private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; }
private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken;
public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, AuthToken webAuthToken, CancellationToken shutdownCancellationToken) {
this.dbProvider = dbProvider;
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(agentAuthToken, dbProvider, TaskManager, shutdownCancellationToken);
this.AgentJavaRuntimesManager = new AgentJavaRuntimesManager();
this.InstanceManager = new InstanceManager(AgentManager, MinecraftVersions, dbProvider, shutdownCancellationToken);
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider);
@ -53,25 +61,25 @@ public sealed class ControllerServices {
this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager);
this.AuditLogManager = new AuditLogManager(dbProvider);
this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken);
this.dbProvider = dbProvider;
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
}
public AgentMessageListener CreateAgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection) {
return new AgentMessageListener(connection, AgentManager, AgentJavaRuntimesManager, InstanceManager, InstanceLogManager, EventLogManager, cancellationToken);
return new AgentMessageListener(connection, AgentManager, InstanceLogManager, EventLogManager, cancellationToken);
}
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(connection, webAuthToken, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, AgentJavaRuntimesManager, InstanceManager, InstanceLogManager, MinecraftVersions, EventLogManager, TaskManager);
return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
}
public async Task Initialize() {
await DatabaseMigrator.Run(dbProvider, cancellationToken);
await AgentManager.Initialize();
await PermissionManager.Initialize();
await RoleManager.Initialize();
await AgentManager.Initialize();
await InstanceManager.Initialize();
}
public async ValueTask DisposeAsync() {
await ActorSystem.Terminate();
ActorSystem.Dispose();
}
}

View File

@ -0,0 +1,33 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Utils.Actor.Event;
namespace Phantom.Controller.Services;
sealed class ControllerState {
private readonly ObservableState<ImmutableDictionary<Guid, Agent>> agentsByGuid = new (ImmutableDictionary<Guid, Agent>.Empty);
private readonly ObservableState<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> agentJavaRuntimesByGuid = new (ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>.Empty);
private readonly ObservableState<ImmutableDictionary<Guid, Instance>> instancesByGuid = new (ImmutableDictionary<Guid, Instance>.Empty);
public ImmutableDictionary<Guid, Agent> AgentsByGuid => agentsByGuid.State;
public ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> AgentJavaRuntimesByGuid => agentJavaRuntimesByGuid.State;
public ImmutableDictionary<Guid, Instance> InstancesByGuid => instancesByGuid.State;
public ObservableState<ImmutableDictionary<Guid, Agent>>.Receiver AgentsByGuidReceiver => agentsByGuid.ReceiverSide;
public ObservableState<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>.Receiver AgentJavaRuntimesByGuidReceiver => agentJavaRuntimesByGuid.ReceiverSide;
public ObservableState<ImmutableDictionary<Guid, Instance>>.Receiver InstancesByGuidReceiver => instancesByGuid.ReceiverSide;
public void UpdateAgent(Agent agent) {
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.AgentGuid, agent), agent);
}
public void UpdateAgentJavaRuntimes(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
agentJavaRuntimesByGuid.PublisherSide.Publish(static (agentJavaRuntimesByGuid, agentGuid, runtimes) => agentJavaRuntimesByGuid.SetItem(agentGuid, runtimes), agentGuid, runtimes);
}
public void UpdateInstance(Instance instance) {
instancesByGuid.PublisherSide.Publish(static (instancesByGuid, instance) => instancesByGuid.SetItem(instance.InstanceGuid, instance), instance);
}
}

View File

@ -0,0 +1,135 @@
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Services.Agents;
using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly AgentConnection agentConnection;
private readonly CancellationToken cancellationToken;
private readonly Guid instanceGuid;
private InstanceConfiguration configuration;
private IInstanceStatus status;
private bool launchAutomatically;
private readonly ActorRef<InstanceDatabaseStorageActor.ICommand> databaseStorageActor;
private InstanceActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentConnection = init.AgentConnection;
this.cancellationToken = init.CancellationToken;
(this.instanceGuid, this.configuration, this.status, this.launchAutomatically) = init.Instance;
this.databaseStorageActor = Context.ActorOf(InstanceDatabaseStorageActor.Factory(new InstanceDatabaseStorageActor.Init(instanceGuid, init.DbProvider, init.CancellationToken)), "DatabaseStorage");
Receive<SetStatusCommand>(SetStatus);
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAsyncAndReply<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
}
private void NotifyInstanceUpdated() {
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
}
private void SetLaunchAutomatically(bool newValue) {
if (launchAutomatically != newValue) {
launchAutomatically = newValue;
NotifyInstanceUpdated();
}
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
var reply = await agentConnection.Send<TMessage, InstanceActionResult<TReply>>(message, TimeSpan.FromSeconds(10), cancellationToken);
return reply.DidNotReplyIfNull();
}
public interface ICommand {}
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, Guid InstanceGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void SetStatus(SetStatusCommand command) {
status = command.Status;
NotifyInstanceUpdated();
}
private async Task<InstanceActionResult<ConfigureInstanceResult>> ConfigureInstance(ConfigureInstanceCommand command) {
var message = new ConfigureInstanceMessage(command.InstanceGuid, command.Configuration, command.LaunchProperties);
var result = await SendInstanceActionMessage<ConfigureInstanceMessage, ConfigureInstanceResult>(message);
if (result.Is(ConfigureInstanceResult.Success)) {
configuration = command.Configuration;
NotifyInstanceUpdated();
var storeCommand = new InstanceDatabaseStorageActor.StoreInstanceConfigurationCommand(
command.AuditLogUserGuid,
command.IsCreatingInstance,
configuration
);
databaseStorageActor.Tell(storeCommand);
}
return result;
}
private async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
var message = new LaunchInstanceMessage(instanceGuid);
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(message);
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
SetLaunchAutomatically(true);
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceLaunchedCommand(command.AuditLogUserGuid));
}
return result;
}
private async Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
var message = new StopInstanceMessage(instanceGuid, command.StopStrategy);
var result = await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(message);
if (result.Is(StopInstanceResult.StopInitiated)) {
SetLaunchAutomatically(false);
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceStoppedCommand(command.AuditLogUserGuid, command.StopStrategy));
}
return result;
}
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
var message = new SendCommandToInstanceMessage(instanceGuid, command.Command);
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);
if (result.Is(SendCommandToInstanceResult.Success)) {
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceCommandSentCommand(command.AuditLogUserGuid, command.Command));
}
return result;
}
}

View File

@ -0,0 +1,116 @@
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceDatabaseStorageActor : ReceiveActor<InstanceDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceDatabaseStorageActor>();
public readonly record struct Init(Guid InstanceGuid, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly Guid instanceGuid;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private InstanceDatabaseStorageActor(Init init) {
this.instanceGuid = init.InstanceGuid;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
ReceiveAsync<StoreInstanceConfigurationCommand>(StoreInstanceConfiguration);
ReceiveAsync<StoreInstanceLaunchedCommand>(StoreInstanceLaunched);
ReceiveAsync<StoreInstanceStoppedCommand>(StoreInstanceStopped);
ReceiveAsync<StoreInstanceCommandSentCommand>(StoreInstanceCommandSent);
}
private ValueTask<InstanceEntity?> FindInstanceEntity(ILazyDbContext db) {
return db.Ctx.Instances.FindAsync(new object[] { instanceGuid }, cancellationToken);
}
public interface ICommand {}
public sealed record StoreInstanceConfigurationCommand(Guid AuditLogUserGuid, bool IsCreatingInstance, InstanceConfiguration Configuration) : ICommand;
public sealed record StoreInstanceLaunchedCommand(Guid AuditLogUserGuid) : ICommand;
public sealed record StoreInstanceStoppedCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand;
public sealed record StoreInstanceCommandSentCommand(Guid AuditLogUserGuid, string Command) : ICommand;
private async Task StoreInstanceConfiguration(StoreInstanceConfigurationCommand command) {
var configuration = command.Configuration;
await using (var db = dbProvider.Lazy()) {
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(instanceGuid);
entity.AgentGuid = configuration.AgentGuid;
entity.InstanceName = configuration.InstanceName;
entity.ServerPort = configuration.ServerPort;
entity.RconPort = configuration.RconPort;
entity.MinecraftVersion = configuration.MinecraftVersion;
entity.MinecraftServerKind = configuration.MinecraftServerKind;
entity.MemoryAllocation = configuration.MemoryAllocation;
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
if (command.IsCreatingInstance) {
auditLogWriter.InstanceCreated(instanceGuid);
}
else {
auditLogWriter.InstanceEdited(instanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
}
Logger.Information("Stored instance \"{InstanceName}\" (GUID {InstanceGuid}) in database.", configuration.InstanceName, instanceGuid);
}
private async Task StoreInstanceLaunched(StoreInstanceLaunchedCommand command) {
await using var db = dbProvider.Lazy();
var entity = await FindInstanceEntity(db);
if (entity != null) {
entity.LaunchAutomatically = true;
}
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceLaunched(instanceGuid);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
private async Task StoreInstanceStopped(StoreInstanceStoppedCommand command) {
await using var db = dbProvider.Lazy();
var entity = await FindInstanceEntity(db);
if (entity != null) {
entity.LaunchAutomatically = false;
}
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceStopped(instanceGuid, command.StopStrategy.Seconds);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
private async Task StoreInstanceCommandSent(StoreInstanceCommandSentCommand command) {
await using var db = dbProvider.Lazy();
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceCommandExecuted(instanceGuid, command.Command);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
}

View File

@ -1,241 +0,0 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Utils.Collections;
using Phantom.Utils.Events;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceManager {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceManager>();
private readonly ObservableInstances instances = new (PhantomLogger.Create<InstanceManager, ObservableInstances>());
public EventSubscribers<ImmutableDictionary<Guid, Instance>> InstancesChanged => instances.Subs;
private readonly AgentManager agentManager;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly SemaphoreSlim modifyInstancesSemaphore = new (1, 1);
public InstanceManager(AgentManager agentManager, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.agentManager = agentManager;
this.minecraftVersions = minecraftVersions;
this.dbProvider = dbProvider;
this.cancellationToken = cancellationToken;
}
public async Task Initialize() {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Instances.AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var configuration = new InstanceConfiguration(
entity.AgentGuid,
entity.InstanceGuid,
entity.InstanceName,
entity.ServerPort,
entity.RconPort,
entity.MinecraftVersion,
entity.MinecraftServerKind,
entity.MemoryAllocation,
entity.JavaRuntimeGuid,
JvmArgumentsHelper.Split(entity.JvmArguments)
);
var instance = Instance.Offline(configuration, entity.LaunchAutomatically);
instances.ByGuid[instance.Configuration.InstanceGuid] = instance;
}
}
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
public async Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid auditLogUserGuid, InstanceConfiguration configuration) {
var agent = agentManager.GetAgent(configuration.AgentGuid);
if (agent == null) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.AgentNotFound);
}
if (string.IsNullOrWhiteSpace(configuration.InstanceName)) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty);
}
if (configuration.MemoryAllocation <= RamAllocationUnits.Zero) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero);
}
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
if (serverExecutableInfo == null) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound);
}
InstanceActionResult<CreateOrUpdateInstanceResult> result;
bool isNewInstance;
await modifyInstancesSemaphore.WaitAsync(cancellationToken);
try {
isNewInstance = !instances.ByGuid.TryReplace(configuration.InstanceGuid, instance => instance with { Configuration = configuration });
if (isNewInstance) {
instances.ByGuid.TryAdd(configuration.InstanceGuid, Instance.Offline(configuration));
}
var message = new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo));
var reply = await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, message, TimeSpan.FromSeconds(10));
result = reply.DidNotReplyIfNull().Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
await using var db = dbProvider.Lazy();
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
entity.AgentGuid = configuration.AgentGuid;
entity.InstanceName = configuration.InstanceName;
entity.ServerPort = configuration.ServerPort;
entity.RconPort = configuration.RconPort;
entity.MinecraftVersion = configuration.MinecraftVersion;
entity.MinecraftServerKind = configuration.MinecraftServerKind;
entity.MemoryAllocation = configuration.MemoryAllocation;
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(auditLogUserGuid);
if (isNewInstance) {
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
}
else {
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
}
else if (isNewInstance) {
instances.ByGuid.Remove(configuration.InstanceGuid);
}
} finally {
modifyInstancesSemaphore.Release();
}
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
if (isNewInstance) {
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", configuration.InstanceName, configuration.InstanceGuid, agent.Name);
}
else {
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", configuration.InstanceName, configuration.InstanceGuid, agent.Name);
}
}
else {
if (isNewInstance) {
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agent.Name, result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
}
else {
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agent.Name, result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
}
}
return result;
}
internal void SetInstanceState(Guid instanceGuid, IInstanceStatus instanceStatus) {
instances.ByGuid.TryReplace(instanceGuid, instance => instance with { Status = instanceStatus });
}
internal void SetInstanceStatesForAgent(Guid agentGuid, IInstanceStatus instanceStatus) {
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(Instance instance, TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
var reply = await agentManager.SendMessage<TMessage, InstanceActionResult<TReply>>(instance.Configuration.AgentGuid, message, TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(Guid instanceGuid, TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
return instances.ByGuid.TryGetValue(instanceGuid, out var instance) ? await SendInstanceActionMessage<TMessage, TReply>(instance, message) : InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid auditLogUserGuid, Guid instanceGuid) {
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(instanceGuid, new LaunchInstanceMessage(instanceGuid));
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(instanceGuid, true, auditLogUserGuid, auditLogWriter => auditLogWriter.InstanceLaunched(instanceGuid));
}
return result;
}
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid auditLogUserGuid, Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
var result = await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(instanceGuid, new StopInstanceMessage(instanceGuid, stopStrategy));
if (result.Is(StopInstanceResult.StopInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(instanceGuid, false, auditLogUserGuid, auditLogWriter => auditLogWriter.InstanceStopped(instanceGuid, stopStrategy.Seconds));
}
return result;
}
private async Task HandleInstanceManuallyLaunchedOrStopped(Guid instanceGuid, bool wasLaunched, Guid auditLogUserGuid, Action<AuditLogRepository.ItemWriter> addAuditEvent) {
await modifyInstancesSemaphore.WaitAsync(cancellationToken);
try {
instances.ByGuid.TryReplace(instanceGuid, instance => instance with { LaunchAutomatically = wasLaunched });
await using var db = dbProvider.Lazy();
var entity = await db.Ctx.Instances.FindAsync(new object[] { instanceGuid }, cancellationToken);
if (entity != null) {
entity.LaunchAutomatically = wasLaunched;
addAuditEvent(new AuditLogRepository(db).Writer(auditLogUserGuid));
await db.Ctx.SaveChangesAsync(cancellationToken);
}
} finally {
modifyInstancesSemaphore.Release();
}
}
public async Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommand(Guid auditLogUserId, Guid instanceGuid, string command) {
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(instanceGuid, new SendCommandToInstanceMessage(instanceGuid, command));
if (result.Is(SendCommandToInstanceResult.Success)) {
await using var db = dbProvider.Lazy();
var auditLogWriter = new AuditLogRepository(db).Writer(auditLogUserId);
auditLogWriter.InstanceCommandExecuted(instanceGuid, command);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
return result;
}
internal async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(Guid agentGuid) {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
foreach (var (configuration, _, launchAutomatically) in instances.ByGuid.ValuesCopy.Where(instance => instance.Configuration.AgentGuid == agentGuid)) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
private sealed class ObservableInstances : ObservableState<ImmutableDictionary<Guid, Instance>> {
public RwLockedObservableDictionary<Guid, Instance> ByGuid { get; } = new (LockRecursionPolicy.NoRecursion);
public ObservableInstances(ILogger logger) : base(logger) {
ByGuid.CollectionChanged += Update;
}
protected override ImmutableDictionary<Guid, Instance> GetData() {
return ByGuid.ToImmutable();
}
}
}

View File

@ -6,6 +6,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" />
<PackageReference Include="BCrypt.Net-Next.StrongName" />
</ItemGroup>
@ -14,6 +15,7 @@
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj" />
<ProjectReference Include="..\Phantom.Controller.Database\Phantom.Controller.Database.csproj" />
<ProjectReference Include="..\Phantom.Controller.Minecraft\Phantom.Controller.Minecraft.csproj" />

View File

@ -1,5 +1,4 @@
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
@ -16,32 +15,28 @@ namespace Phantom.Controller.Services.Rpc;
public sealed class AgentMessageListener : IMessageToControllerListener {
private readonly RpcConnectionToClient<IMessageToAgentListener> connection;
private readonly AgentManager agentManager;
private readonly AgentJavaRuntimesManager agentJavaRuntimesManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
private readonly CancellationToken cancellationToken;
private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>();
internal AgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection, AgentManager agentManager, AgentJavaRuntimesManager agentJavaRuntimesManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager, CancellationToken cancellationToken) {
internal AgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection, AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager, CancellationToken cancellationToken) {
this.connection = connection;
this.agentManager = agentManager;
this.agentJavaRuntimesManager = agentJavaRuntimesManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
this.eventLogManager = eventLogManager;
this.cancellationToken = cancellationToken;
}
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) {
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.AgentGuid) {
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
}
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection)) {
connection.SetAuthorizationResult(true);
agentGuidWaiter.SetResult(message.AgentInfo.Guid);
agentGuidWaiter.SetResult(message.AgentInfo.AgentGuid);
}
return NoReply.Instance;
@ -53,34 +48,31 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted) {
var agentGuid = agentGuidWaiter.Task.Result;
if (agentManager.UnregisterAgent(agentGuid, connection)) {
instanceManager.SetInstanceStatesForAgent(agentGuid, InstanceStatus.Offline);
}
agentManager.TellAgent(agentGuidWaiter.Task.Result, new AgentActor.UnregisterCommand(connection));
}
connection.Close();
return Task.FromResult(NoReply.Instance);
}
public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.NotifyAgentIsAlive(await WaitForAgentGuid());
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.NotifyIsAliveCommand());
return NoReply.Instance;
}
public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentJavaRuntimesManager.Update(await WaitForAgentGuid(), message.Runtimes);
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message) {
agentManager.SetAgentStats(await WaitForAgentGuid(), message.RunningInstanceCount, message.RunningInstanceMemory);
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
return NoReply.Instance;
}
public Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
instanceManager.SetInstanceState(message.InstanceGuid, message.InstanceStatus);
return Task.FromResult(NoReply.Instance);
public async Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message) {

View File

@ -1,9 +1,9 @@
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
@ -17,93 +17,118 @@ using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog;
using Agent = Phantom.Common.Data.Web.Agent.Agent;
namespace Phantom.Controller.Services.Rpc;
public sealed class WebMessageListener : IMessageToControllerListener {
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
private static int listenerSequenceId = 0;
private readonly ActorRef<ICommand> actor;
private readonly RpcConnectionToClient<IMessageToWebListener> connection;
private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly AgentJavaRuntimesManager agentJavaRuntimesManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
private readonly TaskManager taskManager;
internal WebMessageListener(
IActorRefFactory actorSystem,
RpcConnectionToClient<IMessageToWebListener> connection,
AuthToken authToken,
ControllerState controllerState,
UserManager userManager,
RoleManager roleManager,
UserRoleManager userRoleManager,
UserLoginManager userLoginManager,
AuditLogManager auditLogManager,
AgentManager agentManager,
AgentJavaRuntimesManager agentJavaRuntimesManager,
InstanceManager instanceManager,
InstanceLogManager instanceLogManager,
MinecraftVersions minecraftVersions,
EventLogManager eventLogManager,
TaskManager taskManager
EventLogManager eventLogManager
) {
this.actor = actorSystem.ActorOf(Actor.Factory(this), "Web-" + Interlocked.Increment(ref listenerSequenceId));
this.connection = connection;
this.authToken = authToken;
this.controllerState = controllerState;
this.userManager = userManager;
this.roleManager = roleManager;
this.userRoleManager = userRoleManager;
this.userLoginManager = userLoginManager;
this.auditLogManager = auditLogManager;
this.agentManager = agentManager;
this.agentJavaRuntimesManager = agentJavaRuntimesManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
this.minecraftVersions = minecraftVersions;
this.eventLogManager = eventLogManager;
this.taskManager = taskManager;
}
private void OnConnectionReady() {
lock (this) {
agentManager.AgentsChanged.Subscribe(this, HandleAgentsChanged);
instanceManager.InstancesChanged.Subscribe(this, HandleInstancesChanged);
instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
private sealed class Actor : ReceiveActor<ICommand> {
public static Props<ICommand> Factory(WebMessageListener listener) {
return Props<ICommand>.Create(() => new Actor(listener), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly WebMessageListener listener;
private Actor(WebMessageListener listener) {
this.listener = listener;
Receive<StartConnectionCommand>(StartConnection);
Receive<StopConnectionCommand>(StopConnection);
Receive<RefreshAgentsCommand>(RefreshAgents);
Receive<RefreshInstancesCommand>(RefreshInstances);
}
private void StartConnection(StartConnectionCommand command) {
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
}
private void StopConnection(StopConnectionCommand command) {
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
}
private void RefreshAgents(RefreshAgentsCommand command) {
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void RefreshInstances(RefreshInstancesCommand command) {
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
}
}
private void OnConnectionClosed() {
lock (this) {
agentManager.AgentsChanged.Unsubscribe(this);
instanceManager.InstancesChanged.Unsubscribe(this);
instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
}
}
private interface ICommand {}
private void HandleAgentsChanged(ImmutableArray<Agent> agents) {
var message = new RefreshAgentsMessage(agents.Select(static agent => new AgentWithStats(agent.Guid, agent.Name, agent.ProtocolVersion, agent.BuildVersion, agent.MaxInstances, agent.MaxMemory, agent.AllowedServerPorts, agent.AllowedRconPorts, agent.Stats, agent.LastPing, agent.IsOnline)).ToImmutableArray());
taskManager.Run("Send agents to web", () => connection.Send(message));
}
private sealed record StartConnectionCommand : ICommand;
private void HandleInstancesChanged(ImmutableDictionary<Guid, Instance> instances) {
var message = new RefreshInstancesMessage(instances.Values.ToImmutableArray());
taskManager.Run("Send instances to web", () => connection.Send(message));
}
private sealed record StopConnectionCommand : ICommand;
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
taskManager.Run("Send instance logs to web", () => connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines)));
}
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
if (authToken.FixedTimeEquals(message.AuthToken)) {
@ -118,7 +143,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
}
if (!connection.IsClosed) {
OnConnectionReady();
actor.Tell(new StartConnectionCommand());
}
return NoReply.Instance;
@ -127,7 +152,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
public Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message) {
if (!connection.IsClosed) {
connection.Close();
OnConnectionClosed();
actor.Tell(new StopConnectionCommand());
}
return Task.FromResult(NoReply.Instance);
@ -162,19 +187,19 @@ public sealed class WebMessageListener : IMessageToControllerListener {
}
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return instanceManager.CreateOrUpdateInstance(message.LoggedInUserGuid, message.Configuration);
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
}
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return instanceManager.LaunchInstance(message.LoggedInUserGuid, message.InstanceGuid);
return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
}
public Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return instanceManager.StopInstance(message.LoggedInUserGuid, message.InstanceGuid, message.StopStrategy);
return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
}
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return instanceManager.SendCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Command);
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
}
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
@ -182,7 +207,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
}
public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) {
return Task.FromResult(agentJavaRuntimesManager.All);
return Task.FromResult(controllerState.AgentJavaRuntimesByGuid);
}
public Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) {

View File

@ -51,22 +51,22 @@ try {
return 1;
}
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();

View File

@ -1,12 +1,15 @@
<Project>
<ItemGroup>
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="8.0.0" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="8.0.0" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Relational" Version="8.0.0" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="8.0.0" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.0" />
<PackageReference Update="System.Linq.Async" Version="6.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="8.0.0" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Update="Microsoft.EntityFrameworkCore.Relational" Version="8.0.0" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="8.0.0" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.0" />
<PackageReference Update="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
@ -14,12 +17,13 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="BCrypt.Net-Next.StrongName" Version="4.0.3" />
<PackageReference Update="Akka" Version="1.5.17.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Update="MemoryPack" Version="1.10.0" />
<PackageReference Update="NetMQ" Version="4.0.1.13" />
<PackageReference Update="BCrypt.Net-Next.StrongName" Version="4.0.3" />
<PackageReference Update="MemoryPack" Version="1.10.0" />
<PackageReference Update="NetMQ" Version="4.0.1.13" />
</ItemGroup>
<ItemGroup>

View File

@ -44,6 +44,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Controller.Services
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils", "Utils\Phantom.Utils\Phantom.Utils.csproj", "{384885E2-5113-45C5-9B15-09BDA0911852}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Actor", "Utils\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj", "{BBFF32C1-A98A-44BF-9023-04344BBB896B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Events", "Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj", "{2E81523B-5DBE-4992-A77B-1679758D0688}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Logging", "Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj", "{FCA141F5-4F18-47C2-9855-14E326FF1219}"
@ -126,6 +128,10 @@ Global
{384885E2-5113-45C5-9B15-09BDA0911852}.Debug|Any CPU.Build.0 = Debug|Any CPU
{384885E2-5113-45C5-9B15-09BDA0911852}.Release|Any CPU.ActiveCfg = Release|Any CPU
{384885E2-5113-45C5-9B15-09BDA0911852}.Release|Any CPU.Build.0 = Release|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Release|Any CPU.Build.0 = Release|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -171,6 +177,7 @@ Global
{4B3B73E6-48DD-4846-87FD-DFB86619B67C} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}
{90F0F1B1-EB0A-49C9-8DF0-1153A87F77C9} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}
{384885E2-5113-45C5-9B15-09BDA0911852} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{BBFF32C1-A98A-44BF-9023-04344BBB896B} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{2E81523B-5DBE-4992-A77B-1679758D0688} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{FCA141F5-4F18-47C2-9855-14E326FF1219} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{BB112660-7A20-45E6-9195-65363B74027F} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}

View File

@ -0,0 +1,25 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) {
if (SupervisorStrategy != null) {
props = props.WithSupervisorStrategy(SupervisorStrategy);
}
if (MailboxType != null) {
props = props.WithMailbox(MailboxType);
}
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props;
}
}

View File

@ -0,0 +1,9 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public static class ActorExtensions {
public static ActorRef<TMessage> ActorOf<TMessage>(this IActorRefFactory factory, Props<TMessage> props, string? name) {
return new ActorRef<TMessage>(factory.ActorOf(props.Inner, name));
}
}

View File

@ -0,0 +1,19 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
sealed class ActorFactory<TActor> : IIndirectActorProducer where TActor : ActorBase {
public Type ActorType => typeof(TActor);
private readonly Func<TActor> constructor;
public ActorFactory(Func<TActor> constructor) {
this.constructor = constructor;
}
public ActorBase Produce() {
return constructor();
}
public void Release(ActorBase actor) {}
}

View File

@ -0,0 +1,35 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public readonly struct ActorRef<TMessage> {
private readonly IActorRef actorRef;
internal ActorRef(IActorRef actorRef) {
this.actorRef = actorRef;
}
internal bool IsSame<TOtherMessage>(ActorRef<TOtherMessage> other) {
return actorRef.Equals(other.actorRef);
}
public void Tell(TMessage message) {
actorRef.Tell(message);
}
public void Forward(TMessage message) {
actorRef.Forward(message);
}
public Task<TReply> Request<TReply>(ICanReply<TReply> message, TimeSpan? timeout, CancellationToken cancellationToken = default) {
return actorRef.Ask<TReply>(message, timeout, cancellationToken);
}
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken);
}
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
}

View File

@ -0,0 +1,31 @@
using Akka.Actor;
using Akka.Configuration;
namespace Phantom.Utils.Actor;
public static class ActorSystemFactory {
private const string Configuration =
"""
akka {
actor {
default-dispatcher = {
executor = task-executor
}
internal-dispatcher = akka.actor.default-dispatcher
debug.unhandled = on
}
loggers = [
"Phantom.Utils.Actor.Logging.SerilogLogger, Phantom.Utils.Actor"
]
}
unbounded-jump-ahead-mailbox {
mailbox-type : "Phantom.Utils.Actor.Mailbox.UnboundedJumpAheadMailbox, Phantom.Utils.Actor"
}
""";
private static readonly BootstrapSetup Setup = BootstrapSetup.Create().WithConfig(ConfigurationFactory.ParseString(Configuration));
public static ActorSystem Create(string name) {
return ActorSystem.Create(name, Setup);
}
}

View File

@ -0,0 +1,113 @@
namespace Phantom.Utils.Actor.Event;
public sealed class ObservableState<TState> {
private readonly ReaderWriterLockSlim rwLock = new (LockRecursionPolicy.NoRecursion);
private readonly List<IListener> listeners = new ();
private TState state;
public TState State {
get {
rwLock.EnterReadLock();
try {
return state;
} finally {
rwLock.ExitReadLock();
}
}
}
public Publisher PublisherSide { get; }
public Receiver ReceiverSide { get; }
public ObservableState(TState state) {
this.state = state;
this.PublisherSide = new Publisher(this);
this.ReceiverSide = new Receiver(this);
}
private interface IListener {
bool IsFor<TMessage>(ActorRef<TMessage> other);
void Notify(TState state);
}
private readonly record struct Listener<TMessage>(ActorRef<TMessage> Actor, Func<TState, TMessage> MessageFactory) : IListener {
public bool IsFor<TOtherMessage>(ActorRef<TOtherMessage> other) {
return Actor.IsSame(other);
}
public void Notify(TState state) {
Actor.Tell(MessageFactory(state));
}
}
public readonly struct Publisher {
private readonly ObservableState<TState> owner;
internal Publisher(ObservableState<TState> owner) {
this.owner = owner;
}
public void Publish(TState state) {
Publish(static (_, newState) => newState, state);
}
public void Publish<TArg>(Func<TState, TArg, TState> stateUpdater, TArg userObject) {
owner.rwLock.EnterWriteLock();
try {
SetInternalState(stateUpdater(owner.state, userObject));
} finally {
owner.rwLock.ExitWriteLock();
}
}
public void Publish<TArg1, TArg2>(Func<TState, TArg1, TArg2, TState> stateUpdater, TArg1 userObject1, TArg2 userObject2) {
owner.rwLock.EnterWriteLock();
try {
SetInternalState(stateUpdater(owner.state, userObject1, userObject2));
} finally {
owner.rwLock.ExitWriteLock();
}
}
private void SetInternalState(TState state) {
owner.state = state;
foreach (var listener in owner.listeners) {
listener.Notify(state);
}
}
}
public readonly struct Receiver {
private readonly ObservableState<TState> owner;
internal Receiver(ObservableState<TState> owner) {
this.owner = owner;
}
public void Register<TMessage>(ActorRef<TMessage> actor, Func<TState, TMessage> messageFactory) {
var listener = new Listener<TMessage>(actor, messageFactory);
owner.rwLock.EnterReadLock();
try {
owner.listeners.Add(listener);
listener.Notify(owner.state);
} finally {
owner.rwLock.ExitReadLock();
}
}
public void Unregister<TMessage>(ActorRef<TMessage> actor) {
owner.rwLock.EnterWriteLock();
try {
int index = owner.listeners.FindIndex(listener => listener.IsFor(actor));
if (index != -1) {
owner.listeners.RemoveAt(index);
}
} finally {
owner.rwLock.ExitWriteLock();
}
}
}
}

View File

@ -0,0 +1,5 @@
using System.Diagnostics.CodeAnalysis;
namespace Phantom.Utils.Actor;
public interface ICanReply<[SuppressMessage("ReSharper", "UnusedTypeParameter")] TReply> {}

View File

@ -0,0 +1,68 @@
using System.Diagnostics.CodeAnalysis;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
using Phantom.Utils.Logging;
using Serilog;
using Serilog.Core.Enrichers;
using Serilog.Events;
using LogEvent = Akka.Event.LogEvent;
namespace Phantom.Utils.Actor.Logging;
[SuppressMessage("ReSharper", "UnusedType.Global")]
public sealed class SerilogLogger : ReceiveActor, IRequiresMessageQueue<ILoggerMessageQueueSemantics> {
private readonly Dictionary<string, ILogger> loggersBySource = new ();
public SerilogLogger() {
Receive<InitializeLogger>(Initialize);
Receive<Debug>(LogDebug);
Receive<Info>(LogInfo);
Receive<Warning>(LogWarning);
Receive<Error>(LogError);
}
private void Initialize(InitializeLogger message) {
Sender.Tell(new LoggerInitialized());
}
private void LogDebug(Debug item) {
Log(item, LogEventLevel.Debug);
}
private void LogInfo(Info item) {
Log(item, LogEventLevel.Information);
}
private void LogWarning(Warning item) {
Log(item, LogEventLevel.Warning);
}
private void LogError(Error item) {
Log(item, LogEventLevel.Error);
}
private void Log(LogEvent item, LogEventLevel level) {
GetLogger(item).Write(level, item.Cause, GetFormat(item), GetArgs(item));
}
private ILogger GetLogger(LogEvent item) {
var source = item.LogSource;
if (!loggersBySource.TryGetValue(source, out var logger)) {
var loggerName = source[(source.IndexOf(':') + 1)..];
loggersBySource[source] = logger = PhantomLogger.Create("Akka", loggerName);
}
return logger;
}
private static string GetFormat(LogEvent item) {
return item.Message is LogMessage logMessage ? logMessage.Format : "{Message:l}";
}
private static object[] GetArgs(LogEvent item) {
return item.Message is LogMessage logMessage ? logMessage.Parameters().Where(static a => a is not PropertyEnricher).ToArray() : new[] { item.Message };
}
}

View File

@ -0,0 +1,6 @@
namespace Phantom.Utils.Actor.Mailbox;
/// <summary>
/// Marker interface for messages that jump ahead in the <see cref="UnboundedJumpAheadMailbox"/>.
/// </summary>
public interface IJumpAhead {}

View File

@ -0,0 +1,16 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Dispatch.MessageQueues;
namespace Phantom.Utils.Actor.Mailbox;
public sealed class UnboundedJumpAheadMailbox : MailboxType, IProducesMessageQueue<UnboundedJumpAheadMessageQueue> {
public const string Name = "unbounded-jump-ahead-mailbox";
public UnboundedJumpAheadMailbox(Settings settings, Config config) : base(settings, config) {}
public override IMessageQueue Create(IActorRef owner, ActorSystem system) {
return new UnboundedJumpAheadMessageQueue();
}
}

View File

@ -0,0 +1,24 @@
using Akka.Actor;
using Akka.Dispatch.MessageQueues;
namespace Phantom.Utils.Actor.Mailbox;
sealed class UnboundedJumpAheadMessageQueue : BlockingMessageQueue {
private readonly Queue<Envelope> highPriorityQueue = new ();
private readonly Queue<Envelope> lowPriorityQueue = new ();
protected override int LockedCount => highPriorityQueue.Count + lowPriorityQueue.Count;
protected override void LockedEnqueue(Envelope envelope) {
if (envelope.Message is IJumpAhead) {
highPriorityQueue.Enqueue(envelope);
}
else {
lowPriorityQueue.Enqueue(envelope);
}
}
protected override bool LockedTryDequeue(out Envelope envelope) {
return highPriorityQueue.TryDequeue(out envelope) || lowPriorityQueue.TryDequeue(out envelope);
}
}

View File

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,19 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public sealed class Props<TMessage> {
internal Props Inner { get; }
private Props(Props inner) {
Inner = inner;
}
private static Props CreateInner<TActor>(Func<TActor> factory) where TActor : ReceiveActor<TMessage> {
return Props.CreateBy(new ActorFactory<TActor>(factory));
}
public static Props<TMessage> Create<TActor>(Func<TActor> factory, ActorConfiguration configuration) where TActor : ReceiveActor<TMessage> {
return new Props<TMessage>(configuration.Apply(CreateInner(factory)));
}
}

View File

@ -0,0 +1,46 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public abstract class ReceiveActor<TMessage> : ReceiveActor {
protected ActorRef<TMessage> SelfTyped => new (Self);
protected void ReceiveAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
Receive<TReplyableMessage>(message => HandleMessageWithReply(action, message));
}
protected void ReceiveAndReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
// Must be async to set default task scheduler to actor scheduler.
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyLater(action, message));
}
protected void ReceiveAsyncAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyAsync(action, message));
}
private void HandleMessageWithReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
Sender.Tell(action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
private Task HandleMessageWithReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
action(message).PipeTo(Sender, Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
return Task.CompletedTask;
}
private async Task HandleMessageWithReplyAsync<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
}

View File

@ -0,0 +1,10 @@
using Akka.Actor;
using Akka.Util.Internal;
namespace Phantom.Utils.Actor;
public static class SupervisorStrategies {
private static DeployableDecider DefaultDecider { get; } = SupervisorStrategy.DefaultDecider.AsInstanceOf<DeployableDecider>();
public static SupervisorStrategy Resume { get; } = new OneForOneStrategy(Decider.From(Directive.Resume, DefaultDecider.Pairs));
}

View File

@ -0,0 +1,33 @@
using Akka.Dispatch;
namespace Phantom.Utils.Actor.Tasks;
public static class TaskExtensions {
public static Task<TResult> ContinueOnActor<TSource, TResult>(this Task<TSource> task, Func<TSource, TResult> mapper) {
if (TaskScheduler.Current is not ActorTaskScheduler actorTaskScheduler) {
throw new InvalidOperationException("Task must be scheduled in Actor context!");
}
var continuationCompletionSource = new TaskCompletionSource<TResult>();
var continuationTask = task.ContinueWith(t => MapResult(t, mapper, continuationCompletionSource), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, actorTaskScheduler);
return continuationTask.Unwrap();
}
public static Task<TResult> ContinueOnActor<TSource, TArg, TResult>(this Task<TSource> task, Func<TSource, TArg, TResult> mapper, TArg arg) {
return task.ContinueOnActor(result => mapper(result, arg));
}
private static Task<TResult> MapResult<TSource, TResult>(Task<TSource> task, Func<TSource, TResult> mapper, TaskCompletionSource<TResult> completionSource) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult(mapper(task.Result));
}
return completionSource.Task;
}
}

View File

@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>

View File

@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string LoggerName => "Rpc:" + ServiceName;
public string TcpUrl => "tcp://" + Host + ":" + Port;
}

View File

@ -0,0 +1,64 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc;
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly MessageHandler<TServerListener> messageHandler;
private readonly RpcConnectionToClient<TClientListener> connection;
private RpcReceiverActor(Init init) {
this.messageDefinitions = init.MessageDefinitions;
this.messageHandler = init.MessageHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public interface ICommand {}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
Handle(command.Data);
if (await connection.GetAuthorization()) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
});
}
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
Handle(command.Data);
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, messageHandler);
}
}

View File

@ -1,16 +1,24 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
private readonly ClientSocket socket;
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
public Task IsReady => isReady.Task;
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
this.socket = socket;
}
public void SetIsReady() {
isReady.TrySetResult();
}
private protected override ValueTask Send(byte[] bytes) {
return socket.SendAsync(bytes);
}

View File

@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
@ -9,25 +11,29 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
}
private readonly string serviceName;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly ActorSystem actorSystem;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.actorSystem = actorSystem;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.cancellationToken = cancellationToken;
}
@ -62,18 +68,17 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
}
client.Enqueue(messageType, data);
}
foreach (var client in clients.Values) {
@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly RpcQueue processingQueue;
private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
this.Connection = connection;
this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue;
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager;
}
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
}
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving();
await processingQueue.Stop();
await receiverActor.Stop();
await Connection.StopSending();
Logger.Debug("Connection closed.");
});

View File

@ -1,102 +0,0 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
namespace Phantom.Utils.Collections;
public sealed class RwLockedObservableDictionary<TKey, TValue> where TKey : notnull {
public event EventHandler? CollectionChanged;
private readonly RwLockedDictionary<TKey, TValue> dict;
public RwLockedObservableDictionary(LockRecursionPolicy recursionPolicy) {
this.dict = new RwLockedDictionary<TKey, TValue>(recursionPolicy);
}
public RwLockedObservableDictionary(int capacity, LockRecursionPolicy recursionPolicy) {
this.dict = new RwLockedDictionary<TKey, TValue>(capacity, recursionPolicy);
}
private void FireCollectionChanged() {
CollectionChanged?.Invoke(this, EventArgs.Empty);
}
private bool FireCollectionChangedIf(bool result) {
if (result) {
FireCollectionChanged();
return true;
}
else {
return false;
}
}
public TValue this[TKey key] {
get => dict[key];
set {
dict[key] = value;
FireCollectionChanged();
}
}
public ImmutableArray<TValue> ValuesCopy => dict.ValuesCopy;
public void ForEachValue(Action<TValue> action) {
dict.ForEachValue(action);
}
public bool TryGetValue(TKey key, [MaybeNullWhen(false)] out TValue value) {
return dict.TryGetValue(key, out value);
}
public bool GetOrAdd(TKey key, Func<TKey, TValue> valueFactory, out TValue value) {
return FireCollectionChangedIf(dict.GetOrAdd(key, valueFactory, out value));
}
public bool TryAdd(TKey key, TValue newValue) {
return FireCollectionChangedIf(dict.TryAdd(key, newValue));
}
public bool AddOrReplace(TKey key, TValue newValue, [MaybeNullWhen(false)] out TValue oldValue) {
return FireCollectionChangedIf(dict.AddOrReplace(key, newValue, out oldValue));
}
public bool AddOrReplaceIf(TKey key, TValue newValue, Predicate<TValue> replaceCondition) {
return FireCollectionChangedIf(dict.AddOrReplaceIf(key, newValue, replaceCondition));
}
public bool TryReplace(TKey key, Func<TValue, TValue> replacementValue) {
return FireCollectionChangedIf(dict.TryReplace(key, replacementValue));
}
public bool TryReplaceIf(TKey key, Func<TValue, TValue> replacementValue, Predicate<TValue> replaceCondition) {
return FireCollectionChangedIf(dict.TryReplaceIf(key, replacementValue, replaceCondition));
}
public bool ReplaceAll(Func<TValue, TValue> replacementValue) {
return FireCollectionChangedIf(dict.ReplaceAll(replacementValue));
}
public bool ReplaceAllIf(Func<TValue, TValue> replacementValue, Predicate<TValue> replaceCondition) {
return FireCollectionChangedIf(dict.ReplaceAllIf(replacementValue, replaceCondition));
}
public bool Remove(TKey key) {
return FireCollectionChangedIf(dict.Remove(key));
}
public bool RemoveIf(TKey key, Predicate<TValue> removeCondition) {
return FireCollectionChangedIf(dict.RemoveIf(key, removeCondition));
}
public bool RemoveAll(Predicate<KeyValuePair<TKey, TValue>> removeCondition) {
return FireCollectionChangedIf(dict.RemoveAll(removeCondition));
}
public ImmutableDictionary<TKey, TValue> ToImmutable() {
return dict.ToImmutable();
}
public ImmutableDictionary<TKey, TNewValue> ToImmutable<TNewValue>(Func<TValue, TNewValue> valueSelector) {
return dict.ToImmutable(valueSelector);
}
}

View File

@ -6,19 +6,19 @@ using Phantom.Utils.Logging;
namespace Phantom.Web.Services.Agents;
public sealed class AgentManager {
private readonly SimpleObservableState<ImmutableArray<AgentWithStats>> agents = new (PhantomLogger.Create<AgentManager>("Agents"), ImmutableArray<AgentWithStats>.Empty);
private readonly SimpleObservableState<ImmutableArray<Agent>> agents = new (PhantomLogger.Create<AgentManager>("Agents"), ImmutableArray<Agent>.Empty);
public EventSubscribers<ImmutableArray<AgentWithStats>> AgentsChanged => agents.Subs;
public EventSubscribers<ImmutableArray<Agent>> AgentsChanged => agents.Subs;
internal void RefreshAgents(ImmutableArray<AgentWithStats> newAgents) {
internal void RefreshAgents(ImmutableArray<Agent> newAgents) {
agents.SetTo(newAgents);
}
public ImmutableArray<AgentWithStats> GetAll() {
public ImmutableArray<Agent> GetAll() {
return agents.Value;
}
public ImmutableDictionary<Guid, AgentWithStats> ToDictionaryByGuid() {
return agents.Value.ToImmutableDictionary(static agent => agent.Guid);
public ImmutableDictionary<Guid, Agent> ToDictionaryByGuid() {
return agents.Value.ToImmutableDictionary(static agent => agent.AgentGuid);
}
}

View File

@ -23,7 +23,7 @@ public sealed class InstanceManager {
public EventSubscribers<InstanceDictionary> InstancesChanged => instances.Subs;
internal void RefreshInstances(ImmutableArray<Instance> newInstances) {
instances.SetTo(newInstances.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid));
instances.SetTo(newInstances.ToImmutableDictionary(static instance => instance.InstanceGuid));
}
public InstanceDictionary GetAll() {
@ -34,23 +34,23 @@ public sealed class InstanceManager {
return instances.Value.GetValueOrDefault(instanceGuid);
}
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid loggedInUserGuid, InstanceConfiguration configuration, CancellationToken cancellationToken) {
var message = new CreateOrUpdateInstanceMessage(loggedInUserGuid, configuration);
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid loggedInUserGuid, Guid instanceGuid, InstanceConfiguration configuration, CancellationToken cancellationToken) {
var message = new CreateOrUpdateInstanceMessage(loggedInUserGuid, instanceGuid, configuration);
return controllerConnection.Send<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(message, cancellationToken);
}
public Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid loggedInUserGuid, Guid instanceGuid, CancellationToken cancellationToken) {
var message = new LaunchInstanceMessage(loggedInUserGuid, instanceGuid);
public Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid loggedInUserGuid, Guid agentGuid, Guid instanceGuid, CancellationToken cancellationToken) {
var message = new LaunchInstanceMessage(loggedInUserGuid, agentGuid, instanceGuid);
return controllerConnection.Send<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(message, cancellationToken);
}
public Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid loggedInUserGuid, Guid instanceGuid, MinecraftStopStrategy stopStrategy, CancellationToken cancellationToken) {
var message = new StopInstanceMessage(loggedInUserGuid, instanceGuid, stopStrategy);
public Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid loggedInUserGuid, Guid agentGuid, Guid instanceGuid, MinecraftStopStrategy stopStrategy, CancellationToken cancellationToken) {
var message = new StopInstanceMessage(loggedInUserGuid, agentGuid, instanceGuid, stopStrategy);
return controllerConnection.Send<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(message, cancellationToken);
}
public Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommandToInstance(Guid loggedInUserGuid, Guid instanceGuid, string command, CancellationToken cancellationToken) {
var message = new SendCommandToInstanceMessage(loggedInUserGuid, instanceGuid, command);
public Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommandToInstance(Guid loggedInUserGuid, Guid agentGuid, Guid instanceGuid, string command, CancellationToken cancellationToken) {
var message = new SendCommandToInstanceMessage(loggedInUserGuid, agentGuid, instanceGuid, command);
return controllerConnection.Send<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(message, cancellationToken);
}
}

View File

@ -18,42 +18,47 @@
</HeaderRow>
<ItemRow Context="agent">
@{
var configuration = agent.Configuration;
var usedInstances = agent.Stats?.RunningInstanceCount;
var usedMemory = agent.Stats?.RunningInstanceMemory.InMegabytes;
}
<Cell>
<p class="fw-semibold">@agent.Name</p>
<small class="font-monospace text-uppercase">@agent.Guid.ToString()</small>
<p class="fw-semibold">@configuration.AgentName</p>
<small class="font-monospace text-uppercase">@agent.AgentGuid.ToString()</small>
</Cell>
<Cell class="text-end">
<ProgressBar Value="@(usedInstances ?? 0)" Maximum="@agent.MaxInstances">
@(usedInstances?.ToString() ?? "?") / @agent.MaxInstances.ToString()
<ProgressBar Value="@(usedInstances ?? 0)" Maximum="@configuration.MaxInstances">
@(usedInstances?.ToString() ?? "?") / @configuration.MaxInstances.ToString()
</ProgressBar>
</Cell>
<Cell class="text-end">
<ProgressBar Value="@(usedMemory ?? 0)" Maximum="@agent.MaxMemory.InMegabytes">
@(usedMemory?.ToString() ?? "?") / @agent.MaxMemory.InMegabytes.ToString() MB
<ProgressBar Value="@(usedMemory ?? 0)" Maximum="@configuration.MaxMemory.InMegabytes">
@(usedMemory?.ToString() ?? "?") / @configuration.MaxMemory.InMegabytes.ToString() MB
</ProgressBar>
</Cell>
<Cell class="text-condensed">
Build: <span class="font-monospace">@agent.BuildVersion</span>
Build: <span class="font-monospace">@configuration.BuildVersion</span>
<br>
Protocol: <span class="font-monospace">v@(agent.ProtocolVersion.ToString())</span>
Protocol: <span class="font-monospace">v@(configuration.ProtocolVersion.ToString())</span>
</Cell>
@if (agent.IsOnline) {
<Cell class="fw-semibold text-center text-success">Online</Cell>
<Cell class="text-end">-</Cell>
}
else {
<Cell class="fw-semibold text-center">Offline</Cell>
<Cell class="text-end">
@if (agent.LastPing is {} lastPing) {
<TimeWithOffset Time="lastPing" />
}
else {
<text>N/A</text>
}
</Cell>
@switch (agent.ConnectionStatus) {
case AgentIsOnline:
<Cell class="fw-semibold text-center text-success">Online</Cell>
<Cell class="text-end">-</Cell>
break;
case AgentIsOffline:
<Cell class="fw-semibold text-center">Offline</Cell>
<Cell class="text-end">N/A</Cell>
break;
case AgentIsDisconnected status:
<Cell class="fw-semibold text-center">Offline</Cell>
<Cell class="text-end">
<TimeWithOffset Time="status.LastPingTime" />
</Cell>
break;
default:
<Cell class="fw-semibold text-center">N/A</Cell>
break;
}
</ItemRow>
<NoItemsRow>
@ -63,12 +68,12 @@
@code {
private readonly TableData<AgentWithStats, Guid> agentTable = new();
private readonly TableData<Agent, Guid> agentTable = new();
protected override void OnInitialized() {
AgentManager.AgentsChanged.Subscribe(this, agents => {
var sortedAgents = agents.Sort(static (a1, a2) => a1.Name.CompareTo(a2.Name));
agentTable.UpdateFrom(sortedAgents, static agent => agent.Guid, static agent => agent, static (agent, _) => agent);
var sortedAgents = agents.Sort(static (a1, a2) => a1.Configuration.AgentName.CompareTo(a2.Configuration.AgentName));
agentTable.UpdateFrom(sortedAgents, static agent => agent.AgentGuid, static agent => agent, static (agent, _) => agent);
InvokeAsync(StateHasChanged);
});
}

View File

@ -58,7 +58,7 @@
try {
logItems = await AuditLogManager.GetMostRecentItems(50, cancellationToken);
userNamesByGuid = (await UserManager.GetAll(cancellationToken)).ToImmutableDictionary(static user => user.Guid, static user => user.Name);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.InstanceGuid, static instance => instance.Configuration.InstanceName);
} finally {
initializationCancellationTokenSource.Dispose();
}

View File

@ -61,8 +61,8 @@
try {
logItems = await EventLogManager.GetMostRecentItems(50, cancellationToken);
agentNamesByGuid = AgentManager.GetAll().ToImmutableDictionary(static kvp => kvp.Guid, static kvp => kvp.Name);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
agentNamesByGuid = AgentManager.GetAll().ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.Configuration.AgentName);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.InstanceGuid, static instance => instance.Configuration.InstanceName);
} finally {
initializationCancellationTokenSource.Dispose();
}

View File

@ -3,4 +3,4 @@
@attribute [Authorize(Permission.CreateInstancesPolicy)]
<h1>New Instance</h1>
<InstanceAddOrEditForm EditedInstanceConfiguration="null" />
<InstanceAddOrEditForm EditedInstance="null" />

View File

@ -41,10 +41,10 @@ else {
<PermissionView Permission="Permission.ControlInstances">
<div class="mb-3">
<InstanceCommandInput InstanceGuid="InstanceGuid" Disabled="@(!Instance.Status.CanSendCommand())" />
<InstanceCommandInput AgentGuid="Instance.Configuration.AgentGuid" InstanceGuid="InstanceGuid" Disabled="@(!Instance.Status.CanSendCommand())" />
</div>
<InstanceStopDialog InstanceGuid="InstanceGuid" ModalId="stop-instance" Disabled="@(!Instance.Status.CanStop())" />
<InstanceStopDialog AgentGuid="Instance.Configuration.AgentGuid" InstanceGuid="InstanceGuid" ModalId="stop-instance" Disabled="@(!Instance.Status.CanStop())" />
</PermissionView>
}
@ -79,7 +79,12 @@ else {
return;
}
var result = await InstanceManager.LaunchInstance(loggedInUserGuid.Value, InstanceGuid, CancellationToken);
if (Instance == null) {
lastError = "Instance not found.";
return;
}
var result = await InstanceManager.LaunchInstance(loggedInUserGuid.Value, Instance.Configuration.AgentGuid, InstanceGuid, CancellationToken);
if (!result.Is(LaunchInstanceResult.LaunchInitiated)) {
lastError = result.ToSentence(Messages.ToSentence);
}

View File

@ -1,18 +1,18 @@
@page "/instances/{InstanceGuid:guid}/edit"
@attribute [Authorize(Permission.CreateInstancesPolicy)]
@using Phantom.Common.Data.Instance
@using Phantom.Common.Data.Web.Instance
@using Phantom.Common.Data.Web.Users
@using Phantom.Web.Services.Instances
@inherits Phantom.Web.Components.PhantomComponent
@inherits PhantomComponent
@inject InstanceManager InstanceManager
@if (InstanceConfiguration == null) {
@if (Instance == null) {
<h1>Instance Not Found</h1>
<p>Return to <a href="instances">all instances</a>.</p>
}
else {
<h1>Edit Instance: @InstanceConfiguration.InstanceName</h1>
<InstanceAddOrEditForm EditedInstanceConfiguration="InstanceConfiguration" />
<h1>Edit Instance: @Instance.Configuration.InstanceName</h1>
<InstanceAddOrEditForm EditedInstance="Instance" />
}
@code {
@ -20,10 +20,10 @@ else {
[Parameter]
public Guid InstanceGuid { get; init; }
private InstanceConfiguration? InstanceConfiguration { get; set; }
private Instance? Instance { get; set; }
protected override void OnInitialized() {
InstanceConfiguration = InstanceManager.GetByGuid(InstanceGuid)?.Configuration;
Instance = InstanceManager.GetByGuid(InstanceGuid);
}
}

View File

@ -16,7 +16,7 @@
<a href="instances/create" class="btn btn-primary" role="button">New Instance</a>
</PermissionView>
<Table TItem="Instance" Items="instances" ItemUrl="@(static instance => "instances/" + instance.Configuration.InstanceGuid)">
<Table TItem="Instance" Items="instances" ItemUrl="@(static instance => "instances/" + instance.InstanceGuid)">
<HeaderRow>
<Column Width="40%">Agent</Column>
<Column Width="40%">Name</Column>
@ -35,7 +35,7 @@
</Cell>
<Cell>
<p class="fw-semibold">@configuration.InstanceName</p>
<small class="font-monospace text-uppercase">@configuration.InstanceGuid.ToString()</small>
<small class="font-monospace text-uppercase">@instance.InstanceGuid.ToString()</small>
</Cell>
<Cell>
<InstanceStatusText Status="instance.Status" />
@ -51,7 +51,7 @@
<p class="font-monospace">@configuration.MemoryAllocation.InMegabytes.ToString() MB</p>
</Cell>
<Cell>
<a href="instances/@configuration.InstanceGuid.ToString()" class="btn btn-info btn-sm">Detail</a>
<a href="instances/@instance.InstanceGuid.ToString()" class="btn btn-info btn-sm">Detail</a>
</Cell>
</ItemRow>
<NoItemsRow>
@ -66,7 +66,7 @@
protected override void OnInitialized() {
AgentManager.AgentsChanged.Subscribe(this, agents => {
this.agentNamesByGuid = agents.ToImmutableDictionary(static agent => agent.Guid, static agent => agent.Name);
this.agentNamesByGuid = agents.ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.Configuration.AgentName);
InvokeAsync(StateHasChanged);
});

View File

@ -8,9 +8,9 @@
@using Phantom.Common.Data.Web.Minecraft
@using Phantom.Common.Data.Web.Users
@using Phantom.Common.Messages.Web.ToController
@using Phantom.Common.Data.Instance
@using Phantom.Common.Data.Java
@using Phantom.Common.Data
@using Phantom.Common.Data.Instance
@using Phantom.Web.Services
@using Phantom.Web.Services.Agents
@using Phantom.Web.Services.Instances
@ -26,20 +26,21 @@
<div class="row">
<div class="col-xl-7 mb-3">
@{
static RenderFragment GetAgentOption(AgentWithStats agent) {
return @<option value="@agent.Guid">
@agent.Name
static RenderFragment GetAgentOption(Agent agent) {
var configuration = agent.Configuration;
return @<option value="@agent.AgentGuid">
@configuration.AgentName
&bullet;
@(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(agent.MaxInstances) @(agent.MaxInstances == 1 ? "Instance" : "Instances")
@(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(configuration.MaxInstances) @(configuration.MaxInstances == 1 ? "Instance" : "Instances")
&bullet;
@(agent.Stats?.RunningInstanceMemory.InMegabytes.ToString() ?? "?")/@(agent.MaxMemory.InMegabytes) MB RAM
@(agent.Stats?.RunningInstanceMemory.InMegabytes.ToString() ?? "?")/@(configuration.MaxMemory.InMegabytes) MB RAM
</option>;
}
}
@if (EditedInstanceConfiguration == null) {
@if (EditedInstance == null) {
<FormSelectInput Id="instance-agent" Label="Agent" @bind-Value="form.SelectedAgentGuid">
<option value="" selected>Select which agent will run the instance...</option>
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.IsOnline).OrderBy(static agent => agent.Name)) {
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.ConnectionStatus is AgentIsOnline).OrderBy(static agent => agent.Configuration.AgentName)) {
@GetAgentOption(agent)
}
</FormSelectInput>
@ -97,8 +98,8 @@
</div>
@{
string? allowedServerPorts = selectedAgent?.AllowedServerPorts?.ToString();
string? allowedRconPorts = selectedAgent?.AllowedRconPorts?.ToString();
string? allowedServerPorts = selectedAgent?.Configuration.AllowedServerPorts?.ToString();
string? allowedRconPorts = selectedAgent?.Configuration.AllowedRconPorts?.ToString();
}
<div class="col-sm-6 col-xl-2 mb-3">
<FormNumberInput Id="instance-server-port" @bind-Value="form.ServerPort" min="0" max="65535">
@ -141,7 +142,7 @@
<text>RAM</text>
}
else {
<text>RAM &bullet; <code>@(form.MemoryAllocation?.InMegabytes ?? 0) / @(selectedAgent?.MaxMemory.InMegabytes) MB</code></text>
<text>RAM &bullet; <code>@(form.MemoryAllocation?.InMegabytes ?? 0) / @(selectedAgent?.Configuration.MaxMemory.InMegabytes) MB</code></text>
}
</LabelFragment>
</FormNumberInput>
@ -158,18 +159,18 @@
</div>
</div>
<FormButtonSubmit Label="@(EditedInstanceConfiguration == null ? "Create Instance" : "Edit Instance")" class="btn btn-primary" disabled="@(!IsSubmittable)" />
<FormButtonSubmit Label="@(EditedInstance == null ? "Create Instance" : "Edit Instance")" class="btn btn-primary" disabled="@(!IsSubmittable)" />
<FormSubmitError />
</Form>
@code {
[Parameter, EditorRequired]
public InstanceConfiguration? EditedInstanceConfiguration { get; init; }
public Instance? EditedInstance { get; init; }
private ConfigureInstanceFormModel form = null!;
private ImmutableDictionary<Guid, AgentWithStats> allAgentsByGuid = ImmutableDictionary<Guid, AgentWithStats>.Empty;
private ImmutableDictionary<Guid, Agent> allAgentsByGuid = ImmutableDictionary<Guid, Agent>.Empty;
private ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> allAgentJavaRuntimes = ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>.Empty;
private MinecraftVersionType minecraftVersionType = MinecraftVersionType.Release;
@ -197,15 +198,15 @@
}
}
private bool TryGetAgent(Guid? agentGuid, [NotNullWhen(true)] out AgentWithStats? agent) {
private bool TryGetAgent(Guid? agentGuid, [NotNullWhen(true)] out Agent? agent) {
return TryGet(page.allAgentsByGuid, agentGuid, out agent);
}
public AgentWithStats? SelectedAgent => TryGetAgent(SelectedAgentGuid, out var agent) ? agent : null;
public Agent? SelectedAgent => TryGetAgent(SelectedAgentGuid, out var agent) ? agent : null;
public ImmutableArray<TaggedJavaRuntime> JavaRuntimesForSelectedAgent => TryGet(page.allAgentJavaRuntimes, SelectedAgentGuid, out var javaRuntimes) ? javaRuntimes : ImmutableArray<TaggedJavaRuntime>.Empty;
public ushort MaximumMemoryUnits => SelectedAgent?.MaxMemory.RawValue ?? 0;
public ushort MaximumMemoryUnits => SelectedAgent?.Configuration.MaxMemory.RawValue ?? 0;
public ushort AvailableMemoryUnits => Math.Min((SelectedAgent?.AvailableMemory + editedInstanceRamAllocation)?.RawValue ?? MaximumMemoryUnits, MaximumMemoryUnits);
private ushort selectedMemoryUnits = 4;
@ -246,12 +247,12 @@
public sealed class ServerPortMustBeAllowedAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int> {
protected override string FieldName => nameof(ServerPort);
protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.AllowedServerPorts?.Contains((ushort) value) == true;
protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.Configuration.AllowedServerPorts?.Contains((ushort) value) == true;
}
public sealed class RconPortMustBeAllowedAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int> {
protected override string FieldName => nameof(RconPort);
protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.AllowedRconPorts?.Contains((ushort) value) == true;
protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.Configuration.AllowedRconPorts?.Contains((ushort) value) == true;
}
public sealed class RconPortMustDifferFromServerPortAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int?> {
@ -270,7 +271,7 @@
}
protected override void OnInitialized() {
form = new ConfigureInstanceFormModel(this, EditedInstanceConfiguration?.MemoryAllocation);
form = new ConfigureInstanceFormModel(this, EditedInstance?.Configuration.MemoryAllocation);
}
protected override async Task OnInitializedAsync() {
@ -281,18 +282,19 @@
allAgentJavaRuntimes = await agentJavaRuntimesTask;
allMinecraftVersions = await minecraftVersionsTask;
if (EditedInstanceConfiguration != null) {
form.SelectedAgentGuid = EditedInstanceConfiguration.AgentGuid;
form.InstanceName = EditedInstanceConfiguration.InstanceName;
form.ServerPort = EditedInstanceConfiguration.ServerPort;
form.RconPort = EditedInstanceConfiguration.RconPort;
form.MinecraftVersion = EditedInstanceConfiguration.MinecraftVersion;
form.MinecraftServerKind = EditedInstanceConfiguration.MinecraftServerKind;
form.MemoryUnits = EditedInstanceConfiguration.MemoryAllocation.RawValue;
form.JavaRuntimeGuid = EditedInstanceConfiguration.JavaRuntimeGuid;
form.JvmArguments = JvmArgumentsHelper.Join(EditedInstanceConfiguration.JvmArguments);
if (EditedInstance != null) {
var configuration = EditedInstance.Configuration;
form.SelectedAgentGuid = configuration.AgentGuid;
form.InstanceName = configuration.InstanceName;
form.ServerPort = configuration.ServerPort;
form.RconPort = configuration.RconPort;
form.MinecraftVersion = configuration.MinecraftVersion;
form.MinecraftServerKind = configuration.MinecraftServerKind;
form.MemoryUnits = configuration.MemoryAllocation.RawValue;
form.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
form.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
minecraftVersionType = allMinecraftVersions.FirstOrDefault(version => version.Id == EditedInstanceConfiguration.MinecraftVersion)?.Type ?? minecraftVersionType;
minecraftVersionType = allMinecraftVersions.FirstOrDefault(version => version.Id == configuration.MinecraftVersion)?.Type ?? minecraftVersionType;
}
form.EditContext.RevalidateWhenFieldChanges(tracked: nameof(ConfigureInstanceFormModel.SelectedAgentGuid), revalidated: nameof(ConfigureInstanceFormModel.MemoryUnits));
@ -326,10 +328,10 @@
form.SubmitModel.StopSubmitting("You do not have permission to edit instances.");
return;
}
var instance = new InstanceConfiguration(
EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Guid,
EditedInstanceConfiguration?.InstanceGuid ?? Guid.NewGuid(),
var instanceGuid = EditedInstance?.InstanceGuid ?? Guid.NewGuid();
var instanceConfiguration = new InstanceConfiguration(
EditedInstance?.Configuration.AgentGuid ?? selectedAgent.AgentGuid,
form.InstanceName,
(ushort) form.ServerPort,
(ushort) form.RconPort,
@ -340,9 +342,9 @@
JvmArgumentsHelper.Split(form.JvmArguments)
);
var result = await InstanceManager.CreateOrUpdateInstance(loggedInUserGuid.Value, instance, CancellationToken);
var result = await InstanceManager.CreateOrUpdateInstance(loggedInUserGuid.Value, instanceGuid, instanceConfiguration, CancellationToken);
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
await Navigation.NavigateTo("instances/" + instance.InstanceGuid);
await Navigation.NavigateTo("instances/" + instanceGuid);
}
else {
form.SubmitModel.StopSubmitting(result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));

View File

@ -16,7 +16,10 @@
@code {
[Parameter]
[Parameter, EditorRequired]
public Guid AgentGuid { get; set; }
[Parameter, EditorRequired]
public Guid InstanceGuid { get; set; }
[Parameter]
@ -39,7 +42,7 @@
return;
}
var result = await InstanceManager.SendCommandToInstance(loggedInUserGuid.Value, InstanceGuid, form.Command, CancellationToken);
var result = await InstanceManager.SendCommandToInstance(loggedInUserGuid.Value, AgentGuid, InstanceGuid, form.Command, CancellationToken);
if (result.Is(SendCommandToInstanceResult.Success)) {
form.Command = string.Empty;
form.SubmitModel.StopSubmitting();

View File

@ -31,6 +31,9 @@
@code {
[Parameter, EditorRequired]
public Guid AgentGuid { get; init; }
[Parameter, EditorRequired]
public Guid InstanceGuid { get; init; }
@ -56,7 +59,7 @@
return;
}
var result = await InstanceManager.StopInstance(loggedInUserGuid.Value, InstanceGuid, new MinecraftStopStrategy(form.StopInSeconds), CancellationToken);
var result = await InstanceManager.StopInstance(loggedInUserGuid.Value, AgentGuid, InstanceGuid, new MinecraftStopStrategy(form.StopInSeconds), CancellationToken);
if (result.Is(StopInstanceResult.StopInitiated)) {
await Js.InvokeVoidAsync("closeModal", ModalId);
form.SubmitModel.StopSubmitting();