Compare commits

...

2 Commits

Author SHA1 Message Date
chylex 02828a91c6
Migrate event log to Akka.NET 2024-03-23 00:21:39 +01:00
chylex 7cdb0a1910
Migrate RPC message handling to Akka.NET 2024-03-23 00:21:39 +01:00
104 changed files with 1141 additions and 1251 deletions

View File

@ -8,9 +8,9 @@ namespace Phantom.Agent.Rpc;
public sealed class ControllerConnection {
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ControllerConnection));
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly RpcConnectionToServer<IMessageToController> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
Logger.Information("Connection ready.");
}

View File

@ -11,10 +11,10 @@ sealed class KeepAliveLoop {
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly CancellationTokenSource cancellationTokenSource = new ();
public KeepAliveLoop(RpcConnectionToServer<IMessageToControllerListener> connection) {
public KeepAliveLoop(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
Task.Run(Run);
}

View File

@ -3,20 +3,21 @@ using NetMQ.Sockets;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Sockets;
using Serilog;
namespace Phantom.Agent.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgent, IMessageToController, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToAgent, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToAgent> handlerActorRef, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, handlerActorRef, disconnectSemaphore, receiveCancellationToken).Launch();
}
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
private RpcClientRuntime(RpcClientSocket<IMessageToAgent, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToAgent> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, handlerActor, disconnectSemaphore, receiveCancellationToken) {}
protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection) {
protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToController> connection) {
var keepAliveLoop = new KeepAliveLoop(connection);
try {
await base.RunWithConnection(socket, connection);

View File

@ -4,27 +4,41 @@ using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Agent.Services.Rpc;
public sealed class MessageListener : IMessageToAgentListener {
private static ILogger Logger { get; } = PhantomLogger.Create<MessageListener>();
public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent> {
private static ILogger Logger { get; } = PhantomLogger.Create<ControllerMessageHandlerActor>();
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
public readonly record struct Init(RpcConnectionToServer<IMessageToController> Connection, AgentServices Agent, CancellationTokenSource ShutdownTokenSource);
public static Props<IMessageToAgent> Factory(Init init) {
return Props<IMessageToAgent>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentServices agent;
private readonly CancellationTokenSource shutdownTokenSource;
public MessageListener(RpcConnectionToServer<IMessageToControllerListener> connection, AgentServices agent, CancellationTokenSource shutdownTokenSource) {
this.connection = connection;
this.agent = agent;
this.shutdownTokenSource = shutdownTokenSource;
private ControllerMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.agent = init.Agent;
this.shutdownTokenSource = init.ShutdownTokenSource;
ReceiveAsync<RegisterAgentSuccessMessage>(HandleRegisterAgentSuccess);
Receive<RegisterAgentFailureMessage>(HandleRegisterAgentFailure);
ReceiveAndReplyLater<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(HandleConfigureInstance);
ReceiveAndReplyLater<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(HandleLaunchInstance);
ReceiveAndReplyLater<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(HandleStopInstance);
ReceiveAndReplyLater<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(HandleSendCommandToInstance);
Receive<ReplyMessage>(HandleReply);
}
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
private async Task HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
Logger.Information("Agent authentication successful.");
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
@ -36,7 +50,7 @@ public sealed class MessageListener : IMessageToAgentListener {
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
if (!result.Is(ConfigureInstanceResult.Success)) {
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
return NoReply.Instance;
return;
}
}
@ -44,11 +58,9 @@ public sealed class MessageListener : IMessageToAgentListener {
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
await agent.InstanceSessionManager.RefreshAgentStatus();
return NoReply.Instance;
}
public Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
private void HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
string errorMessage = message.FailureKind switch {
RegisterAgentFailure.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
RegisterAgentFailure.InvalidToken => "Invalid token.",
@ -59,32 +71,29 @@ public sealed class MessageListener : IMessageToAgentListener {
PhantomLogger.Dispose();
Environment.Exit(1);
return Task.FromResult(NoReply.Instance);
}
private Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
return agent.InstanceSessionManager.Configure(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
private async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
return await HandleConfigureInstance(message, alwaysReportStatus: false);
}
public async Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
private async Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return await agent.InstanceSessionManager.Launch(message.InstanceGuid);
}
public async Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
private async Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return await agent.InstanceSessionManager.Stop(message.InstanceGuid, message.StopStrategy);
}
public async Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
private async Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return await agent.InstanceSessionManager.SendCommand(message.InstanceGuid, message.Command);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -7,6 +7,7 @@ using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
@ -51,15 +52,19 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var rpcConfiguration = new RpcConfiguration("Rpc", controllerHost, controllerPort, controllerCertificate);
var rpcConfiguration = new RpcConfiguration("Agent", controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
await agentServices.Initialize();
using var actorSystem = ActorSystemFactory.Create("Agent");
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
var rpcMessageHandlerActor = actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageHandlerActor, rpcDisconnectSemaphore, shutdownCancellationToken);
try {
await rpcTask.WaitAsync(shutdownCancellationToken);
} finally {

View File

@ -8,10 +8,10 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public static class AgentMessageRegistries {
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToAgent> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToController> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
public static IMessageDefinitions<IMessageToAgent, IMessageToController, ReplyMessage> Definitions { get; } = new MessageDefinitions();
static AgentMessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0);
@ -33,13 +33,9 @@ public static class AgentMessageRegistries {
ToController.Add<ReplyMessage>(127);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
public bool IsRegistrationMessage(Type messageType) {
return messageType == typeof(RegisterAgentMessage);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgent, IMessageToController, ReplyMessage> {
public MessageRegistry<IMessageToAgent> ToClient => ToAgent;
public MessageRegistry<IMessageToController> ToServer => ToController;
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);

View File

@ -7,17 +7,4 @@ namespace Phantom.Common.Messages.Agent.BiDirectional;
public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToController, IMessageToAgent, IReply {
private static readonly MessageQueueKey MessageQueueKey = new ("Reply");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this);
}
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleReply(this);
}
}
) : IMessageToController, IMessageToAgent, IReply;

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {
MessageQueueKey IMessage<IMessageToAgentListener, TReply>.QueueKey => IMessageToAgent.DefaultQueueKey;
}
public interface IMessageToAgent : IMessageToAgent<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
MessageQueueKey IMessage<IMessageToAgentListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToAgent {}

View File

@ -1,16 +0,0 @@
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgentListener {
Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message);
Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message);
Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message);
Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message);
Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message);
Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
}
public interface IMessageToController : IMessageToController<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToController {}

View File

@ -1,17 +0,0 @@
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToControllerListener {
Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message);
Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message);
Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message);
Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message);
Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message);
Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message);
Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message);
Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,6 +1,7 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
@ -10,8 +11,4 @@ public sealed partial record ConfigureInstanceMessage(
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(2)] InstanceLaunchProperties LaunchProperties,
[property: MemoryPackOrder(3)] bool LaunchNow = false
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleConfigureInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;

View File

@ -1,13 +1,10 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleLaunchInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<LaunchInstanceResult>>;

View File

@ -1,14 +1,9 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentFailureMessage(
[property: MemoryPackOrder(0)] RegisterAgentFailure FailureKind
) : IMessageToAgent {
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentFailure(this);
}
}
) : IMessageToAgent;

View File

@ -1,14 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentSuccessMessage(
[property: MemoryPackOrder(0)] ImmutableArray<ConfigureInstanceMessage> InitialInstanceConfigurations
) : IMessageToAgent {
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentSuccess(this);
}
}
) : IMessageToAgent;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] string Command
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleSendCommandToInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;

View File

@ -1,6 +1,7 @@
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
@ -8,8 +9,4 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] MinecraftStopStrategy StopStrategy
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleStopInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<StopInstanceResult>>;

View File

@ -1,15 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Java;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AdvertiseJavaRuntimesMessage(
[property: MemoryPackOrder(0)] ImmutableArray<TaggedJavaRuntime> Runtimes
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleAdvertiseJavaRuntimes(this);
}
}
) : IMessageToController;

View File

@ -1,11 +1,6 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsAliveMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleAgentIsAlive(this);
}
}
public sealed partial record AgentIsAliveMessage : IMessageToController;

View File

@ -1,6 +1,5 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -8,13 +7,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToController {
private static readonly MessageQueueKey MessageQueueKey = new ("Agent.InstanceOutput");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleInstanceOutput(this);
}
}
) : IMessageToController;

View File

@ -1,7 +1,6 @@
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -9,8 +8,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken,
[property: MemoryPackOrder(1)] AgentInfo AgentInfo
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleRegisterAgent(this);
}
}
) : IMessageToController;

View File

@ -1,6 +1,5 @@
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -8,8 +7,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record ReportAgentStatusMessage(
[property: MemoryPackOrder(0)] int RunningInstanceCount,
[property: MemoryPackOrder(1)] RamAllocationUnits RunningInstanceMemory
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportAgentStatus(this);
}
}
) : IMessageToController;

View File

@ -1,6 +1,5 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -10,8 +9,4 @@ public sealed partial record ReportInstanceEventMessage(
[property: MemoryPackOrder(1)] DateTime UtcTime,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] IInstanceEvent Event
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportInstanceEvent(this);
}
}
) : IMessageToController;

View File

@ -1,6 +1,5 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -8,8 +7,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record ReportInstanceStatusMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] IInstanceStatus InstanceStatus
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportInstanceStatus(this);
}
}
) : IMessageToController;

View File

@ -1,11 +1,6 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterAgentMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterAgent(this);
}
}
public sealed partial record UnregisterAgentMessage : IMessageToController;

View File

@ -7,12 +7,4 @@ namespace Phantom.Common.Messages.Web.BiDirectional;
public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToController, IMessageToWeb, IReply {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this);
}
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleReply(this);
}
}
) : IMessageToController, IMessageToWeb, IReply;

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
namespace Phantom.Common.Messages.Web;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
}
public interface IMessageToController : IMessageToController<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToController {}

View File

@ -1,35 +0,0 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToControllerListener {
Task<NoReply> HandleRegisterWeb(RegisterWebMessage message);
Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message);
Task<LogInSuccess?> HandleLogIn(LogInMessage message);
Task<CreateOrUpdateAdministratorUserResult> HandleCreateOrUpdateAdministratorUser(CreateOrUpdateAdministratorUserMessage message);
Task<CreateUserResult> HandleCreateUser(CreateUserMessage message);
Task<ImmutableArray<UserInfo>> HandleGetUsers(GetUsersMessage message);
Task<ImmutableArray<RoleInfo>> HandleGetRoles(GetRolesMessage message);
Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> HandleGetUserRoles(GetUserRolesMessage message);
Task<ChangeUserRolesResult> HandleChangeUserRoles(ChangeUserRolesMessage message);
Task<DeleteUserResult> HandleDeleteUser(DeleteUserMessage message);
Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message);
Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message);
Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message);
Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message);
Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message);
Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message);
Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message);
Task<ImmutableArray<EventLogItem>> HandleGetEventLog(GetEventLogMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
namespace Phantom.Common.Messages.Web;
public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {
MessageQueueKey IMessage<IMessageToWebListener, TReply>.QueueKey => IMessageToWeb.DefaultQueueKey;
}
public interface IMessageToWeb : IMessageToWeb<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
MessageQueueKey IMessage<IMessageToWebListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToWeb {}

View File

@ -1,13 +0,0 @@
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToWebListener {
Task<NoReply> HandleRegisterWebResult(RegisterWebResultMessage message);
Task<NoReply> HandleRefreshAgents(RefreshAgentsMessage message);
Task<NoReply> HandleRefreshInstances(RefreshInstancesMessage message);
Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,6 +1,7 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -10,8 +11,4 @@ public sealed partial record ChangeUserRolesMessage(
[property: MemoryPackOrder(1)] Guid SubjectUserGuid,
[property: MemoryPackOrder(2)] ImmutableHashSet<Guid> AddToRoleGuids,
[property: MemoryPackOrder(3)] ImmutableHashSet<Guid> RemoveFromRoleGuids
) : IMessageToController<ChangeUserRolesResult> {
public Task<ChangeUserRolesResult> Accept(IMessageToControllerListener listener) {
return listener.HandleChangeUserRoles(this);
}
}
) : IMessageToController, ICanReply<ChangeUserRolesResult>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Web.ToController;
public sealed partial record CreateOrUpdateAdministratorUserMessage(
[property: MemoryPackOrder(0)] string Username,
[property: MemoryPackOrder(1)] string Password
) : IMessageToController<CreateOrUpdateAdministratorUserResult> {
public Task<CreateOrUpdateAdministratorUserResult> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateOrUpdateAdministratorUser(this);
}
}
) : IMessageToController, ICanReply<CreateOrUpdateAdministratorUserResult>;

View File

@ -2,6 +2,7 @@
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -10,8 +11,4 @@ public sealed partial record CreateOrUpdateInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] InstanceConfiguration Configuration
) : IMessageToController<InstanceActionResult<CreateOrUpdateInstanceResult>> {
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateOrUpdateInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -8,8 +9,4 @@ public sealed partial record CreateUserMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] string Username,
[property: MemoryPackOrder(2)] string Password
) : IMessageToController<CreateUserResult> {
public Task<CreateUserResult> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateUser(this);
}
}
) : IMessageToController, ICanReply<CreateUserResult>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Web.ToController;
public sealed partial record DeleteUserMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid SubjectUserGuid
) : IMessageToController<DeleteUserResult> {
public Task<DeleteUserResult> Accept(IMessageToControllerListener listener) {
return listener.HandleDeleteUser(this);
}
}
) : IMessageToController, ICanReply<DeleteUserResult>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Java;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetAgentJavaRuntimesMessage : IMessageToController<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> {
public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetAgentJavaRuntimes(this);
}
}
public sealed partial record GetAgentJavaRuntimesMessage : IMessageToController, ICanReply<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>;

View File

@ -1,14 +1,11 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetAuditLogMessage(
[property: MemoryPackOrder(0)] int Count
) : IMessageToController<ImmutableArray<AuditLogItem>> {
public Task<ImmutableArray<AuditLogItem>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetAuditLog(this);
}
}
) : IMessageToController, ICanReply<ImmutableArray<AuditLogItem>>;

View File

@ -1,14 +1,11 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetEventLogMessage(
[property: MemoryPackOrder(0)] int Count
) : IMessageToController<ImmutableArray<EventLogItem>> {
public Task<ImmutableArray<EventLogItem>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetEventLog(this);
}
}
) : IMessageToController, ICanReply<ImmutableArray<EventLogItem>>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetMinecraftVersionsMessage : IMessageToController<ImmutableArray<MinecraftVersion>> {
public Task<ImmutableArray<MinecraftVersion>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetMinecraftVersions(this);
}
}
public sealed partial record GetMinecraftVersionsMessage : IMessageToController, ICanReply<ImmutableArray<MinecraftVersion>>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetRolesMessage : IMessageToController<ImmutableArray<RoleInfo>> {
public Task<ImmutableArray<RoleInfo>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetRoles(this);
}
}
public sealed partial record GetRolesMessage : IMessageToController, ICanReply<ImmutableArray<RoleInfo>>;

View File

@ -1,13 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetUserRolesMessage(
[property: MemoryPackOrder(0)] ImmutableHashSet<Guid> UserGuids
) : IMessageToController<ImmutableDictionary<Guid, ImmutableArray<Guid>>> {
public Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetUserRoles(this);
}
}
) : IMessageToController, ICanReply<ImmutableDictionary<Guid, ImmutableArray<Guid>>>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetUsersMessage : IMessageToController<ImmutableArray<UserInfo>> {
public Task<ImmutableArray<UserInfo>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetUsers(this);
}
}
public sealed partial record GetUsersMessage : IMessageToController, ICanReply<ImmutableArray<UserInfo>>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -8,8 +9,4 @@ public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid
) : IMessageToController<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleLaunchInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<LaunchInstanceResult>>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Web.ToController;
public sealed partial record LogInMessage(
[property: MemoryPackOrder(0)] string Username,
[property: MemoryPackOrder(1)] string Password
) : IMessageToController<LogInSuccess?> {
public Task<LogInSuccess?> Accept(IMessageToControllerListener listener) {
return listener.HandleLogIn(this);
}
}
) : IMessageToController, ICanReply<LogInSuccess?>;

View File

@ -1,14 +1,9 @@
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterWebMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleRegisterWeb(this);
}
}
) : IMessageToController;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -9,8 +10,4 @@ public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] string Command
) : IMessageToController<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleSendCommandToInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;

View File

@ -1,6 +1,7 @@
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -10,8 +11,4 @@ public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] MinecraftStopStrategy StopStrategy
) : IMessageToController<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleStopInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<StopInstanceResult>>;

View File

@ -1,11 +1,6 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterWebMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterWeb(this);
}
}
public sealed partial record UnregisterWebMessage : IMessageToController;

View File

@ -1,6 +1,5 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
@ -8,13 +7,4 @@ namespace Phantom.Common.Messages.Web.ToWeb;
public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToWeb {
private static readonly MessageQueueKey MessageQueueKey = new ("Web.InstanceOutput");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleInstanceOutput(this);
}
}
) : IMessageToWeb;

View File

@ -1,15 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Agent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RefreshAgentsMessage(
[property: MemoryPackOrder(0)] ImmutableArray<Agent> Agents
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRefreshAgents(this);
}
}
) : IMessageToWeb;

View File

@ -1,15 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RefreshInstancesMessage(
[property: MemoryPackOrder(0)] ImmutableArray<Instance> Instances
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRefreshInstances(this);
}
}
) : IMessageToWeb;

View File

@ -1,13 +1,8 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterWebResultMessage(
[property: MemoryPackOrder(0)] bool Success
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRegisterWebResult(this);
}
}
) : IMessageToWeb;

View File

@ -15,10 +15,10 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public static class WebMessageRegistries {
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static MessageRegistry<IMessageToController> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToWeb> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
public static IMessageDefinitions<IMessageToWeb, IMessageToController, ReplyMessage> Definitions { get; } = new MessageDefinitions();
static WebMessageRegistries() {
ToController.Add<RegisterWebMessage>(0);
@ -48,13 +48,9 @@ public static class WebMessageRegistries {
ToWeb.Add<ReplyMessage>(127);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> {
public MessageRegistry<IMessageToWebListener> ToClient => ToWeb;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
public bool IsRegistrationMessage(Type messageType) {
return messageType == typeof(RegisterWebMessage);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWeb, IMessageToController, ReplyMessage> {
public MessageRegistry<IMessageToWeb> ToClient => ToWeb;
public MessageRegistry<IMessageToController> ToServer => ToController;
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);

View File

@ -169,9 +169,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private sealed record InitializeCommand : ICommand;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgent> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgent> Connection) : ICommand;
private sealed record RefreshConnectionStatusCommand : ICommand;

View File

@ -1,4 +1,5 @@
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
@ -11,14 +12,14 @@ sealed class AgentConnection {
private readonly Guid agentGuid;
private string agentName;
private RpcConnectionToClient<IMessageToAgentListener>? connection;
private RpcConnectionToClient<IMessageToAgent>? connection;
public AgentConnection(Guid agentGuid, string agentName) {
this.agentName = agentName;
this.agentGuid = agentGuid;
}
public void UpdateConnection(RpcConnectionToClient<IMessageToAgentListener> newConnection, string newAgentName) {
public void UpdateConnection(RpcConnectionToClient<IMessageToAgent> newConnection, string newAgentName) {
lock (this) {
connection?.Close();
connection = newConnection;
@ -26,7 +27,7 @@ sealed class AgentConnection {
}
}
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgentListener> expected) {
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgent> expected) {
lock (this) {
if (connection != null && connection.IsSame(expected)) {
connection.Close();
@ -48,7 +49,7 @@ sealed class AgentConnection {
}
}
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent, ICanReply<TReply> where TReply : class {
lock (this) {
if (connection == null) {
LogAgentOffline();

View File

@ -38,7 +38,7 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
private sealed record FlushChangesCommand : ICommand;
private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) {
this.configurationToStore = command.Configuration;
configurationToStore = command.Configuration;
ScheduleFlush(TimeSpan.FromSeconds(2));
}
@ -72,11 +72,9 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
}
private void ScheduleFlush(TimeSpan delay) {
if (hasScheduledFlush) {
return;
if (!hasScheduledFlush) {
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}

View File

@ -18,7 +18,7 @@ namespace Phantom.Controller.Services.Agents;
sealed class AgentManager {
private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>();
private readonly ActorSystem actorSystem;
private readonly IActorRefFactory actorSystem;
private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
@ -28,7 +28,7 @@ sealed class AgentManager {
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
public AgentManager(ActorSystem actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
public AgentManager(IActorRefFactory actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.actorSystem = actorSystem;
this.authToken = authToken;
this.controllerState = controllerState;
@ -58,15 +58,15 @@ sealed class AgentManager {
}
}
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgentListener> connection) {
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgent> connection) {
if (!this.authToken.FixedTimeEquals(authToken)) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
return false;
}
var agentProperties = AgentConfiguration.From(agentInfo);
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var configureInstanceMessages = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
var agentActor = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var configureInstanceMessages = await agentActor.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
return true;

View File

@ -1,7 +1,9 @@
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
@ -10,14 +12,15 @@ using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;
namespace Phantom.Controller.Services;
public sealed class ControllerServices : IAsyncDisposable {
private TaskManager TaskManager { get; }
public sealed class ControllerServices : IDisposable {
public ActorSystem ActorSystem { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; }
@ -32,24 +35,23 @@ public sealed class ControllerServices : IAsyncDisposable {
private UserRoleManager UserRoleManager { get; }
private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
public IRegistrationHandler<IMessageToAgent, IMessageFromAgentToController, RegisterAgentMessage> AgentRegistrationHandler { get; }
public IRegistrationHandler<IMessageToWeb, IMessageFromWebToController, RegisterWebMessage> WebRegistrationHandler { get; }
private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken;
public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, AuthToken webAuthToken, CancellationToken shutdownCancellationToken) {
this.dbProvider = dbProvider;
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller");
this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider);
@ -59,15 +61,10 @@ public sealed class ControllerServices : IAsyncDisposable {
this.UserRoleManager = new UserRoleManager(dbProvider);
this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager);
this.AuditLogManager = new AuditLogManager(dbProvider);
this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken);
}
public AgentMessageListener CreateAgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection) {
return new AgentMessageListener(connection, AgentManager, InstanceLogManager, EventLogManager, cancellationToken);
}
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
this.EventLogManager = new EventLogManager(ActorSystem, dbProvider, shutdownCancellationToken);
this.AgentRegistrationHandler = new AgentRegistrationHandler(AgentManager, InstanceLogManager, EventLogManager);
this.WebRegistrationHandler = new WebRegistrationHandler(webAuthToken, ControllerState, InstanceLogManager, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, MinecraftVersions, EventLogManager);
}
public async Task Initialize() {
@ -77,8 +74,7 @@ public sealed class ControllerServices : IAsyncDisposable {
await RoleManager.Initialize();
}
public async ValueTask DisposeAsync() {
await actorSystem.Terminate();
actorSystem.Dispose();
public void Dispose() {
ActorSystem.Dispose();
}
}

View File

@ -0,0 +1,77 @@
using Phantom.Common.Data.Web.EventLog;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Events;
sealed class EventLogDatabaseStorageActor : ReceiveActor<EventLogDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<EventLogDatabaseStorageActor>();
public readonly record struct Init(IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new EventLogDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly LinkedList<StoreEventCommand> pendingCommands = new ();
private bool hasScheduledFlush = false;
private EventLogDatabaseStorageActor(Init init) {
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<StoreEventCommand>(StoreEvent);
ReceiveAsync<FlushChangesCommand>(FlushChanges);
}
public interface ICommand {}
public sealed record StoreEventCommand(Guid EventGuid, DateTime UtcTime, Guid? AgentGuid, EventLogEventType EventType, string SubjectId, Dictionary<string, object?>? Extra = null) : ICommand;
private sealed record FlushChangesCommand : ICommand;
private void StoreEvent(StoreEventCommand command) {
pendingCommands.AddLast(command);
ScheduleFlush(TimeSpan.FromMilliseconds(500));
}
private async Task FlushChanges(FlushChangesCommand command) {
hasScheduledFlush = false;
if (pendingCommands.Count == 0) {
return;
}
try {
await using var db = dbProvider.Lazy();
var eventLogRepository = new EventLogRepository(db);
foreach (var (eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra) in pendingCommands) {
eventLogRepository.AddItem(eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
} catch (Exception e) {
ScheduleFlush(TimeSpan.FromSeconds(10));
Logger.Error(e, "Could not store {EventCount} event(s) in database.", pendingCommands.Count);
return;
}
Logger.Information("Stored {EventCount} event(s) in database.", pendingCommands.Count);
pendingCommands.Clear();
}
private void ScheduleFlush(TimeSpan delay) {
if (!hasScheduledFlush) {
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}
}

View File

@ -1,32 +1,27 @@
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Tasks;
using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Events;
sealed partial class EventLogManager {
private readonly ActorRef<EventLogDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly IDbContextProvider dbProvider;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
public EventLogManager(IDbContextProvider dbProvider, TaskManager taskManager, CancellationToken cancellationToken) {
public EventLogManager(IActorRefFactory actorSystem, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.databaseStorageActor = actorSystem.ActorOf(EventLogDatabaseStorageActor.Factory(new EventLogDatabaseStorageActor.Init(dbProvider, cancellationToken)), "EventLogDatabaseStorage");
this.dbProvider = dbProvider;
this.taskManager = taskManager;
this.cancellationToken = cancellationToken;
}
public void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
taskManager.Run("Store event log item to database", () => AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra));
private void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
databaseStorageActor.Tell(new EventLogDatabaseStorageActor.StoreEventCommand(eventGuid, utcTime, agentGuid, eventType, subjectId, extra));
}
public async Task AddItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
await using var db = dbProvider.Lazy();
new EventLogRepository(db).AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
public async Task<ImmutableArray<EventLogItem>> GetMostRecentItems(int count) {
await using var db = dbProvider.Lazy();
return await new EventLogRepository(db).GetMostRecentItems(count, cancellationToken);

View File

@ -11,13 +11,13 @@ using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActor, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly ActorRef<AgentActor.ICommand> agentActor;
private readonly AgentConnection agentConnection;
private readonly CancellationToken cancellationToken;
@ -30,7 +30,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
private readonly ActorRef<InstanceDatabaseStorageActor.ICommand> databaseStorageActor;
private InstanceActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentActor = init.AgentActor;
this.agentConnection = init.AgentConnection;
this.cancellationToken = init.CancellationToken;
@ -46,7 +46,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
private void NotifyInstanceUpdated() {
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
agentActor.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
}
private void SetLaunchAutomatically(bool newValue) {
@ -56,7 +56,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent, ICanReply<InstanceActionResult<TReply>> {
var reply = await agentConnection.Send<TMessage, InstanceActionResult<TReply>>(message, TimeSpan.FromSeconds(10), cancellationToken);
return reply.DidNotReplyIfNull();
}

View File

@ -6,7 +6,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" />
<PackageReference Include="BCrypt.Net-Next.StrongName" />
</ItemGroup>

View File

@ -0,0 +1,91 @@
using Akka.Actor;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
public readonly record struct Init(Guid AgentGuid, RpcConnectionToClient<IMessageToAgent> Connection, AgentRegistrationHandler AgentRegistrationHandler, AgentManager AgentManager, InstanceLogManager InstanceLogManager, EventLogManager EventLogManager);
public static Props<IMessageToController> Factory(Init init) {
return Props<IMessageToController>.Create(() => new AgentMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
public IStash Stash { get; set; } = null!;
private readonly Guid agentGuid;
private readonly RpcConnectionToClient<IMessageToAgent> connection;
private readonly AgentRegistrationHandler agentRegistrationHandler;
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
private AgentMessageHandlerActor(Init init) {
this.agentGuid = init.AgentGuid;
this.connection = init.Connection;
this.agentRegistrationHandler = init.AgentRegistrationHandler;
this.agentManager = init.AgentManager;
this.instanceLogManager = init.InstanceLogManager;
this.eventLogManager = init.EventLogManager;
ReceiveAsync<RegisterAgentMessage>(HandleRegisterAgent);
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
Receive<AgentIsAliveMessage>(HandleAgentIsAlive);
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
Receive<ReportAgentStatusMessage>(HandleReportAgentStatus);
Receive<ReportInstanceStatusMessage>(HandleReportInstanceStatus);
Receive<ReportInstanceEventMessage>(HandleReportInstanceEvent);
Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<ReplyMessage>(HandleReply);
}
private async Task HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuid != message.AgentInfo.AgentGuid) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
}
else {
await agentRegistrationHandler.TryRegisterImpl(connection, message);
}
}
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UnregisterCommand(connection));
connection.Close();
}
private void HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.NotifyIsAliveCommand());
}
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
}
private void HandleReportAgentStatus(ReportAgentStatusMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
}
private void HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
}
private void HandleReportInstanceEvent(ReportInstanceEventMessage message) {
message.Event.Accept(eventLogManager.CreateInstanceEventVisitor(message.EventGuid, message.UtcTime, agentGuid, message.InstanceGuid));
}
private void HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.ReceiveLines(message.InstanceGuid, message.Lines);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@ -1,92 +0,0 @@
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services.Rpc;
public sealed class AgentMessageListener : IMessageToControllerListener {
private readonly RpcConnectionToClient<IMessageToAgentListener> connection;
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
private readonly CancellationToken cancellationToken;
private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>();
internal AgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection, AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager, CancellationToken cancellationToken) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceLogManager = instanceLogManager;
this.eventLogManager = eventLogManager;
this.cancellationToken = cancellationToken;
}
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.AgentGuid) {
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
}
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection)) {
connection.SetAuthorizationResult(true);
agentGuidWaiter.SetResult(message.AgentInfo.AgentGuid);
}
return NoReply.Instance;
}
private async Task<Guid> WaitForAgentGuid() {
return await agentGuidWaiter.Task.WaitAsync(cancellationToken);
}
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted) {
agentManager.TellAgent(agentGuidWaiter.Task.Result, new AgentActor.UnregisterCommand(connection));
}
connection.Close();
return Task.FromResult(NoReply.Instance);
}
public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.NotifyIsAliveCommand());
return NoReply.Instance;
}
public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message) {
message.Event.Accept(eventLogManager.CreateInstanceEventVisitor(message.EventGuid, message.UtcTime, await WaitForAgentGuid(), message.InstanceGuid));
return NoReply.Instance;
}
public Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.ReceiveLines(message.InstanceGuid, message.Lines);
return Task.FromResult(NoReply.Instance);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -0,0 +1,34 @@
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class AgentRegistrationHandler : IRegistrationHandler<IMessageToAgent, IMessageToController, RegisterAgentMessage> {
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
public AgentRegistrationHandler(AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager) {
this.agentManager = agentManager;
this.instanceLogManager = instanceLogManager;
this.eventLogManager = eventLogManager;
}
async Task<Props<IMessageToController>?> IRegistrationHandler<IMessageToAgent, IMessageToController, RegisterAgentMessage>.TryRegister(RpcConnectionToClient<IMessageToAgent> connection, RegisterAgentMessage message) {
return await TryRegisterImpl(connection, message) ? CreateMessageHandlerActorProps(message.AgentInfo.AgentGuid, connection) : null;
}
public Task<bool> TryRegisterImpl(RpcConnectionToClient<IMessageToAgent> connection, RegisterAgentMessage message) {
return agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection);
}
private Props<IMessageToController> CreateMessageHandlerActorProps(Guid agentGuid, RpcConnectionToClient<IMessageToAgent> connection) {
var init = new AgentMessageHandlerActor.Init(agentGuid, connection, this, agentManager, instanceLogManager, eventLogManager);
return AgentMessageHandlerActor.Factory(init);
}
}

View File

@ -0,0 +1,72 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class WebMessageDataUpdateSenderActor : ReceiveActor<WebMessageDataUpdateSenderActor.ICommand> {
public readonly record struct Init(RpcConnectionToClient<IMessageToWeb> Connection, ControllerState ControllerState, InstanceLogManager InstanceLogManager);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new WebMessageDataUpdateSenderActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToClient<IMessageToWeb> connection;
private readonly ControllerState controllerState;
private readonly InstanceLogManager instanceLogManager;
private readonly ActorRef<ICommand> selfCached;
private WebMessageDataUpdateSenderActor(Init init) {
this.connection = init.Connection;
this.controllerState = init.ControllerState;
this.instanceLogManager = init.InstanceLogManager;
this.selfCached = SelfTyped;
ReceiveAsync<RefreshAgentsCommand>(RefreshAgents);
ReceiveAsync<RefreshInstancesCommand>(RefreshInstances);
ReceiveAsync<ReceiveInstanceLogsCommand>(ReceiveInstanceLogs);
}
protected override void PreStart() {
controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
instanceLogManager.LogsReceived += OnInstanceLogsReceived;
}
protected override void PostStop() {
instanceLogManager.LogsReceived -= OnInstanceLogsReceived;
controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
}
private void OnInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
selfCached.Tell(new ReceiveInstanceLogsCommand(e.InstanceGuid, e.Lines));
}
public interface ICommand {}
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
private sealed record ReceiveInstanceLogsCommand(Guid InstanceGuid, ImmutableArray<string> Lines) : ICommand;
private Task RefreshAgents(RefreshAgentsCommand command) {
return connection.Send(new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray()));
}
private Task RefreshInstances(RefreshInstancesCommand command) {
return connection.Send(new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray()));
}
private Task ReceiveInstanceLogs(ReceiveInstanceLogsCommand command) {
return connection.Send(new InstanceOutputMessage(command.InstanceGuid, command.Lines));
}
}

View File

@ -0,0 +1,166 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
public readonly record struct Init(
RpcConnectionToClient<IMessageToWeb> Connection,
WebRegistrationHandler WebRegistrationHandler,
ControllerState ControllerState,
InstanceLogManager InstanceLogManager,
UserManager UserManager,
RoleManager RoleManager,
UserRoleManager UserRoleManager,
UserLoginManager UserLoginManager,
AuditLogManager AuditLogManager,
AgentManager AgentManager,
MinecraftVersions MinecraftVersions,
EventLogManager EventLogManager
);
public static Props<IMessageToController> Factory(Init init) {
return Props<IMessageToController>.Create(() => new WebMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToClient<IMessageToWeb> connection;
private readonly WebRegistrationHandler webRegistrationHandler;
private readonly ControllerState controllerState;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
private WebMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.webRegistrationHandler = init.WebRegistrationHandler;
this.controllerState = init.ControllerState;
this.userManager = init.UserManager;
this.roleManager = init.RoleManager;
this.userRoleManager = init.UserRoleManager;
this.userLoginManager = init.UserLoginManager;
this.auditLogManager = init.AuditLogManager;
this.agentManager = init.AgentManager;
this.minecraftVersions = init.MinecraftVersions;
this.eventLogManager = init.EventLogManager;
var senderActorInit = new WebMessageDataUpdateSenderActor.Init(connection, controllerState, init.InstanceLogManager);
Context.ActorOf(WebMessageDataUpdateSenderActor.Factory(senderActorInit), "DataUpdateSender");
ReceiveAsync<RegisterWebMessage>(HandleRegisterWeb);
Receive<UnregisterWebMessage>(HandleUnregisterWeb);
ReceiveAndReplyLater<CreateOrUpdateAdministratorUserMessage, CreateOrUpdateAdministratorUserResult>(HandleCreateOrUpdateAdministratorUser);
ReceiveAndReplyLater<CreateUserMessage, CreateUserResult>(HandleCreateUser);
ReceiveAndReplyLater<GetUsersMessage, ImmutableArray<UserInfo>>(HandleGetUsers);
ReceiveAndReplyLater<GetRolesMessage, ImmutableArray<RoleInfo>>(HandleGetRoles);
ReceiveAndReplyLater<GetUserRolesMessage, ImmutableDictionary<Guid, ImmutableArray<Guid>>>(HandleGetUserRoles);
ReceiveAndReplyLater<ChangeUserRolesMessage, ChangeUserRolesResult>(HandleChangeUserRoles);
ReceiveAndReplyLater<DeleteUserMessage, DeleteUserResult>(HandleDeleteUser);
ReceiveAndReplyLater<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(HandleCreateOrUpdateInstance);
ReceiveAndReplyLater<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(HandleLaunchInstance);
ReceiveAndReplyLater<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(HandleStopInstance);
ReceiveAndReplyLater<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(HandleSendCommandToInstance);
ReceiveAndReplyLater<GetMinecraftVersionsMessage, ImmutableArray<MinecraftVersion>>(HandleGetMinecraftVersions);
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
ReceiveAndReplyLater<GetAuditLogMessage, ImmutableArray<AuditLogItem>>(HandleGetAuditLog);
ReceiveAndReplyLater<GetEventLogMessage, ImmutableArray<EventLogItem>>(HandleGetEventLog);
ReceiveAndReplyLater<LogInMessage, LogInSuccess?>(HandleLogIn);
Receive<ReplyMessage>(HandleReply);
}
private async Task HandleRegisterWeb(RegisterWebMessage message) {
await webRegistrationHandler.TryRegisterImpl(connection, message);
}
private void HandleUnregisterWeb(UnregisterWebMessage message) {
connection.Close();
}
private Task<CreateOrUpdateAdministratorUserResult> HandleCreateOrUpdateAdministratorUser(CreateOrUpdateAdministratorUserMessage message) {
return userManager.CreateOrUpdateAdministrator(message.Username, message.Password);
}
private Task<CreateUserResult> HandleCreateUser(CreateUserMessage message) {
return userManager.Create(message.LoggedInUserGuid, message.Username, message.Password);
}
private Task<ImmutableArray<UserInfo>> HandleGetUsers(GetUsersMessage message) {
return userManager.GetAll();
}
private Task<ImmutableArray<RoleInfo>> HandleGetRoles(GetRolesMessage message) {
return roleManager.GetAll();
}
private Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> HandleGetUserRoles(GetUserRolesMessage message) {
return userRoleManager.GetUserRoles(message.UserGuids);
}
private Task<ChangeUserRolesResult> HandleChangeUserRoles(ChangeUserRolesMessage message) {
return userRoleManager.ChangeUserRoles(message.LoggedInUserGuid, message.SubjectUserGuid, message.AddToRoleGuids, message.RemoveFromRoleGuids);
}
private Task<DeleteUserResult> HandleDeleteUser(DeleteUserMessage message) {
return userManager.DeleteByGuid(message.LoggedInUserGuid, message.SubjectUserGuid);
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
}
private Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
}
private Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
return minecraftVersions.GetVersions(CancellationToken.None);
}
private ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) {
return controllerState.AgentJavaRuntimesByGuid;
}
private Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) {
return auditLogManager.GetMostRecentItems(message.Count);
}
private Task<ImmutableArray<EventLogItem>> HandleGetEventLog(GetEventLogMessage message) {
return eventLogManager.GetMostRecentItems(message.Count);
}
private Task<LogInSuccess?> HandleLogIn(LogInMessage message) {
return userLoginManager.LogIn(message.Username, message.Password);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@ -1,229 +0,0 @@
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
using Agent = Phantom.Common.Data.Web.Agent.Agent;
namespace Phantom.Controller.Services.Rpc;
public sealed class WebMessageListener : IMessageToControllerListener {
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
private static int listenerSequenceId = 0;
private readonly ActorRef<ICommand> actor;
private readonly RpcConnectionToClient<IMessageToWebListener> connection;
private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
internal WebMessageListener(
IActorRefFactory actorSystem,
RpcConnectionToClient<IMessageToWebListener> connection,
AuthToken authToken,
ControllerState controllerState,
UserManager userManager,
RoleManager roleManager,
UserRoleManager userRoleManager,
UserLoginManager userLoginManager,
AuditLogManager auditLogManager,
AgentManager agentManager,
InstanceLogManager instanceLogManager,
MinecraftVersions minecraftVersions,
EventLogManager eventLogManager
) {
this.actor = actorSystem.ActorOf(Actor.Factory(this), "Web-" + Interlocked.Increment(ref listenerSequenceId));
this.connection = connection;
this.authToken = authToken;
this.controllerState = controllerState;
this.userManager = userManager;
this.roleManager = roleManager;
this.userRoleManager = userRoleManager;
this.userLoginManager = userLoginManager;
this.auditLogManager = auditLogManager;
this.agentManager = agentManager;
this.instanceLogManager = instanceLogManager;
this.minecraftVersions = minecraftVersions;
this.eventLogManager = eventLogManager;
}
private sealed class Actor : ReceiveActor<ICommand> {
public static Props<ICommand> Factory(WebMessageListener listener) {
return Props<ICommand>.Create(() => new Actor(listener), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly WebMessageListener listener;
private Actor(WebMessageListener listener) {
this.listener = listener;
Receive<StartConnectionCommand>(StartConnection);
Receive<StopConnectionCommand>(StopConnection);
Receive<RefreshAgentsCommand>(RefreshAgents);
Receive<RefreshInstancesCommand>(RefreshInstances);
}
private void StartConnection(StartConnectionCommand command) {
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
}
private void StopConnection(StopConnectionCommand command) {
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
}
private void RefreshAgents(RefreshAgentsCommand command) {
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void RefreshInstances(RefreshInstancesCommand command) {
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
}
}
private interface ICommand {}
private sealed record StartConnectionCommand : ICommand;
private sealed record StopConnectionCommand : ICommand;
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
if (authToken.FixedTimeEquals(message.AuthToken)) {
Logger.Information("Web authorized successfully.");
connection.SetAuthorizationResult(true);
await connection.Send(new RegisterWebResultMessage(true));
}
else {
Logger.Warning("Web failed to authorize, invalid token.");
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterWebResultMessage(false));
}
if (!connection.IsClosed) {
actor.Tell(new StartConnectionCommand());
}
return NoReply.Instance;
}
public Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message) {
if (!connection.IsClosed) {
connection.Close();
actor.Tell(new StopConnectionCommand());
}
return Task.FromResult(NoReply.Instance);
}
public Task<CreateOrUpdateAdministratorUserResult> HandleCreateOrUpdateAdministratorUser(CreateOrUpdateAdministratorUserMessage message) {
return userManager.CreateOrUpdateAdministrator(message.Username, message.Password);
}
public Task<CreateUserResult> HandleCreateUser(CreateUserMessage message) {
return userManager.Create(message.LoggedInUserGuid, message.Username, message.Password);
}
public Task<ImmutableArray<UserInfo>> HandleGetUsers(GetUsersMessage message) {
return userManager.GetAll();
}
public Task<ImmutableArray<RoleInfo>> HandleGetRoles(GetRolesMessage message) {
return roleManager.GetAll();
}
public Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> HandleGetUserRoles(GetUserRolesMessage message) {
return userRoleManager.GetUserRoles(message.UserGuids);
}
public Task<ChangeUserRolesResult> HandleChangeUserRoles(ChangeUserRolesMessage message) {
return userRoleManager.ChangeUserRoles(message.LoggedInUserGuid, message.SubjectUserGuid, message.AddToRoleGuids, message.RemoveFromRoleGuids);
}
public Task<DeleteUserResult> HandleDeleteUser(DeleteUserMessage message) {
return userManager.DeleteByGuid(message.LoggedInUserGuid, message.SubjectUserGuid);
}
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
}
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
}
public Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
}
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
}
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
return minecraftVersions.GetVersions(CancellationToken.None);
}
public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) {
return Task.FromResult(controllerState.AgentJavaRuntimesByGuid);
}
public Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) {
return auditLogManager.GetMostRecentItems(message.Count);
}
public Task<ImmutableArray<EventLogItem>> HandleGetEventLog(GetEventLogMessage message) {
return eventLogManager.GetMostRecentItems(message.Count);
}
public Task<LogInSuccess?> HandleLogIn(LogInMessage message) {
return userLoginManager.LogIn(message.Username, message.Password);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -0,0 +1,67 @@
using Phantom.Common.Data;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Controller.Services.Rpc;
sealed class WebRegistrationHandler : IRegistrationHandler<IMessageToWeb, IMessageToController, RegisterWebMessage> {
private static readonly ILogger Logger = PhantomLogger.Create<WebRegistrationHandler>();
private readonly AuthToken webAuthToken;
private readonly ControllerState controllerState;
private readonly InstanceLogManager instanceLogManager;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
public WebRegistrationHandler(AuthToken webAuthToken, ControllerState controllerState, InstanceLogManager instanceLogManager, UserManager userManager, RoleManager roleManager, UserRoleManager userRoleManager, UserLoginManager userLoginManager, AuditLogManager auditLogManager, AgentManager agentManager, MinecraftVersions minecraftVersions, EventLogManager eventLogManager) {
this.webAuthToken = webAuthToken;
this.controllerState = controllerState;
this.userManager = userManager;
this.roleManager = roleManager;
this.userRoleManager = userRoleManager;
this.userLoginManager = userLoginManager;
this.auditLogManager = auditLogManager;
this.agentManager = agentManager;
this.minecraftVersions = minecraftVersions;
this.eventLogManager = eventLogManager;
this.instanceLogManager = instanceLogManager;
}
async Task<Props<IMessageToController>?> IRegistrationHandler<IMessageToWeb, IMessageToController, RegisterWebMessage>.TryRegister(RpcConnectionToClient<IMessageToWeb> connection, RegisterWebMessage message) {
return await TryRegisterImpl(connection, message) ? CreateMessageHandlerActorProps(connection) : null;
}
public async Task<bool> TryRegisterImpl(RpcConnectionToClient<IMessageToWeb> connection, RegisterWebMessage message) {
if (webAuthToken.FixedTimeEquals(message.AuthToken)) {
Logger.Information("Web authorized successfully.");
await connection.Send(new RegisterWebResultMessage(true));
return true;
}
else {
Logger.Warning("Web failed to authorize, invalid token.");
await connection.Send(new RegisterWebResultMessage(false));
return false;
}
}
private Props<IMessageToController> CreateMessageHandlerActorProps(RpcConnectionToClient<IMessageToWeb> connection) {
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
return WebMessageHandlerActor.Factory(init);
}
}

View File

@ -54,24 +54,23 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
}
using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
await controllerServices.Initialize();
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.AgentRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.WebRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
return 0;

View File

@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) {
if (SupervisorStrategy != null) {
@ -14,6 +15,10 @@ public readonly struct ActorConfiguration {
if (MailboxType != null) {
props = props.WithMailbox(MailboxType);
}
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props;
}

View File

@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken);
}
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
}

View File

@ -1,6 +0,0 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessage<TListener, TReply> {
MessageQueueKey QueueKey { get; }
Task<TReply> Accept(TListener listener);
}

View File

@ -1,9 +1,6 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
MessageRegistry<TClientListener> ToClient { get; }
MessageRegistry<TServerListener> ToServer { get; }
bool IsRegistrationMessage(Type messageType);
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
public interface IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> : IReplyMessageFactory<TReplyMessage> where TReplyMessage : TClientMessage, TServerMessage {
MessageRegistry<TClientMessage> ToClient { get; }
MessageRegistry<TServerMessage> ToServer { get; }
}

View File

@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Message;
public interface IReplyMessageFactory<TReplyMessage> {
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
}

View File

@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Message;
interface IReplySender {
Task SendReply(uint sequenceId, byte[] serializedReply);
}

View File

@ -1,41 +1,35 @@
using Phantom.Utils.Logging;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
abstract class MessageHandler<TListener> {
protected ILogger Logger { get; }
sealed class MessageHandler<TMessageBase> {
private readonly ILogger logger;
private readonly ActorRef<TMessageBase> handlerActor;
private readonly IReplySender replySender;
private readonly TListener listener;
private readonly MessageQueues messageQueues;
public MessageHandler(string loggerName, ActorRef<TMessageBase> handlerActor, IReplySender replySender) {
this.logger = PhantomLogger.Create("MessageHandler", loggerName);
this.handlerActor = handlerActor;
this.replySender = replySender;
}
protected MessageHandler(string loggerName, TListener listener) {
this.Logger = PhantomLogger.Create("MessageHandler", loggerName);
this.listener = listener;
this.messageQueues = new MessageQueues(loggerName + ":Receive");
public void Tell(TMessageBase message) {
handlerActor.Tell(message);
}
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
messageQueues.Enqueue(message.QueueKey, () => TryHandle<TMessage, TReply>(sequenceId, message));
}
private async Task TryHandle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply;
try {
reply = await message.Accept(listener);
} catch (Exception e) {
Logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
return;
}
if (reply is not NoReply) {
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
}
}
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
internal Task StopReceiving() {
return messageQueues.Stop();
public Task TellAndReply<TMessage, TReply>(TMessage message, uint sequenceId) where TMessage : ICanReply<TReply> {
return handlerActor.Request(message).ContinueWith(task => {
if (task.IsCompletedSuccessfully) {
return replySender.SendReply(sequenceId, MessageSerializer.Serialize(task.Result));
}
if (task.IsFaulted) {
logger.Error(task.Exception, "Failed to handle message {Type}.", message.GetType().Name);
}
return task;
}, TaskScheduler.Default);
}
}

View File

@ -1,9 +0,0 @@
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageQueueKey {
public string Name { get; }
public MessageQueueKey(string name) {
Name = name;
}
}

View File

@ -1,53 +0,0 @@
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
sealed class MessageQueues {
private readonly ILogger logger;
private readonly TaskManager taskManager;
private readonly Dictionary<MessageQueueKey, RpcQueue> queues = new ();
private Task? stopTask;
public MessageQueues(string loggerName) {
this.logger = PhantomLogger.Create<MessageQueues>(loggerName);
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(loggerName));
}
private RpcQueue GetOrCreateQueue(MessageQueueKey key) {
if (!queues.TryGetValue(key, out var queue)) {
queues[key] = queue = new RpcQueue(taskManager, "Message queue for " + key.Name);
}
return queue;
}
public Task Enqueue(MessageQueueKey key, Func<Task> task) {
lock (this) {
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException(new OperationCanceledException());
}
}
public Task<T> Enqueue<T>(MessageQueueKey key, Func<Task<T>> task) {
lock (this) {
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException<T>(new OperationCanceledException());
}
}
internal Task Stop() {
lock (this) {
if (stopTask == null) {
logger.Debug("Stopping " + queues.Count + " message queue(s)...");
stopTask = Task.WhenAll(queues.Values.Select(static queue => queue.Stop()))
.ContinueWith(_ => logger.Debug("All queues stopped."));
queues.Clear();
}
return stopTask;
}
}
}

View File

@ -1,41 +1,49 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using Phantom.Utils.Actor;
using Serilog;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageRegistry<TListener> {
public sealed class MessageRegistry<TMessageBase> {
private const int DefaultBufferSize = 512;
private readonly ILogger logger;
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TListener>>> codeToHandlerMapping = new ();
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TMessageBase>>> codeToHandlerMapping = new ();
public MessageRegistry(ILogger logger) {
this.logger = logger;
}
public void Add<TMessage>(ushort code) where TMessage : IMessage<TListener, NoReply> {
AddTypeCodeMapping<TMessage, NoReply>(code);
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
if (HasReplyType(typeof(TMessage))) {
throw new ArgumentException("This overload is for messages without a reply");
}
AddTypeCodeMapping<TMessage>(code);
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage>);
}
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
if (typeof(TReply) == typeof(NoReply)) {
throw new InvalidOperationException("This overload of Add must not be used with NoReply as the reply type!");
}
AddTypeCodeMapping<TMessage, TReply>(code);
public void Add<TMessage, TReply>(ushort code) where TMessage : TMessageBase, ICanReply<TReply> {
AddTypeCodeMapping<TMessage>(code);
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage, TReply>);
}
private void AddTypeCodeMapping<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
private void AddTypeCodeMapping<TMessage>(ushort code) where TMessage : TMessageBase {
typeToCodeMapping.Add(typeof(TMessage), code);
codeToTypeMapping.Add(code, typeof(TMessage));
}
private bool HasReplyType(Type messageType) {
string replyInterfaceName = typeof(ICanReply<object>).FullName!;
replyInterfaceName = replyInterfaceName[..(replyInterfaceName.IndexOf('`') + 1)];
return messageType.GetInterfaces().Any(type => type.FullName is {} name && name.StartsWith(replyInterfaceName, StringComparison.Ordinal));
}
internal bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
try {
var code = MessageSerializer.ReadCode(ref data);
@ -46,13 +54,27 @@ public sealed class MessageRegistry<TListener> {
}
}
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return Write<TMessage, NoReply>(0, message);
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : TMessageBase {
if (!GetMessageCode<TMessage>(out var code)) {
return default;
}
var buffer = new ArrayBufferWriter<byte>(DefaultBufferSize);
try {
MessageSerializer.WriteCode(buffer, code);
MessageSerializer.Serialize(buffer, message);
CheckWrittenBufferLength<TMessage>(buffer);
return buffer.WrittenSpan;
} catch (Exception e) {
LogWriteFailure<TMessage>(e);
return default;
}
}
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
logger.Error("Unknown message type {Type}.", typeof(TMessage));
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : TMessageBase, ICanReply<TReply> {
if (!GetMessageCode<TMessage>(out var code)) {
return default;
}
@ -60,30 +82,49 @@ public sealed class MessageRegistry<TListener> {
try {
MessageSerializer.WriteCode(buffer, code);
if (typeof(TReply) != typeof(NoReply)) {
MessageSerializer.WriteSequenceId(buffer, sequenceId);
}
MessageSerializer.WriteSequenceId(buffer, sequenceId);
MessageSerializer.Serialize(buffer, message);
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
}
CheckWrittenBufferLength<TMessage>(buffer);
return buffer.WrittenSpan;
} catch (Exception e) {
logger.Error(e, "Failed to serialize message {Type}.", typeof(TMessage).Name);
LogWriteFailure<TMessage>(e);
return default;
}
}
internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
ushort code;
try {
code = MessageSerializer.ReadCode(ref data);
} catch (Exception e) {
logger.Error(e, "Failed to deserialize message code.");
private bool GetMessageCode<TMessage>(out ushort code) where TMessage : TMessageBase {
if (typeToCodeMapping.TryGetValue(typeof(TMessage), out code)) {
return true;
}
else {
logger.Error("Unknown message type {Type}.", typeof(TMessage));
return false;
}
}
private void CheckWrittenBufferLength<TMessage>(ArrayBufferWriter<byte> buffer) where TMessage : TMessageBase {
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
}
}
private void LogWriteFailure<TMessage>(Exception e) where TMessage : TMessageBase {
logger.Error(e, "Failed to serialize message {Type}.", typeof(TMessage).Name);
}
internal bool Read<TMessage>(ReadOnlyMemory<byte> data, out TMessage message) where TMessage : TMessageBase {
if (ReadTypeCode(ref data, out ushort code) && codeToTypeMapping.TryGetValue(code, out var expectedType) && expectedType == typeof(TMessage) && ReadMessage(data, out message)) {
return true;
}
else {
message = default!;
return false;
}
}
internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TMessageBase> handler) {
if (!ReadTypeCode(ref data, out var code)) {
return;
}
@ -95,31 +136,48 @@ public sealed class MessageRegistry<TListener> {
handle(data, code, handler);
}
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, NoReply> {
DeserializeAndEnqueueMessage<TMessage, NoReply>(data, code, handler, 0);
private bool ReadTypeCode(ref ReadOnlyMemory<byte> data, out ushort code) {
try {
code = MessageSerializer.ReadCode(ref data);
return true;
} catch (Exception e) {
code = default;
logger.Error(e, "Failed to deserialize message code.");
return false;
}
}
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
uint sequenceId;
private bool ReadSequenceId<TMessage, TReply>(ref ReadOnlyMemory<byte> data, out uint sequenceId) where TMessage : TMessageBase, ICanReply<TReply> {
try {
sequenceId = MessageSerializer.ReadSequenceId(ref data);
return true;
} catch (Exception e) {
logger.Error(e, "Failed to deserialize sequence ID of message with code {Code}.", code);
return;
sequenceId = default;
logger.Error(e, "Failed to deserialize sequence ID of message {Type}.", typeof(TMessage).Name);
return false;
}
DeserializeAndEnqueueMessage<TMessage, TReply>(data, code, handler, sequenceId);
}
private void DeserializeAndEnqueueMessage<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler, uint sequenceId) where TMessage : IMessage<TListener, TReply> {
TMessage message;
private bool ReadMessage<TMessage>(ReadOnlyMemory<byte> data, out TMessage message) where TMessage : TMessageBase {
try {
message = MessageSerializer.Deserialize<TMessage>(data);
return true;
} catch (Exception e) {
logger.Error(e, "Failed to deserialize message with code {Code}.", code);
return;
message = default!;
logger.Error(e, "Failed to deserialize message {Type}.", typeof(TMessage).Name);
return false;
}
}
handler.Enqueue<TMessage, TReply>(sequenceId, message);
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TMessageBase> handler) where TMessage : TMessageBase {
if (ReadMessage<TMessage>(data, out var message)) {
handler.Tell(message);
}
}
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TMessageBase> handler) where TMessage : TMessageBase, ICanReply<TReply> {
if (ReadSequenceId<TMessage, TReply>(ref data, out var sequenceId) && ReadMessage<TMessage>(data, out var message)) {
handler.TellAndReply<TMessage, TReply>(message, sequenceId);
}
}
}

View File

@ -18,7 +18,7 @@ static class MessageSerializer {
public static T Deserialize<T>(ReadOnlyMemory<byte> memory) {
return MemoryPackSerializer.Deserialize<T>(memory.Span) ?? throw new NullReferenceException();
}
public static void WriteCode(IBufferWriter<byte> destination, ushort value) {
Span<byte> buffer = stackalloc byte[2];
BinaryPrimitives.WriteUInt16LittleEndian(buffer, value);

View File

@ -1,5 +0,0 @@
namespace Phantom.Utils.Rpc.Message;
public readonly struct NoReply {
public static NoReply Instance { get; } = new ();
}

View File

@ -0,0 +1,17 @@
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc.Message;
sealed class ReplySender<TMessageBase, TReplyMessage> : IReplySender where TReplyMessage : TMessageBase {
private readonly RpcConnection<TMessageBase> connection;
private readonly IReplyMessageFactory<TReplyMessage> replyMessageFactory;
public ReplySender(RpcConnection<TMessageBase> connection, IReplyMessageFactory<TReplyMessage> replyMessageFactory) {
this.connection = connection;
this.replyMessageFactory = replyMessageFactory;
}
public Task SendReply(uint sequenceId, byte[] serializedReply) {
return connection.Send(replyMessageFactory.CreateReplyMessage(sequenceId, serializedReply));
}
}

View File

@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>

View File

@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string TcpUrl => "tcp://" + Host + ":" + Port;
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
internal string LoggerName => "Rpc:" + ServiceName;
internal string TcpUrl => "tcp://" + Host + ":" + Port;
}

View File

@ -1,60 +0,0 @@
using System.Threading.Channels;
using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc;
sealed class RpcQueue {
private readonly Channel<Func<Task>> channel = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions {
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});
private readonly Task processingTask;
public RpcQueue(TaskManager taskManager, string taskName) {
this.processingTask = taskManager.Run(taskName, Process);
}
public Task Enqueue(Action action) {
return Enqueue(() => {
action();
return Task.CompletedTask;
});
}
public Task Enqueue(Func<Task> task) {
var completionSource = AsyncTasks.CreateCompletionSource();
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
completionSource.SetCanceled();
}
return completionSource.Task;
}
public Task<T> Enqueue<T>(Func<Task<T>> task) {
var completionSource = AsyncTasks.CreateCompletionSource<T>();
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
completionSource.SetCanceled();
}
return completionSource.Task;
}
private async Task Process() {
try {
await foreach (var task in channel.Reader.ReadAllAsync()) {
await task();
}
} catch (OperationCanceledException) {
// Ignore.
}
}
public Task Stop() {
channel.Writer.Complete();
return processingTask;
}
}

View File

@ -0,0 +1,7 @@
using Phantom.Utils.Actor;
namespace Phantom.Utils.Rpc.Runtime;
public interface IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> where TRegistrationMessage : TServerMessage {
Task<Props<TServerMessage>?> TryRegister(RpcConnectionToClient<TClientMessage> connection, TRegistrationMessage message);
}

View File

@ -1,4 +1,5 @@
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Serilog;
@ -6,18 +7,18 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TClientListener messageListener;
public abstract class RpcClientRuntime<TClientMessage, TServerMessage, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : TClientMessage, TServerMessage {
private readonly RpcConnectionToServer<TServerMessage> connection;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly ActorRef<TClientMessage> handlerActor;
private readonly SemaphoreSlim disconnectSemaphore;
private readonly CancellationToken receiveCancellationToken;
protected RpcClientRuntime(RpcClientSocket<TClientListener, TServerListener, TReplyMessage> socket, TClientListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
protected RpcClientRuntime(RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> socket, ActorRef<TClientMessage> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
this.connection = socket.Connection;
this.messageDefinitions = socket.MessageDefinitions;
this.messageListener = messageListener;
this.handlerActor = handlerActor;
this.disconnectSemaphore = disconnectSemaphore;
this.receiveCancellationToken = receiveCancellationToken;
}
@ -26,8 +27,9 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
return RunWithConnection(socket, connection);
}
protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection) {
var handler = new Handler(LoggerName, connection, messageDefinitions, messageListener);
protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerMessage> connection) {
var replySender = new ReplySender<TServerMessage, TReplyMessage>(connection, messageDefinitions);
var messageHandler = new MessageHandler<TClientMessage>(LoggerName, handlerActor, replySender);
try {
while (!receiveCancellationToken.IsCancellationRequested) {
@ -36,13 +38,13 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
LogMessageType(RuntimeLogger, data);
if (data.Length > 0) {
messageDefinitions.ToClient.Handle(data, handler);
messageDefinitions.ToClient.Handle(data, messageHandler);
}
}
} catch (OperationCanceledException) {
// Ignore.
} finally {
await handler.StopReceiving();
await handlerActor.Stop();
RuntimeLogger.Debug("ZeroMQ client stopped receiving messages.");
await disconnectSemaphore.WaitAsync(CancellationToken.None);
@ -50,12 +52,6 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
}
private protected sealed override async Task Disconnect(ClientSocket socket) {
try {
await connection.StopSending().WaitAsync(TimeSpan.FromSeconds(10), CancellationToken.None);
} catch (TimeoutException) {
RuntimeLogger.Error("Timed out waiting for message sending queue.");
}
await SendDisconnectMessage(socket, RuntimeLogger);
}
@ -73,18 +69,4 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
logger.Verbose("Received {Bytes} B message.", data.Length);
}
}
private sealed class Handler : MessageHandler<TClientListener> {
private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Handler(string loggerName, RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener) : base(loggerName, listener) {
this.connection = connection;
this.messageDefinitions = messageDefinitions;
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
}
}

View File

@ -1,36 +1,27 @@
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcConnection<TListener> {
private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageQueues sendingQueues;
public abstract class RpcConnection<TMessageBase> {
private readonly MessageRegistry<TMessageBase> messageRegistry;
private readonly MessageReplyTracker replyTracker;
internal RpcConnection(string loggerName, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
internal RpcConnection(MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) {
this.messageRegistry = messageRegistry;
this.sendingQueues = new MessageQueues(loggerName + ":Send");
this.replyTracker = replyTracker;
}
private protected abstract ValueTask Send(byte[] bytes);
public Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return sendingQueues.Enqueue(message.QueueKey, () => SendTask(message));
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
return sendingQueues.Enqueue(message.QueueKey, () => SendTask<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken));
}
private async Task SendTask<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
public async Task Send<TMessage>(TMessage message) where TMessage : TMessageBase {
var bytes = messageRegistry.Write(message).ToArray();
if (bytes.Length > 0) {
await Send(bytes);
}
}
private async Task<TReply> SendTask<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
public async Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
@ -46,8 +37,4 @@ public abstract class RpcConnection<TListener> {
public void Receive(IReply message) {
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
internal Task StopSending() {
return sendingQueues.Stop();
}
}

View File

@ -4,29 +4,19 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToClient<TListener> : RpcConnection<TListener> {
public sealed class RpcConnectionToClient<TMessageBase> : RpcConnection<TMessageBase> {
private readonly ServerSocket socket;
private readonly uint routingId;
private readonly TaskCompletionSource<bool> authorizationCompletionSource = new ();
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
public bool IsClosed { get; private set; }
internal RpcConnectionToClient(string loggerName, ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
internal RpcConnectionToClient(ServerSocket socket, uint routingId, MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) : base(messageRegistry, replyTracker) {
this.socket = socket;
this.routingId = routingId;
}
internal Task<bool> GetAuthorization() {
return authorizationCompletionSource.Task;
}
public void SetAuthorizationResult(bool isAuthorized) {
authorizationCompletionSource.SetResult(isAuthorized);
}
public bool IsSame(RpcConnectionToClient<TListener> other) {
public bool IsSame(RpcConnectionToClient<TMessageBase> other) {
return this.routingId == other.routingId && this.socket == other.socket;
}

View File

@ -5,13 +5,13 @@ using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
public sealed class RpcConnectionToServer<TMessageBase> : RpcConnection<TMessageBase> {
private readonly ClientSocket socket;
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
public Task IsReady => isReady.Task;
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) : base(messageRegistry, replyTracker) {
this.socket = socket;
}

View File

@ -0,0 +1,75 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
sealed class RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand>, IWithStash where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
public readonly record struct Init(
string LoggerName,
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> MessageDefinitions,
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> RegistrationHandler,
RpcConnectionToClient<TClientMessage> Connection
);
public static Props<ReceiveMessageCommand> Factory(Init init) {
return Props<ReceiveMessageCommand>.Create(() => new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly string loggerName;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
private readonly RpcConnectionToClient<TClientMessage> connection;
private RpcReceiverActor(Init init) {
this.loggerName = init.LoggerName;
this.messageDefinitions = init.MessageDefinitions;
this.registrationHandler = init.RegistrationHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data);
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (command.MessageType == typeof(TRegistrationMessage)) {
await HandleRegistrationMessage(command);
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private async Task HandleRegistrationMessage(ReceiveMessageCommand command) {
if (!messageDefinitions.ToServer.Read(command.Data, out TRegistrationMessage message)) {
return;
}
var props = await registrationHandler.TryRegister(connection, message);
if (props == null) {
return;
}
var handlerActor = Context.ActorOf(props, "Handler");
var replySender = new ReplySender<TClientMessage, TReplyMessage>(connection, messageDefinitions);
BecomeAuthorized(new MessageHandler<TServerMessage>(loggerName, handlerActor, replySender));
}
private void BecomeAuthorized(MessageHandler<TServerMessage> handler) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(command => messageDefinitions.ToServer.Handle(command.Data, handler));
});
}
}

View File

@ -1,34 +1,44 @@
using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
RpcConfiguration config,
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
IActorRefFactory actorSystem,
CancellationToken cancellationToken
) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
}
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly TaskManager taskManager;
private readonly string serviceName;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
private readonly IActorRefFactory actorSystem;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.registrationHandler = registrationHandler;
this.actorSystem = actorSystem;
this.cancellationToken = cancellationToken;
}
@ -56,31 +66,30 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
if (!clients.TryGetValue(routingId, out var client)) {
if (!messageDefinitions.IsRegistrationMessage(messageType)) {
if (messageType != typeof(TRegistrationMessage)) {
RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
continue;
}
var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
var clientActorName = "Rpc-" + serviceName + "-" + routingId;
// TODO add pings and tear down connection after too much inactivity
var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
}
client.Enqueue(messageType, data);
}
foreach (var client in clients.Values) {
client.Connection.Close();
}
return taskManager.Stop();
return Task.CompletedTask;
}
private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
@ -91,66 +100,38 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
return Task.CompletedTask;
}
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private sealed class Client {
public RpcConnectionToClient<TClientMessage> Connection { get; }
private readonly RpcQueue processingQueue;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
private readonly ILogger logger;
private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
this.Connection = connection;
this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue;
this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager;
}
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
this.logger = PhantomLogger.Create(loggerName);
var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
}
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
if (Logger.IsEnabled(LogEventLevel.Verbose)) {
Logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
if (logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
}
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
Connection.Closed -= OnConnectionClosed;
Logger.Debug("Closing connection...");
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving();
await processingQueue.Stop();
await Connection.StopSending();
Logger.Debug("Connection closed.");
});
logger.Debug("Closing connection...");
receiverActor.Stop();
}
}
}

View File

@ -7,13 +7,13 @@ using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc.Sockets;
public static class RpcClientSocket {
public static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<TClientListener, TServerListener, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcClientSocket<TClientListener, TServerListener, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
public static RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> Connect<TClientMessage, TServerMessage, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
return RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
}
}
public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> {
public sealed class RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : TClientMessage, TServerMessage {
internal static RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : TServerMessage {
var socket = new ClientSocket();
var options = socket.Options;
@ -29,14 +29,14 @@ public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMess
socket.Connect(url);
logger.Information("ZeroMQ client ready.");
return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions);
return new RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage>(socket, config, messageDefinitions);
}
public RpcConnectionToServer<TServerListener> Connection { get; }
internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; }
public RpcConnectionToServer<TServerMessage> Connection { get; }
internal IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> MessageDefinitions { get; }
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions) : base(socket, config) {
MessageDefinitions = messageDefinitions;
Connection = new RpcConnectionToServer<TServerListener>(config.LoggerName, socket, messageDefinitions.ToServer, ReplyTracker);
Connection = new RpcConnectionToServer<TServerMessage>(socket, messageDefinitions.ToServer, ReplyTracker);
}
}

View File

@ -3,7 +3,7 @@ using Phantom.Utils.Logging;
namespace Phantom.Utils.Rpc.Sockets;
public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
sealed class RpcServerSocket : RpcSocket<ServerSocket> {
public static RpcServerSocket Connect(RpcConfiguration config) {
var socket = new ServerSocket();
var options = socket.Options;

View File

@ -8,28 +8,4 @@ public static class AsyncTasks {
public static TaskCompletionSource<T> CreateCompletionSource<T>() {
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
}
public static void SetResultFrom(this TaskCompletionSource completionSource, Task task) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult();
}
}
public static void SetResultFrom<T>(this TaskCompletionSource<T> completionSource, Task<T> task) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult(task.Result);
}
}
}

View File

@ -14,7 +14,7 @@ namespace Phantom.Web.Services;
public static class PhantomWebServices {
public static void AddPhantomServices(this IServiceCollection services) {
services.AddSingleton<ControllerConnection>();
services.AddSingleton<MessageListener>();
services.AddSingleton<ControllerMessageHandlerFactory>();
services.AddSingleton<AgentManager>();
services.AddSingleton<InstanceManager>();

View File

@ -1,12 +1,13 @@
using Phantom.Common.Messages.Web;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Web.Services.Rpc;
public sealed class ControllerConnection {
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly RpcConnectionToServer<IMessageToController> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
}
@ -14,11 +15,11 @@ public sealed class ControllerConnection {
return connection.Send(message);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController<TReply> {
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> {
public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, Timeout.InfiniteTimeSpan, waitForReplyCancellationToken);
}
}

View File

@ -0,0 +1,57 @@
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Web.Services.Agents;
using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc;
sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
public readonly record struct Init(RpcConnectionToServer<IMessageToController> Connection, AgentManager AgentManager, InstanceManager InstanceManager, InstanceLogManager InstanceLogManager, TaskCompletionSource<bool> RegisterSuccessWaiter);
public static Props<IMessageToWeb> Factory(Init init) {
return Props<IMessageToWeb>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly TaskCompletionSource<bool> registerSuccessWaiter;
private ControllerMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.agentManager = init.AgentManager;
this.instanceManager = init.InstanceManager;
this.instanceLogManager = init.InstanceLogManager;
this.registerSuccessWaiter = init.RegisterSuccessWaiter;
Receive<RegisterWebResultMessage>(HandleRegisterWebResult);
Receive<RefreshAgentsMessage>(HandleRefreshAgents);
Receive<RefreshInstancesMessage>(HandleRefreshInstances);
Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<ReplyMessage>(HandleReply);
}
private void HandleRegisterWebResult(RegisterWebResultMessage message) {
registerSuccessWaiter.TrySetResult(message.Success);
}
private void HandleRefreshAgents(RefreshAgentsMessage message) {
agentManager.RefreshAgents(message.Agents);
}
private void HandleRefreshInstances(RefreshInstancesMessage message) {
instanceManager.RefreshInstances(message.Instances);
}
private void HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.AddLines(message.InstanceGuid, message.Lines);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@ -0,0 +1,34 @@
using Akka.Actor;
using Phantom.Common.Messages.Web;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Phantom.Web.Services.Agents;
using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc;
public sealed class ControllerMessageHandlerFactory {
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly TaskCompletionSource<bool> registerSuccessWaiter = AsyncTasks.CreateCompletionSource<bool>();
public Task<bool> RegisterSuccessWaiter => registerSuccessWaiter.Task;
private int messageHandlerId = 0;
public ControllerMessageHandlerFactory(RpcConnectionToServer<IMessageToController> connection, AgentManager agentManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
}
public ActorRef<IMessageToWeb> Create(IActorRefFactory actorSystem) {
int id = Interlocked.Increment(ref messageHandlerId);
return actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(new ControllerMessageHandlerActor.Init(connection, agentManager, instanceManager, instanceLogManager, registerSuccessWaiter)), "ControllerMessageHandler-" + id);
}
}

Some files were not shown because too many files have changed in this diff Show More