Compare commits

...

2 Commits

Author SHA1 Message Date
chylex 873d036895
Migrate event log to Akka.NET 2024-03-22 03:47:29 +01:00
chylex b32a9dba6b
Migrate RPC message handling to Akka.NET 2024-03-22 03:47:29 +01:00
104 changed files with 1141 additions and 1251 deletions

View File

@ -8,9 +8,9 @@ namespace Phantom.Agent.Rpc;
public sealed class ControllerConnection {
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ControllerConnection));
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly RpcConnectionToServer<IMessageToController> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
Logger.Information("Connection ready.");
}

View File

@ -11,10 +11,10 @@ sealed class KeepAliveLoop {
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly CancellationTokenSource cancellationTokenSource = new ();
public KeepAliveLoop(RpcConnectionToServer<IMessageToControllerListener> connection) {
public KeepAliveLoop(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
Task.Run(Run);
}

View File

@ -3,20 +3,21 @@ using NetMQ.Sockets;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Sockets;
using Serilog;
namespace Phantom.Agent.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgent, IMessageToController, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToAgent, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToAgent> handlerActorRef, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, handlerActorRef, disconnectSemaphore, receiveCancellationToken).Launch();
}
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
private RpcClientRuntime(RpcClientSocket<IMessageToAgent, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToAgent> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, handlerActor, disconnectSemaphore, receiveCancellationToken) {}
protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection) {
protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToController> connection) {
var keepAliveLoop = new KeepAliveLoop(connection);
try {
await base.RunWithConnection(socket, connection);

View File

@ -4,27 +4,41 @@ using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Agent.Services.Rpc;
public sealed class MessageListener : IMessageToAgentListener {
private static ILogger Logger { get; } = PhantomLogger.Create<MessageListener>();
public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent> {
private static ILogger Logger { get; } = PhantomLogger.Create<ControllerMessageHandlerActor>();
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
public readonly record struct Init(RpcConnectionToServer<IMessageToController> Connection, AgentServices Agent, CancellationTokenSource ShutdownTokenSource);
public static Props<IMessageToAgent> Factory(Init init) {
return Props<IMessageToAgent>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentServices agent;
private readonly CancellationTokenSource shutdownTokenSource;
public MessageListener(RpcConnectionToServer<IMessageToControllerListener> connection, AgentServices agent, CancellationTokenSource shutdownTokenSource) {
this.connection = connection;
this.agent = agent;
this.shutdownTokenSource = shutdownTokenSource;
private ControllerMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.agent = init.Agent;
this.shutdownTokenSource = init.ShutdownTokenSource;
ReceiveAsync<RegisterAgentSuccessMessage>(HandleRegisterAgentSuccess);
Receive<RegisterAgentFailureMessage>(HandleRegisterAgentFailure);
ReceiveAndReplyLater<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(HandleConfigureInstance);
ReceiveAndReplyLater<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(HandleLaunchInstance);
ReceiveAndReplyLater<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(HandleStopInstance);
ReceiveAndReplyLater<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(HandleSendCommandToInstance);
Receive<ReplyMessage>(HandleReply);
}
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
private async Task HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
Logger.Information("Agent authentication successful.");
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
@ -36,7 +50,7 @@ public sealed class MessageListener : IMessageToAgentListener {
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
if (!result.Is(ConfigureInstanceResult.Success)) {
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
return NoReply.Instance;
return;
}
}
@ -44,11 +58,9 @@ public sealed class MessageListener : IMessageToAgentListener {
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
await agent.InstanceSessionManager.RefreshAgentStatus();
return NoReply.Instance;
}
public Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
private void HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
string errorMessage = message.FailureKind switch {
RegisterAgentFailure.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
RegisterAgentFailure.InvalidToken => "Invalid token.",
@ -59,32 +71,29 @@ public sealed class MessageListener : IMessageToAgentListener {
PhantomLogger.Dispose();
Environment.Exit(1);
return Task.FromResult(NoReply.Instance);
}
private Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
return agent.InstanceSessionManager.Configure(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
private async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
return await HandleConfigureInstance(message, alwaysReportStatus: false);
}
public async Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
private async Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return await agent.InstanceSessionManager.Launch(message.InstanceGuid);
}
public async Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
private async Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return await agent.InstanceSessionManager.Stop(message.InstanceGuid, message.StopStrategy);
}
public async Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
private async Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return await agent.InstanceSessionManager.SendCommand(message.InstanceGuid, message.Command);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -7,6 +7,7 @@ using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
@ -51,15 +52,19 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var rpcConfiguration = new RpcConfiguration("Rpc", controllerHost, controllerPort, controllerCertificate);
var rpcConfiguration = new RpcConfiguration("Agent", controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
await agentServices.Initialize();
using var actorSystem = ActorSystemFactory.Create("Web");
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
var rpcMessageHandlerActor = actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageHandlerActor, rpcDisconnectSemaphore, shutdownCancellationToken);
try {
await rpcTask.WaitAsync(shutdownCancellationToken);
} finally {

View File

@ -8,10 +8,10 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public static class AgentMessageRegistries {
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToAgent> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToController> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
public static IMessageDefinitions<IMessageToAgent, IMessageToController, ReplyMessage> Definitions { get; } = new MessageDefinitions();
static AgentMessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0);
@ -33,13 +33,9 @@ public static class AgentMessageRegistries {
ToController.Add<ReplyMessage>(127);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
public bool IsRegistrationMessage(Type messageType) {
return messageType == typeof(RegisterAgentMessage);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgent, IMessageToController, ReplyMessage> {
public MessageRegistry<IMessageToAgent> ToClient => ToAgent;
public MessageRegistry<IMessageToController> ToServer => ToController;
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);

View File

@ -7,17 +7,4 @@ namespace Phantom.Common.Messages.Agent.BiDirectional;
public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToController, IMessageToAgent, IReply {
private static readonly MessageQueueKey MessageQueueKey = new ("Reply");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this);
}
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleReply(this);
}
}
) : IMessageToController, IMessageToAgent, IReply;

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {
MessageQueueKey IMessage<IMessageToAgentListener, TReply>.QueueKey => IMessageToAgent.DefaultQueueKey;
}
public interface IMessageToAgent : IMessageToAgent<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
MessageQueueKey IMessage<IMessageToAgentListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToAgent {}

View File

@ -1,16 +0,0 @@
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgentListener {
Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message);
Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message);
Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message);
Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message);
Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message);
Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
}
public interface IMessageToController : IMessageToController<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToController {}

View File

@ -1,17 +0,0 @@
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToControllerListener {
Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message);
Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message);
Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message);
Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message);
Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message);
Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message);
Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message);
Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,6 +1,7 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
@ -10,8 +11,4 @@ public sealed partial record ConfigureInstanceMessage(
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(2)] InstanceLaunchProperties LaunchProperties,
[property: MemoryPackOrder(3)] bool LaunchNow = false
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleConfigureInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;

View File

@ -1,13 +1,10 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleLaunchInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<LaunchInstanceResult>>;

View File

@ -1,14 +1,9 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentFailureMessage(
[property: MemoryPackOrder(0)] RegisterAgentFailure FailureKind
) : IMessageToAgent {
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentFailure(this);
}
}
) : IMessageToAgent;

View File

@ -1,14 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentSuccessMessage(
[property: MemoryPackOrder(0)] ImmutableArray<ConfigureInstanceMessage> InitialInstanceConfigurations
) : IMessageToAgent {
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentSuccess(this);
}
}
) : IMessageToAgent;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] string Command
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleSendCommandToInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;

View File

@ -1,6 +1,7 @@
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToAgent;
@ -8,8 +9,4 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] MinecraftStopStrategy StopStrategy
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleStopInstance(this);
}
}
) : IMessageToAgent, ICanReply<InstanceActionResult<StopInstanceResult>>;

View File

@ -1,15 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Java;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AdvertiseJavaRuntimesMessage(
[property: MemoryPackOrder(0)] ImmutableArray<TaggedJavaRuntime> Runtimes
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleAdvertiseJavaRuntimes(this);
}
}
) : IMessageToController;

View File

@ -1,11 +1,6 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsAliveMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleAgentIsAlive(this);
}
}
public sealed partial record AgentIsAliveMessage : IMessageToController;

View File

@ -1,6 +1,5 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -8,13 +7,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToController {
private static readonly MessageQueueKey MessageQueueKey = new ("Agent.InstanceOutput");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleInstanceOutput(this);
}
}
) : IMessageToController;

View File

@ -1,7 +1,6 @@
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -9,8 +8,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken,
[property: MemoryPackOrder(1)] AgentInfo AgentInfo
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleRegisterAgent(this);
}
}
) : IMessageToController;

View File

@ -1,6 +1,5 @@
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -8,8 +7,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record ReportAgentStatusMessage(
[property: MemoryPackOrder(0)] int RunningInstanceCount,
[property: MemoryPackOrder(1)] RamAllocationUnits RunningInstanceMemory
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportAgentStatus(this);
}
}
) : IMessageToController;

View File

@ -1,6 +1,5 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -10,8 +9,4 @@ public sealed partial record ReportInstanceEventMessage(
[property: MemoryPackOrder(1)] DateTime UtcTime,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] IInstanceEvent Event
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportInstanceEvent(this);
}
}
) : IMessageToController;

View File

@ -1,6 +1,5 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
@ -8,8 +7,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
public sealed partial record ReportInstanceStatusMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] IInstanceStatus InstanceStatus
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportInstanceStatus(this);
}
}
) : IMessageToController;

View File

@ -1,11 +1,6 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterAgentMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterAgent(this);
}
}
public sealed partial record UnregisterAgentMessage : IMessageToController;

View File

@ -7,12 +7,4 @@ namespace Phantom.Common.Messages.Web.BiDirectional;
public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToController, IMessageToWeb, IReply {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this);
}
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleReply(this);
}
}
) : IMessageToController, IMessageToWeb, IReply;

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
namespace Phantom.Common.Messages.Web;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
}
public interface IMessageToController : IMessageToController<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToController {}

View File

@ -1,35 +0,0 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToControllerListener {
Task<NoReply> HandleRegisterWeb(RegisterWebMessage message);
Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message);
Task<LogInSuccess?> HandleLogIn(LogInMessage message);
Task<CreateOrUpdateAdministratorUserResult> HandleCreateOrUpdateAdministratorUser(CreateOrUpdateAdministratorUserMessage message);
Task<CreateUserResult> HandleCreateUser(CreateUserMessage message);
Task<ImmutableArray<UserInfo>> HandleGetUsers(GetUsersMessage message);
Task<ImmutableArray<RoleInfo>> HandleGetRoles(GetRolesMessage message);
Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> HandleGetUserRoles(GetUserRolesMessage message);
Task<ChangeUserRolesResult> HandleChangeUserRoles(ChangeUserRolesMessage message);
Task<DeleteUserResult> HandleDeleteUser(DeleteUserMessage message);
Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message);
Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message);
Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message);
Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message);
Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message);
Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message);
Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message);
Task<ImmutableArray<EventLogItem>> HandleGetEventLog(GetEventLogMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,12 +1,3 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
namespace Phantom.Common.Messages.Web;
public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {
MessageQueueKey IMessage<IMessageToWebListener, TReply>.QueueKey => IMessageToWeb.DefaultQueueKey;
}
public interface IMessageToWeb : IMessageToWeb<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
MessageQueueKey IMessage<IMessageToWebListener, NoReply>.QueueKey => DefaultQueueKey;
}
public interface IMessageToWeb {}

View File

@ -1,13 +0,0 @@
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToWebListener {
Task<NoReply> HandleRegisterWebResult(RegisterWebResultMessage message);
Task<NoReply> HandleRefreshAgents(RefreshAgentsMessage message);
Task<NoReply> HandleRefreshInstances(RefreshInstancesMessage message);
Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,6 +1,7 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -10,8 +11,4 @@ public sealed partial record ChangeUserRolesMessage(
[property: MemoryPackOrder(1)] Guid SubjectUserGuid,
[property: MemoryPackOrder(2)] ImmutableHashSet<Guid> AddToRoleGuids,
[property: MemoryPackOrder(3)] ImmutableHashSet<Guid> RemoveFromRoleGuids
) : IMessageToController<ChangeUserRolesResult> {
public Task<ChangeUserRolesResult> Accept(IMessageToControllerListener listener) {
return listener.HandleChangeUserRoles(this);
}
}
) : IMessageToController, ICanReply<ChangeUserRolesResult>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Web.ToController;
public sealed partial record CreateOrUpdateAdministratorUserMessage(
[property: MemoryPackOrder(0)] string Username,
[property: MemoryPackOrder(1)] string Password
) : IMessageToController<CreateOrUpdateAdministratorUserResult> {
public Task<CreateOrUpdateAdministratorUserResult> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateOrUpdateAdministratorUser(this);
}
}
) : IMessageToController, ICanReply<CreateOrUpdateAdministratorUserResult>;

View File

@ -2,6 +2,7 @@
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -10,8 +11,4 @@ public sealed partial record CreateOrUpdateInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] InstanceConfiguration Configuration
) : IMessageToController<InstanceActionResult<CreateOrUpdateInstanceResult>> {
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateOrUpdateInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -8,8 +9,4 @@ public sealed partial record CreateUserMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] string Username,
[property: MemoryPackOrder(2)] string Password
) : IMessageToController<CreateUserResult> {
public Task<CreateUserResult> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateUser(this);
}
}
) : IMessageToController, ICanReply<CreateUserResult>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Web.ToController;
public sealed partial record DeleteUserMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid SubjectUserGuid
) : IMessageToController<DeleteUserResult> {
public Task<DeleteUserResult> Accept(IMessageToControllerListener listener) {
return listener.HandleDeleteUser(this);
}
}
) : IMessageToController, ICanReply<DeleteUserResult>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Java;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetAgentJavaRuntimesMessage : IMessageToController<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> {
public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetAgentJavaRuntimes(this);
}
}
public sealed partial record GetAgentJavaRuntimesMessage : IMessageToController, ICanReply<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>;

View File

@ -1,14 +1,11 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetAuditLogMessage(
[property: MemoryPackOrder(0)] int Count
) : IMessageToController<ImmutableArray<AuditLogItem>> {
public Task<ImmutableArray<AuditLogItem>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetAuditLog(this);
}
}
) : IMessageToController, ICanReply<ImmutableArray<AuditLogItem>>;

View File

@ -1,14 +1,11 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetEventLogMessage(
[property: MemoryPackOrder(0)] int Count
) : IMessageToController<ImmutableArray<EventLogItem>> {
public Task<ImmutableArray<EventLogItem>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetEventLog(this);
}
}
) : IMessageToController, ICanReply<ImmutableArray<EventLogItem>>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetMinecraftVersionsMessage : IMessageToController<ImmutableArray<MinecraftVersion>> {
public Task<ImmutableArray<MinecraftVersion>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetMinecraftVersions(this);
}
}
public sealed partial record GetMinecraftVersionsMessage : IMessageToController, ICanReply<ImmutableArray<MinecraftVersion>>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetRolesMessage : IMessageToController<ImmutableArray<RoleInfo>> {
public Task<ImmutableArray<RoleInfo>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetRoles(this);
}
}
public sealed partial record GetRolesMessage : IMessageToController, ICanReply<ImmutableArray<RoleInfo>>;

View File

@ -1,13 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetUserRolesMessage(
[property: MemoryPackOrder(0)] ImmutableHashSet<Guid> UserGuids
) : IMessageToController<ImmutableDictionary<Guid, ImmutableArray<Guid>>> {
public Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetUserRoles(this);
}
}
) : IMessageToController, ICanReply<ImmutableDictionary<Guid, ImmutableArray<Guid>>>;

View File

@ -1,12 +1,9 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record GetUsersMessage : IMessageToController<ImmutableArray<UserInfo>> {
public Task<ImmutableArray<UserInfo>> Accept(IMessageToControllerListener listener) {
return listener.HandleGetUsers(this);
}
}
public sealed partial record GetUsersMessage : IMessageToController, ICanReply<ImmutableArray<UserInfo>>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -8,8 +9,4 @@ public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid
) : IMessageToController<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleLaunchInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<LaunchInstanceResult>>;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Web.Users;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -7,8 +8,4 @@ namespace Phantom.Common.Messages.Web.ToController;
public sealed partial record LogInMessage(
[property: MemoryPackOrder(0)] string Username,
[property: MemoryPackOrder(1)] string Password
) : IMessageToController<LogInSuccess?> {
public Task<LogInSuccess?> Accept(IMessageToControllerListener listener) {
return listener.HandleLogIn(this);
}
}
) : IMessageToController, ICanReply<LogInSuccess?>;

View File

@ -1,14 +1,9 @@
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterWebMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleRegisterWeb(this);
}
}
) : IMessageToController;

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -9,8 +10,4 @@ public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] string Command
) : IMessageToController<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleSendCommandToInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;

View File

@ -1,6 +1,7 @@
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Web.ToController;
@ -10,8 +11,4 @@ public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] MinecraftStopStrategy StopStrategy
) : IMessageToController<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleStopInstance(this);
}
}
) : IMessageToController, ICanReply<InstanceActionResult<StopInstanceResult>>;

View File

@ -1,11 +1,6 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterWebMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterWeb(this);
}
}
public sealed partial record UnregisterWebMessage : IMessageToController;

View File

@ -1,6 +1,5 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
@ -8,13 +7,4 @@ namespace Phantom.Common.Messages.Web.ToWeb;
public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToWeb {
private static readonly MessageQueueKey MessageQueueKey = new ("Web.InstanceOutput");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleInstanceOutput(this);
}
}
) : IMessageToWeb;

View File

@ -1,15 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Agent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RefreshAgentsMessage(
[property: MemoryPackOrder(0)] ImmutableArray<Agent> Agents
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRefreshAgents(this);
}
}
) : IMessageToWeb;

View File

@ -1,15 +1,10 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Web.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RefreshInstancesMessage(
[property: MemoryPackOrder(0)] ImmutableArray<Instance> Instances
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRefreshInstances(this);
}
}
) : IMessageToWeb;

View File

@ -1,13 +1,8 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterWebResultMessage(
[property: MemoryPackOrder(0)] bool Success
) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRegisterWebResult(this);
}
}
) : IMessageToWeb;

View File

@ -15,10 +15,10 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public static class WebMessageRegistries {
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static MessageRegistry<IMessageToController> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToWeb> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
public static IMessageDefinitions<IMessageToWeb, IMessageToController, ReplyMessage> Definitions { get; } = new MessageDefinitions();
static WebMessageRegistries() {
ToController.Add<RegisterWebMessage>(0);
@ -48,13 +48,9 @@ public static class WebMessageRegistries {
ToWeb.Add<ReplyMessage>(127);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> {
public MessageRegistry<IMessageToWebListener> ToClient => ToWeb;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
public bool IsRegistrationMessage(Type messageType) {
return messageType == typeof(RegisterWebMessage);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWeb, IMessageToController, ReplyMessage> {
public MessageRegistry<IMessageToWeb> ToClient => ToWeb;
public MessageRegistry<IMessageToController> ToServer => ToController;
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);

View File

@ -169,9 +169,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private sealed record InitializeCommand : ICommand;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgent> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgent> Connection) : ICommand;
private sealed record RefreshConnectionStatusCommand : ICommand;

View File

@ -1,4 +1,5 @@
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
@ -11,14 +12,14 @@ sealed class AgentConnection {
private readonly Guid agentGuid;
private string agentName;
private RpcConnectionToClient<IMessageToAgentListener>? connection;
private RpcConnectionToClient<IMessageToAgent>? connection;
public AgentConnection(Guid agentGuid, string agentName) {
this.agentName = agentName;
this.agentGuid = agentGuid;
}
public void UpdateConnection(RpcConnectionToClient<IMessageToAgentListener> newConnection, string newAgentName) {
public void UpdateConnection(RpcConnectionToClient<IMessageToAgent> newConnection, string newAgentName) {
lock (this) {
connection?.Close();
connection = newConnection;
@ -26,7 +27,7 @@ sealed class AgentConnection {
}
}
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgentListener> expected) {
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgent> expected) {
lock (this) {
if (connection != null && connection.IsSame(expected)) {
connection.Close();
@ -48,7 +49,7 @@ sealed class AgentConnection {
}
}
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent, ICanReply<TReply> where TReply : class {
lock (this) {
if (connection == null) {
LogAgentOffline();

View File

@ -38,7 +38,7 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
private sealed record FlushChangesCommand : ICommand;
private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) {
this.configurationToStore = command.Configuration;
configurationToStore = command.Configuration;
ScheduleFlush(TimeSpan.FromSeconds(2));
}
@ -72,11 +72,9 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
}
private void ScheduleFlush(TimeSpan delay) {
if (hasScheduledFlush) {
return;
}
if (!hasScheduledFlush) {
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}
}

View File

@ -18,7 +18,7 @@ namespace Phantom.Controller.Services.Agents;
sealed class AgentManager {
private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>();
private readonly ActorSystem actorSystem;
private readonly IActorRefFactory actorSystem;
private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
@ -28,7 +28,7 @@ sealed class AgentManager {
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
public AgentManager(ActorSystem actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
public AgentManager(IActorRefFactory actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.actorSystem = actorSystem;
this.authToken = authToken;
this.controllerState = controllerState;
@ -58,15 +58,15 @@ sealed class AgentManager {
}
}
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgentListener> connection) {
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgent> connection) {
if (!this.authToken.FixedTimeEquals(authToken)) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
return false;
}
var agentProperties = AgentConfiguration.From(agentInfo);
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var configureInstanceMessages = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
var agentActor = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var configureInstanceMessages = await agentActor.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
return true;

View File

@ -1,7 +1,9 @@
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
@ -10,14 +12,15 @@ using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;
namespace Phantom.Controller.Services;
public sealed class ControllerServices : IAsyncDisposable {
private TaskManager TaskManager { get; }
public sealed class ControllerServices : IDisposable {
public ActorSystem ActorSystem { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; }
@ -33,23 +36,22 @@ public sealed class ControllerServices : IAsyncDisposable {
private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
public IRegistrationHandler<IMessageToAgent, IMessageFromAgentToController, RegisterAgentMessage> AgentRegistrationHandler { get; }
public IRegistrationHandler<IMessageToWeb, IMessageFromWebToController, RegisterWebMessage> WebRegistrationHandler { get; }
private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken;
public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, AuthToken webAuthToken, CancellationToken shutdownCancellationToken) {
this.dbProvider = dbProvider;
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller");
this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider);
@ -59,15 +61,10 @@ public sealed class ControllerServices : IAsyncDisposable {
this.UserRoleManager = new UserRoleManager(dbProvider);
this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager);
this.AuditLogManager = new AuditLogManager(dbProvider);
this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken);
}
this.EventLogManager = new EventLogManager(ActorSystem, dbProvider, shutdownCancellationToken);
public AgentMessageListener CreateAgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection) {
return new AgentMessageListener(connection, AgentManager, InstanceLogManager, EventLogManager, cancellationToken);
}
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
this.AgentRegistrationHandler = new AgentRegistrationHandler(AgentManager, InstanceLogManager, EventLogManager);
this.WebRegistrationHandler = new WebRegistrationHandler(webAuthToken, ControllerState, InstanceLogManager, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, MinecraftVersions, EventLogManager);
}
public async Task Initialize() {
@ -77,8 +74,7 @@ public sealed class ControllerServices : IAsyncDisposable {
await RoleManager.Initialize();
}
public async ValueTask DisposeAsync() {
await actorSystem.Terminate();
actorSystem.Dispose();
public void Dispose() {
ActorSystem.Dispose();
}
}

View File

@ -0,0 +1,77 @@
using Phantom.Common.Data.Web.EventLog;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Events;
sealed class EventLogDatabaseStorageActor : ReceiveActor<EventLogDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<EventLogDatabaseStorageActor>();
public readonly record struct Init(IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new EventLogDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly LinkedList<StoreEventCommand> pendingCommands = new ();
private bool hasScheduledFlush = false;
private EventLogDatabaseStorageActor(Init init) {
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<StoreEventCommand>(StoreEvent);
ReceiveAsync<FlushChangesCommand>(FlushChanges);
}
public interface ICommand {}
public sealed record StoreEventCommand(Guid EventGuid, DateTime UtcTime, Guid? AgentGuid, EventLogEventType EventType, string SubjectId, Dictionary<string, object?>? Extra = null) : ICommand;
private sealed record FlushChangesCommand : ICommand;
private void StoreEvent(StoreEventCommand command) {
pendingCommands.AddLast(command);
ScheduleFlush(TimeSpan.FromMilliseconds(500));
}
private async Task FlushChanges(FlushChangesCommand command) {
hasScheduledFlush = false;
if (pendingCommands.Count == 0) {
return;
}
try {
await using var db = dbProvider.Lazy();
var eventLogRepository = new EventLogRepository(db);
foreach (var (eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra) in pendingCommands) {
eventLogRepository.AddItem(eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
} catch (Exception e) {
ScheduleFlush(TimeSpan.FromSeconds(10));
Logger.Error(e, "Could not store {EventCount} event(s) in database.", pendingCommands.Count);
return;
}
Logger.Information("Stored {EventCount} event(s) in database.", pendingCommands.Count);
pendingCommands.Clear();
}
private void ScheduleFlush(TimeSpan delay) {
if (!hasScheduledFlush) {
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}
}

View File

@ -1,30 +1,25 @@
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Tasks;
using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Events;
sealed partial class EventLogManager {
private readonly ActorRef<EventLogDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly IDbContextProvider dbProvider;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
public EventLogManager(IDbContextProvider dbProvider, TaskManager taskManager, CancellationToken cancellationToken) {
public EventLogManager(IActorRefFactory actorSystem, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.databaseStorageActor = actorSystem.ActorOf(EventLogDatabaseStorageActor.Factory(new EventLogDatabaseStorageActor.Init(dbProvider, cancellationToken)), "EventLogDatabaseStorage");
this.dbProvider = dbProvider;
this.taskManager = taskManager;
this.cancellationToken = cancellationToken;
}
public void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
taskManager.Run("Store event log item to database", () => AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra));
}
public async Task AddItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
await using var db = dbProvider.Lazy();
new EventLogRepository(db).AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra);
await db.Ctx.SaveChangesAsync(cancellationToken);
private void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
databaseStorageActor.Tell(new EventLogDatabaseStorageActor.StoreEventCommand(eventGuid, utcTime, agentGuid, eventType, subjectId, extra));
}
public async Task<ImmutableArray<EventLogItem>> GetMostRecentItems(int count) {

View File

@ -11,13 +11,13 @@ using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActor, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly ActorRef<AgentActor.ICommand> agentActor;
private readonly AgentConnection agentConnection;
private readonly CancellationToken cancellationToken;
@ -30,7 +30,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
private readonly ActorRef<InstanceDatabaseStorageActor.ICommand> databaseStorageActor;
private InstanceActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentActor = init.AgentActor;
this.agentConnection = init.AgentConnection;
this.cancellationToken = init.CancellationToken;
@ -46,7 +46,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
private void NotifyInstanceUpdated() {
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
agentActor.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
}
private void SetLaunchAutomatically(bool newValue) {
@ -56,7 +56,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent, ICanReply<InstanceActionResult<TReply>> {
var reply = await agentConnection.Send<TMessage, InstanceActionResult<TReply>>(message, TimeSpan.FromSeconds(10), cancellationToken);
return reply.DidNotReplyIfNull();
}

View File

@ -6,7 +6,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" />
<PackageReference Include="BCrypt.Net-Next.StrongName" />
</ItemGroup>

View File

@ -0,0 +1,91 @@
using Akka.Actor;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
public readonly record struct Init(Guid AgentGuid, RpcConnectionToClient<IMessageToAgent> Connection, AgentRegistrationHandler AgentRegistrationHandler, AgentManager AgentManager, InstanceLogManager InstanceLogManager, EventLogManager EventLogManager);
public static Props<IMessageToController> Factory(Init init) {
return Props<IMessageToController>.Create(() => new AgentMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
public IStash Stash { get; set; } = null!;
private readonly Guid agentGuid;
private readonly RpcConnectionToClient<IMessageToAgent> connection;
private readonly AgentRegistrationHandler agentRegistrationHandler;
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
private AgentMessageHandlerActor(Init init) {
this.agentGuid = init.AgentGuid;
this.connection = init.Connection;
this.agentRegistrationHandler = init.AgentRegistrationHandler;
this.agentManager = init.AgentManager;
this.instanceLogManager = init.InstanceLogManager;
this.eventLogManager = init.EventLogManager;
ReceiveAsync<RegisterAgentMessage>(HandleRegisterAgent);
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
Receive<AgentIsAliveMessage>(HandleAgentIsAlive);
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
Receive<ReportAgentStatusMessage>(HandleReportAgentStatus);
Receive<ReportInstanceStatusMessage>(HandleReportInstanceStatus);
Receive<ReportInstanceEventMessage>(HandleReportInstanceEvent);
Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<ReplyMessage>(HandleReply);
}
private async Task HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuid != message.AgentInfo.AgentGuid) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
}
else {
await agentRegistrationHandler.TryRegisterImpl(connection, message);
}
}
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UnregisterCommand(connection));
connection.Close();
}
private void HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.NotifyIsAliveCommand());
}
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
}
private void HandleReportAgentStatus(ReportAgentStatusMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
}
private void HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
agentManager.TellAgent(agentGuid, new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
}
private void HandleReportInstanceEvent(ReportInstanceEventMessage message) {
message.Event.Accept(eventLogManager.CreateInstanceEventVisitor(message.EventGuid, message.UtcTime, agentGuid, message.InstanceGuid));
}
private void HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.ReceiveLines(message.InstanceGuid, message.Lines);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@ -1,92 +0,0 @@
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services.Rpc;
public sealed class AgentMessageListener : IMessageToControllerListener {
private readonly RpcConnectionToClient<IMessageToAgentListener> connection;
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
private readonly CancellationToken cancellationToken;
private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>();
internal AgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection, AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager, CancellationToken cancellationToken) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceLogManager = instanceLogManager;
this.eventLogManager = eventLogManager;
this.cancellationToken = cancellationToken;
}
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.AgentGuid) {
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
}
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection)) {
connection.SetAuthorizationResult(true);
agentGuidWaiter.SetResult(message.AgentInfo.AgentGuid);
}
return NoReply.Instance;
}
private async Task<Guid> WaitForAgentGuid() {
return await agentGuidWaiter.Task.WaitAsync(cancellationToken);
}
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted) {
agentManager.TellAgent(agentGuidWaiter.Task.Result, new AgentActor.UnregisterCommand(connection));
}
connection.Close();
return Task.FromResult(NoReply.Instance);
}
public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.NotifyIsAliveCommand());
return NoReply.Instance;
}
public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
return NoReply.Instance;
}
public async Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message) {
message.Event.Accept(eventLogManager.CreateInstanceEventVisitor(message.EventGuid, message.UtcTime, await WaitForAgentGuid(), message.InstanceGuid));
return NoReply.Instance;
}
public Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.ReceiveLines(message.InstanceGuid, message.Lines);
return Task.FromResult(NoReply.Instance);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -0,0 +1,34 @@
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class AgentRegistrationHandler : IRegistrationHandler<IMessageToAgent, IMessageToController, RegisterAgentMessage> {
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager;
public AgentRegistrationHandler(AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager) {
this.agentManager = agentManager;
this.instanceLogManager = instanceLogManager;
this.eventLogManager = eventLogManager;
}
async Task<Props<IMessageToController>?> IRegistrationHandler<IMessageToAgent, IMessageToController, RegisterAgentMessage>.TryRegister(RpcConnectionToClient<IMessageToAgent> connection, RegisterAgentMessage message) {
return await TryRegisterImpl(connection, message) ? CreateMessageHandlerActorProps(message.AgentInfo.AgentGuid, connection) : null;
}
public Task<bool> TryRegisterImpl(RpcConnectionToClient<IMessageToAgent> connection, RegisterAgentMessage message) {
return agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection);
}
private Props<IMessageToController> CreateMessageHandlerActorProps(Guid agentGuid, RpcConnectionToClient<IMessageToAgent> connection) {
var init = new AgentMessageHandlerActor.Init(agentGuid, connection, this, agentManager, instanceLogManager, eventLogManager);
return AgentMessageHandlerActor.Factory(init);
}
}

View File

@ -0,0 +1,72 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class WebMessageDataUpdateSenderActor : ReceiveActor<WebMessageDataUpdateSenderActor.ICommand> {
public readonly record struct Init(RpcConnectionToClient<IMessageToWeb> Connection, ControllerState ControllerState, InstanceLogManager InstanceLogManager);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new WebMessageDataUpdateSenderActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToClient<IMessageToWeb> connection;
private readonly ControllerState controllerState;
private readonly InstanceLogManager instanceLogManager;
private readonly ActorRef<ICommand> selfCached;
private WebMessageDataUpdateSenderActor(Init init) {
this.connection = init.Connection;
this.controllerState = init.ControllerState;
this.instanceLogManager = init.InstanceLogManager;
this.selfCached = SelfTyped;
ReceiveAsync<RefreshAgentsCommand>(RefreshAgents);
ReceiveAsync<RefreshInstancesCommand>(RefreshInstances);
ReceiveAsync<ReceiveInstanceLogsCommand>(ReceiveInstanceLogs);
}
protected override void PreStart() {
controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
instanceLogManager.LogsReceived += OnInstanceLogsReceived;
}
protected override void PostStop() {
instanceLogManager.LogsReceived -= OnInstanceLogsReceived;
controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
}
private void OnInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
selfCached.Tell(new ReceiveInstanceLogsCommand(e.InstanceGuid, e.Lines));
}
public interface ICommand {}
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
private sealed record ReceiveInstanceLogsCommand(Guid InstanceGuid, ImmutableArray<string> Lines) : ICommand;
private Task RefreshAgents(RefreshAgentsCommand command) {
return connection.Send(new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray()));
}
private Task RefreshInstances(RefreshInstancesCommand command) {
return connection.Send(new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray()));
}
private Task ReceiveInstanceLogs(ReceiveInstanceLogsCommand command) {
return connection.Send(new InstanceOutputMessage(command.InstanceGuid, command.Lines));
}
}

View File

@ -0,0 +1,166 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Rpc;
sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
public readonly record struct Init(
RpcConnectionToClient<IMessageToWeb> Connection,
WebRegistrationHandler WebRegistrationHandler,
ControllerState ControllerState,
InstanceLogManager InstanceLogManager,
UserManager UserManager,
RoleManager RoleManager,
UserRoleManager UserRoleManager,
UserLoginManager UserLoginManager,
AuditLogManager AuditLogManager,
AgentManager AgentManager,
MinecraftVersions MinecraftVersions,
EventLogManager EventLogManager
);
public static Props<IMessageToController> Factory(Init init) {
return Props<IMessageToController>.Create(() => new WebMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToClient<IMessageToWeb> connection;
private readonly WebRegistrationHandler webRegistrationHandler;
private readonly ControllerState controllerState;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
private WebMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.webRegistrationHandler = init.WebRegistrationHandler;
this.controllerState = init.ControllerState;
this.userManager = init.UserManager;
this.roleManager = init.RoleManager;
this.userRoleManager = init.UserRoleManager;
this.userLoginManager = init.UserLoginManager;
this.auditLogManager = init.AuditLogManager;
this.agentManager = init.AgentManager;
this.minecraftVersions = init.MinecraftVersions;
this.eventLogManager = init.EventLogManager;
var senderActorInit = new WebMessageDataUpdateSenderActor.Init(connection, controllerState, init.InstanceLogManager);
Context.ActorOf(WebMessageDataUpdateSenderActor.Factory(senderActorInit), "DataUpdateSender");
ReceiveAsync<RegisterWebMessage>(HandleRegisterWeb);
Receive<UnregisterWebMessage>(HandleUnregisterWeb);
ReceiveAndReplyLater<CreateOrUpdateAdministratorUserMessage, CreateOrUpdateAdministratorUserResult>(HandleCreateOrUpdateAdministratorUser);
ReceiveAndReplyLater<CreateUserMessage, CreateUserResult>(HandleCreateUser);
ReceiveAndReplyLater<GetUsersMessage, ImmutableArray<UserInfo>>(HandleGetUsers);
ReceiveAndReplyLater<GetRolesMessage, ImmutableArray<RoleInfo>>(HandleGetRoles);
ReceiveAndReplyLater<GetUserRolesMessage, ImmutableDictionary<Guid, ImmutableArray<Guid>>>(HandleGetUserRoles);
ReceiveAndReplyLater<ChangeUserRolesMessage, ChangeUserRolesResult>(HandleChangeUserRoles);
ReceiveAndReplyLater<DeleteUserMessage, DeleteUserResult>(HandleDeleteUser);
ReceiveAndReplyLater<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(HandleCreateOrUpdateInstance);
ReceiveAndReplyLater<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(HandleLaunchInstance);
ReceiveAndReplyLater<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(HandleStopInstance);
ReceiveAndReplyLater<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(HandleSendCommandToInstance);
ReceiveAndReplyLater<GetMinecraftVersionsMessage, ImmutableArray<MinecraftVersion>>(HandleGetMinecraftVersions);
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
ReceiveAndReplyLater<GetAuditLogMessage, ImmutableArray<AuditLogItem>>(HandleGetAuditLog);
ReceiveAndReplyLater<GetEventLogMessage, ImmutableArray<EventLogItem>>(HandleGetEventLog);
ReceiveAndReplyLater<LogInMessage, LogInSuccess?>(HandleLogIn);
Receive<ReplyMessage>(HandleReply);
}
private async Task HandleRegisterWeb(RegisterWebMessage message) {
await webRegistrationHandler.TryRegisterImpl(connection, message);
}
private void HandleUnregisterWeb(UnregisterWebMessage message) {
connection.Close();
}
private Task<CreateOrUpdateAdministratorUserResult> HandleCreateOrUpdateAdministratorUser(CreateOrUpdateAdministratorUserMessage message) {
return userManager.CreateOrUpdateAdministrator(message.Username, message.Password);
}
private Task<CreateUserResult> HandleCreateUser(CreateUserMessage message) {
return userManager.Create(message.LoggedInUserGuid, message.Username, message.Password);
}
private Task<ImmutableArray<UserInfo>> HandleGetUsers(GetUsersMessage message) {
return userManager.GetAll();
}
private Task<ImmutableArray<RoleInfo>> HandleGetRoles(GetRolesMessage message) {
return roleManager.GetAll();
}
private Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> HandleGetUserRoles(GetUserRolesMessage message) {
return userRoleManager.GetUserRoles(message.UserGuids);
}
private Task<ChangeUserRolesResult> HandleChangeUserRoles(ChangeUserRolesMessage message) {
return userRoleManager.ChangeUserRoles(message.LoggedInUserGuid, message.SubjectUserGuid, message.AddToRoleGuids, message.RemoveFromRoleGuids);
}
private Task<DeleteUserResult> HandleDeleteUser(DeleteUserMessage message) {
return userManager.DeleteByGuid(message.LoggedInUserGuid, message.SubjectUserGuid);
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
}
private Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
}
private Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
return minecraftVersions.GetVersions(CancellationToken.None);
}
private ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) {
return controllerState.AgentJavaRuntimesByGuid;
}
private Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) {
return auditLogManager.GetMostRecentItems(message.Count);
}
private Task<ImmutableArray<EventLogItem>> HandleGetEventLog(GetEventLogMessage message) {
return eventLogManager.GetMostRecentItems(message.Count);
}
private Task<LogInSuccess?> HandleLogIn(LogInMessage message) {
return userLoginManager.LogIn(message.Username, message.Password);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@ -1,229 +0,0 @@
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
using Agent = Phantom.Common.Data.Web.Agent.Agent;
namespace Phantom.Controller.Services.Rpc;
public sealed class WebMessageListener : IMessageToControllerListener {
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
private static int listenerSequenceId = 0;
private readonly ActorRef<ICommand> actor;
private readonly RpcConnectionToClient<IMessageToWebListener> connection;
private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly InstanceLogManager instanceLogManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
internal WebMessageListener(
IActorRefFactory actorSystem,
RpcConnectionToClient<IMessageToWebListener> connection,
AuthToken authToken,
ControllerState controllerState,
UserManager userManager,
RoleManager roleManager,
UserRoleManager userRoleManager,
UserLoginManager userLoginManager,
AuditLogManager auditLogManager,
AgentManager agentManager,
InstanceLogManager instanceLogManager,
MinecraftVersions minecraftVersions,
EventLogManager eventLogManager
) {
this.actor = actorSystem.ActorOf(Actor.Factory(this), "Web-" + Interlocked.Increment(ref listenerSequenceId));
this.connection = connection;
this.authToken = authToken;
this.controllerState = controllerState;
this.userManager = userManager;
this.roleManager = roleManager;
this.userRoleManager = userRoleManager;
this.userLoginManager = userLoginManager;
this.auditLogManager = auditLogManager;
this.agentManager = agentManager;
this.instanceLogManager = instanceLogManager;
this.minecraftVersions = minecraftVersions;
this.eventLogManager = eventLogManager;
}
private sealed class Actor : ReceiveActor<ICommand> {
public static Props<ICommand> Factory(WebMessageListener listener) {
return Props<ICommand>.Create(() => new Actor(listener), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly WebMessageListener listener;
private Actor(WebMessageListener listener) {
this.listener = listener;
Receive<StartConnectionCommand>(StartConnection);
Receive<StopConnectionCommand>(StopConnection);
Receive<RefreshAgentsCommand>(RefreshAgents);
Receive<RefreshInstancesCommand>(RefreshInstances);
}
private void StartConnection(StartConnectionCommand command) {
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
}
private void StopConnection(StopConnectionCommand command) {
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
}
private void RefreshAgents(RefreshAgentsCommand command) {
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void RefreshInstances(RefreshInstancesCommand command) {
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
}
}
private interface ICommand {}
private sealed record StartConnectionCommand : ICommand;
private sealed record StopConnectionCommand : ICommand;
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
if (authToken.FixedTimeEquals(message.AuthToken)) {
Logger.Information("Web authorized successfully.");
connection.SetAuthorizationResult(true);
await connection.Send(new RegisterWebResultMessage(true));
}
else {
Logger.Warning("Web failed to authorize, invalid token.");
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterWebResultMessage(false));
}
if (!connection.IsClosed) {
actor.Tell(new StartConnectionCommand());
}
return NoReply.Instance;
}
public Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message) {
if (!connection.IsClosed) {
connection.Close();
actor.Tell(new StopConnectionCommand());
}
return Task.FromResult(NoReply.Instance);
}
public Task<CreateOrUpdateAdministratorUserResult> HandleCreateOrUpdateAdministratorUser(CreateOrUpdateAdministratorUserMessage message) {
return userManager.CreateOrUpdateAdministrator(message.Username, message.Password);
}
public Task<CreateUserResult> HandleCreateUser(CreateUserMessage message) {
return userManager.Create(message.LoggedInUserGuid, message.Username, message.Password);
}
public Task<ImmutableArray<UserInfo>> HandleGetUsers(GetUsersMessage message) {
return userManager.GetAll();
}
public Task<ImmutableArray<RoleInfo>> HandleGetRoles(GetRolesMessage message) {
return roleManager.GetAll();
}
public Task<ImmutableDictionary<Guid, ImmutableArray<Guid>>> HandleGetUserRoles(GetUserRolesMessage message) {
return userRoleManager.GetUserRoles(message.UserGuids);
}
public Task<ChangeUserRolesResult> HandleChangeUserRoles(ChangeUserRolesMessage message) {
return userRoleManager.ChangeUserRoles(message.LoggedInUserGuid, message.SubjectUserGuid, message.AddToRoleGuids, message.RemoveFromRoleGuids);
}
public Task<DeleteUserResult> HandleDeleteUser(DeleteUserMessage message) {
return userManager.DeleteByGuid(message.LoggedInUserGuid, message.SubjectUserGuid);
}
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
}
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
}
public Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
}
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
}
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
return minecraftVersions.GetVersions(CancellationToken.None);
}
public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) {
return Task.FromResult(controllerState.AgentJavaRuntimesByGuid);
}
public Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) {
return auditLogManager.GetMostRecentItems(message.Count);
}
public Task<ImmutableArray<EventLogItem>> HandleGetEventLog(GetEventLogMessage message) {
return eventLogManager.GetMostRecentItems(message.Count);
}
public Task<LogInSuccess?> HandleLogIn(LogInMessage message) {
return userLoginManager.LogIn(message.Username, message.Password);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -0,0 +1,67 @@
using Phantom.Common.Data;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Controller.Services.Rpc;
sealed class WebRegistrationHandler : IRegistrationHandler<IMessageToWeb, IMessageToController, RegisterWebMessage> {
private static readonly ILogger Logger = PhantomLogger.Create<WebRegistrationHandler>();
private readonly AuthToken webAuthToken;
private readonly ControllerState controllerState;
private readonly InstanceLogManager instanceLogManager;
private readonly UserManager userManager;
private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager;
private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager;
public WebRegistrationHandler(AuthToken webAuthToken, ControllerState controllerState, InstanceLogManager instanceLogManager, UserManager userManager, RoleManager roleManager, UserRoleManager userRoleManager, UserLoginManager userLoginManager, AuditLogManager auditLogManager, AgentManager agentManager, MinecraftVersions minecraftVersions, EventLogManager eventLogManager) {
this.webAuthToken = webAuthToken;
this.controllerState = controllerState;
this.userManager = userManager;
this.roleManager = roleManager;
this.userRoleManager = userRoleManager;
this.userLoginManager = userLoginManager;
this.auditLogManager = auditLogManager;
this.agentManager = agentManager;
this.minecraftVersions = minecraftVersions;
this.eventLogManager = eventLogManager;
this.instanceLogManager = instanceLogManager;
}
async Task<Props<IMessageToController>?> IRegistrationHandler<IMessageToWeb, IMessageToController, RegisterWebMessage>.TryRegister(RpcConnectionToClient<IMessageToWeb> connection, RegisterWebMessage message) {
return await TryRegisterImpl(connection, message) ? CreateMessageHandlerActorProps(connection) : null;
}
public async Task<bool> TryRegisterImpl(RpcConnectionToClient<IMessageToWeb> connection, RegisterWebMessage message) {
if (webAuthToken.FixedTimeEquals(message.AuthToken)) {
Logger.Information("Web authorized successfully.");
await connection.Send(new RegisterWebResultMessage(true));
return true;
}
else {
Logger.Warning("Web failed to authorize, invalid token.");
await connection.Send(new RegisterWebResultMessage(false));
return false;
}
}
private Props<IMessageToController> CreateMessageHandlerActorProps(RpcConnectionToClient<IMessageToWeb> connection) {
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
return WebMessageHandlerActor.Factory(init);
}
}

View File

@ -55,24 +55,23 @@ try {
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.AgentRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.WebRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
}
return 0;
} catch (OperationCanceledException) {

View File

@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) {
if (SupervisorStrategy != null) {
@ -15,6 +16,10 @@ public readonly struct ActorConfiguration {
props = props.WithMailbox(MailboxType);
}
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props;
}
}

View File

@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken);
}
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
}

View File

@ -1,6 +0,0 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessage<TListener, TReply> {
MessageQueueKey QueueKey { get; }
Task<TReply> Accept(TListener listener);
}

View File

@ -1,9 +1,6 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
MessageRegistry<TClientListener> ToClient { get; }
MessageRegistry<TServerListener> ToServer { get; }
bool IsRegistrationMessage(Type messageType);
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
public interface IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> : IReplyMessageFactory<TReplyMessage> where TReplyMessage : TClientMessage, TServerMessage {
MessageRegistry<TClientMessage> ToClient { get; }
MessageRegistry<TServerMessage> ToServer { get; }
}

View File

@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Message;
public interface IReplyMessageFactory<TReplyMessage> {
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
}

View File

@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Message;
interface IReplySender {
Task SendReply(uint sequenceId, byte[] serializedReply);
}

View File

@ -1,41 +1,35 @@
using Phantom.Utils.Logging;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
abstract class MessageHandler<TListener> {
protected ILogger Logger { get; }
sealed class MessageHandler<TMessageBase> {
private readonly ILogger logger;
private readonly ActorRef<TMessageBase> handlerActor;
private readonly IReplySender replySender;
private readonly TListener listener;
private readonly MessageQueues messageQueues;
protected MessageHandler(string loggerName, TListener listener) {
this.Logger = PhantomLogger.Create("MessageHandler", loggerName);
this.listener = listener;
this.messageQueues = new MessageQueues(loggerName + ":Receive");
public MessageHandler(string loggerName, ActorRef<TMessageBase> handlerActor, IReplySender replySender) {
this.logger = PhantomLogger.Create("MessageHandler", loggerName);
this.handlerActor = handlerActor;
this.replySender = replySender;
}
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
messageQueues.Enqueue(message.QueueKey, () => TryHandle<TMessage, TReply>(sequenceId, message));
public void Tell(TMessageBase message) {
handlerActor.Tell(message);
}
private async Task TryHandle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply;
try {
reply = await message.Accept(listener);
} catch (Exception e) {
Logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
return;
public Task TellAndReply<TMessage, TReply>(TMessage message, uint sequenceId) where TMessage : ICanReply<TReply> {
return handlerActor.Request(message).ContinueWith(task => {
if (task.IsCompletedSuccessfully) {
return replySender.SendReply(sequenceId, MessageSerializer.Serialize(task.Result));
}
if (reply is not NoReply) {
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
}
if (task.IsFaulted) {
logger.Error(task.Exception, "Failed to handle message {Type}.", message.GetType().Name);
}
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
internal Task StopReceiving() {
return messageQueues.Stop();
return task;
}, TaskScheduler.Default);
}
}

View File

@ -1,9 +0,0 @@
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageQueueKey {
public string Name { get; }
public MessageQueueKey(string name) {
Name = name;
}
}

View File

@ -1,53 +0,0 @@
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
sealed class MessageQueues {
private readonly ILogger logger;
private readonly TaskManager taskManager;
private readonly Dictionary<MessageQueueKey, RpcQueue> queues = new ();
private Task? stopTask;
public MessageQueues(string loggerName) {
this.logger = PhantomLogger.Create<MessageQueues>(loggerName);
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(loggerName));
}
private RpcQueue GetOrCreateQueue(MessageQueueKey key) {
if (!queues.TryGetValue(key, out var queue)) {
queues[key] = queue = new RpcQueue(taskManager, "Message queue for " + key.Name);
}
return queue;
}
public Task Enqueue(MessageQueueKey key, Func<Task> task) {
lock (this) {
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException(new OperationCanceledException());
}
}
public Task<T> Enqueue<T>(MessageQueueKey key, Func<Task<T>> task) {
lock (this) {
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException<T>(new OperationCanceledException());
}
}
internal Task Stop() {
lock (this) {
if (stopTask == null) {
logger.Debug("Stopping " + queues.Count + " message queue(s)...");
stopTask = Task.WhenAll(queues.Values.Select(static queue => queue.Stop()))
.ContinueWith(_ => logger.Debug("All queues stopped."));
queues.Clear();
}
return stopTask;
}
}
}

View File

@ -1,41 +1,49 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using Phantom.Utils.Actor;
using Serilog;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageRegistry<TListener> {
public sealed class MessageRegistry<TMessageBase> {
private const int DefaultBufferSize = 512;
private readonly ILogger logger;
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TListener>>> codeToHandlerMapping = new ();
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TMessageBase>>> codeToHandlerMapping = new ();
public MessageRegistry(ILogger logger) {
this.logger = logger;
}
public void Add<TMessage>(ushort code) where TMessage : IMessage<TListener, NoReply> {
AddTypeCodeMapping<TMessage, NoReply>(code);
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
if (HasReplyType(typeof(TMessage))) {
throw new ArgumentException("This overload is for messages without a reply");
}
AddTypeCodeMapping<TMessage>(code);
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage>);
}
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
if (typeof(TReply) == typeof(NoReply)) {
throw new InvalidOperationException("This overload of Add must not be used with NoReply as the reply type!");
}
AddTypeCodeMapping<TMessage, TReply>(code);
public void Add<TMessage, TReply>(ushort code) where TMessage : TMessageBase, ICanReply<TReply> {
AddTypeCodeMapping<TMessage>(code);
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage, TReply>);
}
private void AddTypeCodeMapping<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
private void AddTypeCodeMapping<TMessage>(ushort code) where TMessage : TMessageBase {
typeToCodeMapping.Add(typeof(TMessage), code);
codeToTypeMapping.Add(code, typeof(TMessage));
}
private bool HasReplyType(Type messageType) {
string replyInterfaceName = typeof(ICanReply<object>).FullName!;
replyInterfaceName = replyInterfaceName[..(replyInterfaceName.IndexOf('`') + 1)];
return messageType.GetInterfaces().Any(type => type.FullName is {} name && name.StartsWith(replyInterfaceName, StringComparison.Ordinal));
}
internal bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
try {
var code = MessageSerializer.ReadCode(ref data);
@ -46,13 +54,8 @@ public sealed class MessageRegistry<TListener> {
}
}
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return Write<TMessage, NoReply>(0, message);
}
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
logger.Error("Unknown message type {Type}.", typeof(TMessage));
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : TMessageBase {
if (!GetMessageCode<TMessage>(out var code)) {
return default;
}
@ -60,30 +63,68 @@ public sealed class MessageRegistry<TListener> {
try {
MessageSerializer.WriteCode(buffer, code);
if (typeof(TReply) != typeof(NoReply)) {
MessageSerializer.WriteSequenceId(buffer, sequenceId);
}
MessageSerializer.Serialize(buffer, message);
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
}
CheckWrittenBufferLength<TMessage>(buffer);
return buffer.WrittenSpan;
} catch (Exception e) {
logger.Error(e, "Failed to serialize message {Type}.", typeof(TMessage).Name);
LogWriteFailure<TMessage>(e);
return default;
}
}
internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
ushort code;
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : TMessageBase, ICanReply<TReply> {
if (!GetMessageCode<TMessage>(out var code)) {
return default;
}
var buffer = new ArrayBufferWriter<byte>(DefaultBufferSize);
try {
code = MessageSerializer.ReadCode(ref data);
MessageSerializer.WriteCode(buffer, code);
MessageSerializer.WriteSequenceId(buffer, sequenceId);
MessageSerializer.Serialize(buffer, message);
CheckWrittenBufferLength<TMessage>(buffer);
return buffer.WrittenSpan;
} catch (Exception e) {
logger.Error(e, "Failed to deserialize message code.");
LogWriteFailure<TMessage>(e);
return default;
}
}
private bool GetMessageCode<TMessage>(out ushort code) where TMessage : TMessageBase {
if (typeToCodeMapping.TryGetValue(typeof(TMessage), out code)) {
return true;
}
else {
logger.Error("Unknown message type {Type}.", typeof(TMessage));
return false;
}
}
private void CheckWrittenBufferLength<TMessage>(ArrayBufferWriter<byte> buffer) where TMessage : TMessageBase {
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
}
}
private void LogWriteFailure<TMessage>(Exception e) where TMessage : TMessageBase {
logger.Error(e, "Failed to serialize message {Type}.", typeof(TMessage).Name);
}
internal bool Read<TMessage>(ReadOnlyMemory<byte> data, out TMessage message) where TMessage : TMessageBase {
if (ReadTypeCode(ref data, out ushort code) && codeToTypeMapping.TryGetValue(code, out var expectedType) && expectedType == typeof(TMessage) && ReadMessage(data, out message)) {
return true;
}
else {
message = default!;
return false;
}
}
internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TMessageBase> handler) {
if (!ReadTypeCode(ref data, out var code)) {
return;
}
@ -95,31 +136,48 @@ public sealed class MessageRegistry<TListener> {
handle(data, code, handler);
}
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, NoReply> {
DeserializeAndEnqueueMessage<TMessage, NoReply>(data, code, handler, 0);
private bool ReadTypeCode(ref ReadOnlyMemory<byte> data, out ushort code) {
try {
code = MessageSerializer.ReadCode(ref data);
return true;
} catch (Exception e) {
code = default;
logger.Error(e, "Failed to deserialize message code.");
return false;
}
}
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
uint sequenceId;
private bool ReadSequenceId<TMessage, TReply>(ref ReadOnlyMemory<byte> data, out uint sequenceId) where TMessage : TMessageBase, ICanReply<TReply> {
try {
sequenceId = MessageSerializer.ReadSequenceId(ref data);
return true;
} catch (Exception e) {
logger.Error(e, "Failed to deserialize sequence ID of message with code {Code}.", code);
return;
sequenceId = default;
logger.Error(e, "Failed to deserialize sequence ID of message {Type}.", typeof(TMessage).Name);
return false;
}
}
DeserializeAndEnqueueMessage<TMessage, TReply>(data, code, handler, sequenceId);
}
private void DeserializeAndEnqueueMessage<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler, uint sequenceId) where TMessage : IMessage<TListener, TReply> {
TMessage message;
private bool ReadMessage<TMessage>(ReadOnlyMemory<byte> data, out TMessage message) where TMessage : TMessageBase {
try {
message = MessageSerializer.Deserialize<TMessage>(data);
return true;
} catch (Exception e) {
logger.Error(e, "Failed to deserialize message with code {Code}.", code);
return;
message = default!;
logger.Error(e, "Failed to deserialize message {Type}.", typeof(TMessage).Name);
return false;
}
}
handler.Enqueue<TMessage, TReply>(sequenceId, message);
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TMessageBase> handler) where TMessage : TMessageBase {
if (ReadMessage<TMessage>(data, out var message)) {
handler.Tell(message);
}
}
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TMessageBase> handler) where TMessage : TMessageBase, ICanReply<TReply> {
if (ReadSequenceId<TMessage, TReply>(ref data, out var sequenceId) && ReadMessage<TMessage>(data, out var message)) {
handler.TellAndReply<TMessage, TReply>(message, sequenceId);
}
}
}

View File

@ -1,5 +0,0 @@
namespace Phantom.Utils.Rpc.Message;
public readonly struct NoReply {
public static NoReply Instance { get; } = new ();
}

View File

@ -0,0 +1,17 @@
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc.Message;
sealed class ReplySender<TMessageBase, TReplyMessage> : IReplySender where TReplyMessage : TMessageBase {
private readonly RpcConnection<TMessageBase> connection;
private readonly IReplyMessageFactory<TReplyMessage> replyMessageFactory;
public ReplySender(RpcConnection<TMessageBase> connection, IReplyMessageFactory<TReplyMessage> replyMessageFactory) {
this.connection = connection;
this.replyMessageFactory = replyMessageFactory;
}
public Task SendReply(uint sequenceId, byte[] serializedReply) {
return connection.Send(replyMessageFactory.CreateReplyMessage(sequenceId, serializedReply));
}
}

View File

@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>

View File

@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string TcpUrl => "tcp://" + Host + ":" + Port;
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
internal string LoggerName => "Rpc:" + ServiceName;
internal string TcpUrl => "tcp://" + Host + ":" + Port;
}

View File

@ -1,60 +0,0 @@
using System.Threading.Channels;
using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc;
sealed class RpcQueue {
private readonly Channel<Func<Task>> channel = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions {
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});
private readonly Task processingTask;
public RpcQueue(TaskManager taskManager, string taskName) {
this.processingTask = taskManager.Run(taskName, Process);
}
public Task Enqueue(Action action) {
return Enqueue(() => {
action();
return Task.CompletedTask;
});
}
public Task Enqueue(Func<Task> task) {
var completionSource = AsyncTasks.CreateCompletionSource();
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
completionSource.SetCanceled();
}
return completionSource.Task;
}
public Task<T> Enqueue<T>(Func<Task<T>> task) {
var completionSource = AsyncTasks.CreateCompletionSource<T>();
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
completionSource.SetCanceled();
}
return completionSource.Task;
}
private async Task Process() {
try {
await foreach (var task in channel.Reader.ReadAllAsync()) {
await task();
}
} catch (OperationCanceledException) {
// Ignore.
}
}
public Task Stop() {
channel.Writer.Complete();
return processingTask;
}
}

View File

@ -0,0 +1,7 @@
using Phantom.Utils.Actor;
namespace Phantom.Utils.Rpc.Runtime;
public interface IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> where TRegistrationMessage : TServerMessage {
Task<Props<TServerMessage>?> TryRegister(RpcConnectionToClient<TClientMessage> connection, TRegistrationMessage message);
}

View File

@ -1,4 +1,5 @@
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Serilog;
@ -6,18 +7,18 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TClientListener messageListener;
public abstract class RpcClientRuntime<TClientMessage, TServerMessage, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : TClientMessage, TServerMessage {
private readonly RpcConnectionToServer<TServerMessage> connection;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly ActorRef<TClientMessage> handlerActor;
private readonly SemaphoreSlim disconnectSemaphore;
private readonly CancellationToken receiveCancellationToken;
protected RpcClientRuntime(RpcClientSocket<TClientListener, TServerListener, TReplyMessage> socket, TClientListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
protected RpcClientRuntime(RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> socket, ActorRef<TClientMessage> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
this.connection = socket.Connection;
this.messageDefinitions = socket.MessageDefinitions;
this.messageListener = messageListener;
this.handlerActor = handlerActor;
this.disconnectSemaphore = disconnectSemaphore;
this.receiveCancellationToken = receiveCancellationToken;
}
@ -26,8 +27,9 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
return RunWithConnection(socket, connection);
}
protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection) {
var handler = new Handler(LoggerName, connection, messageDefinitions, messageListener);
protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerMessage> connection) {
var replySender = new ReplySender<TServerMessage, TReplyMessage>(connection, messageDefinitions);
var messageHandler = new MessageHandler<TClientMessage>(LoggerName, handlerActor, replySender);
try {
while (!receiveCancellationToken.IsCancellationRequested) {
@ -36,13 +38,13 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
LogMessageType(RuntimeLogger, data);
if (data.Length > 0) {
messageDefinitions.ToClient.Handle(data, handler);
messageDefinitions.ToClient.Handle(data, messageHandler);
}
}
} catch (OperationCanceledException) {
// Ignore.
} finally {
await handler.StopReceiving();
await handlerActor.Stop();
RuntimeLogger.Debug("ZeroMQ client stopped receiving messages.");
await disconnectSemaphore.WaitAsync(CancellationToken.None);
@ -50,12 +52,6 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
}
private protected sealed override async Task Disconnect(ClientSocket socket) {
try {
await connection.StopSending().WaitAsync(TimeSpan.FromSeconds(10), CancellationToken.None);
} catch (TimeoutException) {
RuntimeLogger.Error("Timed out waiting for message sending queue.");
}
await SendDisconnectMessage(socket, RuntimeLogger);
}
@ -73,18 +69,4 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
logger.Verbose("Received {Bytes} B message.", data.Length);
}
}
private sealed class Handler : MessageHandler<TClientListener> {
private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Handler(string loggerName, RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener) : base(loggerName, listener) {
this.connection = connection;
this.messageDefinitions = messageDefinitions;
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
}
}

View File

@ -1,36 +1,27 @@
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcConnection<TListener> {
private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageQueues sendingQueues;
public abstract class RpcConnection<TMessageBase> {
private readonly MessageRegistry<TMessageBase> messageRegistry;
private readonly MessageReplyTracker replyTracker;
internal RpcConnection(string loggerName, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
internal RpcConnection(MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) {
this.messageRegistry = messageRegistry;
this.sendingQueues = new MessageQueues(loggerName + ":Send");
this.replyTracker = replyTracker;
}
private protected abstract ValueTask Send(byte[] bytes);
public Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return sendingQueues.Enqueue(message.QueueKey, () => SendTask(message));
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
return sendingQueues.Enqueue(message.QueueKey, () => SendTask<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken));
}
private async Task SendTask<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
public async Task Send<TMessage>(TMessage message) where TMessage : TMessageBase {
var bytes = messageRegistry.Write(message).ToArray();
if (bytes.Length > 0) {
await Send(bytes);
}
}
private async Task<TReply> SendTask<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
public async Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
@ -46,8 +37,4 @@ public abstract class RpcConnection<TListener> {
public void Receive(IReply message) {
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
internal Task StopSending() {
return sendingQueues.Stop();
}
}

View File

@ -4,29 +4,19 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToClient<TListener> : RpcConnection<TListener> {
public sealed class RpcConnectionToClient<TMessageBase> : RpcConnection<TMessageBase> {
private readonly ServerSocket socket;
private readonly uint routingId;
private readonly TaskCompletionSource<bool> authorizationCompletionSource = new ();
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
public bool IsClosed { get; private set; }
internal RpcConnectionToClient(string loggerName, ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
internal RpcConnectionToClient(ServerSocket socket, uint routingId, MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) : base(messageRegistry, replyTracker) {
this.socket = socket;
this.routingId = routingId;
}
internal Task<bool> GetAuthorization() {
return authorizationCompletionSource.Task;
}
public void SetAuthorizationResult(bool isAuthorized) {
authorizationCompletionSource.SetResult(isAuthorized);
}
public bool IsSame(RpcConnectionToClient<TListener> other) {
public bool IsSame(RpcConnectionToClient<TMessageBase> other) {
return this.routingId == other.routingId && this.socket == other.socket;
}

View File

@ -5,13 +5,13 @@ using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
public sealed class RpcConnectionToServer<TMessageBase> : RpcConnection<TMessageBase> {
private readonly ClientSocket socket;
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
public Task IsReady => isReady.Task;
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) : base(messageRegistry, replyTracker) {
this.socket = socket;
}

View File

@ -0,0 +1,75 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
sealed class RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand>, IWithStash where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
public readonly record struct Init(
string LoggerName,
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> MessageDefinitions,
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> RegistrationHandler,
RpcConnectionToClient<TClientMessage> Connection
);
public static Props<ReceiveMessageCommand> Factory(Init init) {
return Props<ReceiveMessageCommand>.Create(() => new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly string loggerName;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
private readonly RpcConnectionToClient<TClientMessage> connection;
private RpcReceiverActor(Init init) {
this.loggerName = init.LoggerName;
this.messageDefinitions = init.MessageDefinitions;
this.registrationHandler = init.RegistrationHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data);
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (command.MessageType == typeof(TRegistrationMessage)) {
await HandleRegistrationMessage(command);
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private async Task HandleRegistrationMessage(ReceiveMessageCommand command) {
if (!messageDefinitions.ToServer.Read(command.Data, out TRegistrationMessage message)) {
return;
}
var props = await registrationHandler.TryRegister(connection, message);
if (props == null) {
return;
}
var handlerActor = Context.ActorOf(props, "Handler");
var replySender = new ReplySender<TClientMessage, TReplyMessage>(connection, messageDefinitions);
BecomeAuthorized(new MessageHandler<TServerMessage>(loggerName, handlerActor, replySender));
}
private void BecomeAuthorized(MessageHandler<TServerMessage> handler) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(command => messageDefinitions.ToServer.Handle(command.Data, handler));
});
}
}

View File

@ -1,34 +1,44 @@
using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
RpcConfiguration config,
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
IActorRefFactory actorSystem,
CancellationToken cancellationToken
) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
}
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly TaskManager taskManager;
private readonly string serviceName;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
private readonly IActorRefFactory actorSystem;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.registrationHandler = registrationHandler;
this.actorSystem = actorSystem;
this.cancellationToken = cancellationToken;
}
@ -56,31 +66,30 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
if (!clients.TryGetValue(routingId, out var client)) {
if (!messageDefinitions.IsRegistrationMessage(messageType)) {
if (messageType != typeof(TRegistrationMessage)) {
RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
continue;
}
var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
var clientActorName = "Rpc-" + serviceName + "-" + routingId;
// TODO add pings and tear down connection after too much inactivity
var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
}
}
foreach (var client in clients.Values) {
client.Connection.Close();
}
return taskManager.Stop();
return Task.CompletedTask;
}
private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
@ -91,66 +100,38 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
return Task.CompletedTask;
}
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private sealed class Client {
public RpcConnectionToClient<TClientMessage> Connection { get; }
private readonly RpcQueue processingQueue;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager;
private readonly ILogger logger;
private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
this.Connection = connection;
this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue;
this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager;
}
this.logger = PhantomLogger.Create(loggerName);
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
}
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
if (Logger.IsEnabled(LogEventLevel.Verbose)) {
Logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
if (logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
}
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
Connection.Closed -= OnConnectionClosed;
Logger.Debug("Closing connection...");
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving();
await processingQueue.Stop();
await Connection.StopSending();
Logger.Debug("Connection closed.");
});
logger.Debug("Closing connection...");
receiverActor.Stop();
}
}
}

View File

@ -7,13 +7,13 @@ using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc.Sockets;
public static class RpcClientSocket {
public static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<TClientListener, TServerListener, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcClientSocket<TClientListener, TServerListener, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
public static RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> Connect<TClientMessage, TServerMessage, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
return RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
}
}
public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> {
public sealed class RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : TClientMessage, TServerMessage {
internal static RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : TServerMessage {
var socket = new ClientSocket();
var options = socket.Options;
@ -29,14 +29,14 @@ public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMess
socket.Connect(url);
logger.Information("ZeroMQ client ready.");
return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions);
return new RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage>(socket, config, messageDefinitions);
}
public RpcConnectionToServer<TServerListener> Connection { get; }
internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; }
public RpcConnectionToServer<TServerMessage> Connection { get; }
internal IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> MessageDefinitions { get; }
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions) : base(socket, config) {
MessageDefinitions = messageDefinitions;
Connection = new RpcConnectionToServer<TServerListener>(config.LoggerName, socket, messageDefinitions.ToServer, ReplyTracker);
Connection = new RpcConnectionToServer<TServerMessage>(socket, messageDefinitions.ToServer, ReplyTracker);
}
}

View File

@ -3,7 +3,7 @@ using Phantom.Utils.Logging;
namespace Phantom.Utils.Rpc.Sockets;
public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
sealed class RpcServerSocket : RpcSocket<ServerSocket> {
public static RpcServerSocket Connect(RpcConfiguration config) {
var socket = new ServerSocket();
var options = socket.Options;

View File

@ -8,28 +8,4 @@ public static class AsyncTasks {
public static TaskCompletionSource<T> CreateCompletionSource<T>() {
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
}
public static void SetResultFrom(this TaskCompletionSource completionSource, Task task) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult();
}
}
public static void SetResultFrom<T>(this TaskCompletionSource<T> completionSource, Task<T> task) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult(task.Result);
}
}
}

View File

@ -14,7 +14,7 @@ namespace Phantom.Web.Services;
public static class PhantomWebServices {
public static void AddPhantomServices(this IServiceCollection services) {
services.AddSingleton<ControllerConnection>();
services.AddSingleton<MessageListener>();
services.AddSingleton<ControllerMessageHandlerFactory>();
services.AddSingleton<AgentManager>();
services.AddSingleton<InstanceManager>();

View File

@ -1,12 +1,13 @@
using Phantom.Common.Messages.Web;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Web.Services.Rpc;
public sealed class ControllerConnection {
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly RpcConnectionToServer<IMessageToController> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
}
@ -14,11 +15,11 @@ public sealed class ControllerConnection {
return connection.Send(message);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController<TReply> {
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> {
public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, Timeout.InfiniteTimeSpan, waitForReplyCancellationToken);
}
}

View File

@ -0,0 +1,57 @@
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Web.Services.Agents;
using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc;
sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
public readonly record struct Init(RpcConnectionToServer<IMessageToController> Connection, AgentManager AgentManager, InstanceManager InstanceManager, InstanceLogManager InstanceLogManager, TaskCompletionSource<bool> RegisterSuccessWaiter);
public static Props<IMessageToWeb> Factory(Init init) {
return Props<IMessageToWeb>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly TaskCompletionSource<bool> registerSuccessWaiter;
private ControllerMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.agentManager = init.AgentManager;
this.instanceManager = init.InstanceManager;
this.instanceLogManager = init.InstanceLogManager;
this.registerSuccessWaiter = init.RegisterSuccessWaiter;
Receive<RegisterWebResultMessage>(HandleRegisterWebResult);
Receive<RefreshAgentsMessage>(HandleRefreshAgents);
Receive<RefreshInstancesMessage>(HandleRefreshInstances);
Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<ReplyMessage>(HandleReply);
}
private void HandleRegisterWebResult(RegisterWebResultMessage message) {
registerSuccessWaiter.TrySetResult(message.Success);
}
private void HandleRefreshAgents(RefreshAgentsMessage message) {
agentManager.RefreshAgents(message.Agents);
}
private void HandleRefreshInstances(RefreshInstancesMessage message) {
instanceManager.RefreshInstances(message.Instances);
}
private void HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.AddLines(message.InstanceGuid, message.Lines);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@ -0,0 +1,34 @@
using Akka.Actor;
using Phantom.Common.Messages.Web;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Phantom.Web.Services.Agents;
using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc;
public sealed class ControllerMessageHandlerFactory {
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly TaskCompletionSource<bool> registerSuccessWaiter = AsyncTasks.CreateCompletionSource<bool>();
public Task<bool> RegisterSuccessWaiter => registerSuccessWaiter.Task;
private int messageHandlerId = 0;
public ControllerMessageHandlerFactory(RpcConnectionToServer<IMessageToController> connection, AgentManager agentManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
}
public ActorRef<IMessageToWeb> Create(IActorRefFactory actorSystem) {
int id = Interlocked.Increment(ref messageHandlerId);
return actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(new ControllerMessageHandlerActor.Init(connection, agentManager, instanceManager, instanceLogManager, registerSuccessWaiter)), "ControllerMessageHandler-" + id);
}
}

View File

@ -1,51 +0,0 @@
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Phantom.Web.Services.Agents;
using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc;
public sealed class MessageListener : IMessageToWebListener {
public TaskCompletionSource<bool> RegisterSuccessWaiter { get; } = AsyncTasks.CreateCompletionSource<bool>();
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
public MessageListener(RpcConnectionToServer<IMessageToControllerListener> connection, AgentManager agentManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
}
public Task<NoReply> HandleRegisterWebResult(RegisterWebResultMessage message) {
RegisterSuccessWaiter.TrySetResult(message.Success);
return Task.FromResult(NoReply.Instance);
}
public Task<NoReply> HandleRefreshAgents(RefreshAgentsMessage message) {
agentManager.RefreshAgents(message.Agents);
return Task.FromResult(NoReply.Instance);
}
public Task<NoReply> HandleRefreshInstances(RefreshInstancesMessage message) {
instanceManager.RefreshInstances(message.Instances);
return Task.FromResult(NoReply.Instance);
}
public Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.AddLines(message.InstanceGuid, message.Lines);
return Task.FromResult(NoReply.Instance);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

Some files were not shown because too many files have changed in this diff Show More