mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-10-18 03:39:36 +02:00
Compare commits
6 Commits
360d22fdb9
...
2947fa3522
Author | SHA1 | Date | |
---|---|---|---|
2947fa3522
|
|||
4d1a79307f
|
|||
cc4eb8aa9c
|
|||
de1767c876
|
|||
39f2fa4b17
|
|||
19e0d6fd3d
|
@@ -1,21 +1,15 @@
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Agent.Rpc;
|
||||
|
||||
public sealed class ControllerConnection {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ControllerConnection));
|
||||
|
||||
private readonly RpcConnectionToServer<IMessageToController> connection;
|
||||
|
||||
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
|
||||
this.connection = connection;
|
||||
Logger.Information("Connection ready.");
|
||||
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> sendChannel) {
|
||||
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||
return sendChannel.SendMessage(message, CancellationToken.None /* TODO */);
|
||||
}
|
||||
|
||||
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||
return connection.Send(message);
|
||||
// TODO handle properly
|
||||
public bool TrySend<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||
return sendChannel.TrySendMessage(message);
|
||||
}
|
||||
}
|
||||
|
@@ -1,7 +1,6 @@
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Agent.Rpc;
|
||||
|
@@ -1,37 +0,0 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Agent.Rpc;
|
||||
|
||||
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgent, IMessageToController, ReplyMessage> {
|
||||
public static Task Launch(RpcClientSocket<IMessageToAgent, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToAgent> handlerActorRef, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
||||
return new RpcClientRuntime(socket, handlerActorRef, disconnectSemaphore, receiveCancellationToken).Launch();
|
||||
}
|
||||
|
||||
private RpcClientRuntime(RpcClientSocket<IMessageToAgent, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToAgent> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, handlerActor, disconnectSemaphore, receiveCancellationToken) {}
|
||||
|
||||
protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToController> connection) {
|
||||
var keepAliveLoop = new KeepAliveLoop(connection);
|
||||
try {
|
||||
await base.RunWithConnection(socket, connection);
|
||||
} finally {
|
||||
keepAliveLoop.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
|
||||
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray();
|
||||
try {
|
||||
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
||||
} catch (TimeoutException) {
|
||||
logger.Error("Timed out communicating agent shutdown with the controller.");
|
||||
}
|
||||
}
|
||||
}
|
@@ -58,7 +58,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
|
||||
private void ReportCurrentStatus() {
|
||||
agentState.UpdateInstance(new Instance(instanceGuid, currentStatus));
|
||||
instanceServices.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, currentStatus));
|
||||
instanceServices.ControllerConnection.TrySend(new ReportInstanceStatusMessage(instanceGuid, currentStatus));
|
||||
}
|
||||
|
||||
private void TransitionState(InstanceRunningState? newState) {
|
||||
|
@@ -7,6 +7,6 @@ namespace Phantom.Agent.Services.Instances;
|
||||
|
||||
sealed record InstanceContext(Guid InstanceGuid, string ShortName, ILogger Logger, InstanceServices Services, ActorRef<InstanceActor.ICommand> Actor, CancellationToken ActorCancellationToken) {
|
||||
public void ReportEvent(IInstanceEvent instanceEvent) {
|
||||
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, InstanceGuid, instanceEvent));
|
||||
Services.ControllerConnection.TrySend(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, InstanceGuid, instanceEvent));
|
||||
}
|
||||
}
|
||||
|
@@ -91,7 +91,7 @@ sealed class InstanceTicketManager {
|
||||
|
||||
public void RefreshAgentStatus() {
|
||||
lock (this) {
|
||||
controllerConnection.Send(new ReportAgentStatusMessage(activeTicketGuids.Count, usedMemory));
|
||||
controllerConnection.TrySend(new ReportAgentStatusMessage(activeTicketGuids.Count, usedMemory));
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -63,7 +63,7 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
|
||||
|
||||
private void SendOutputToServer(ImmutableArray<string> lines) {
|
||||
if (!lines.IsEmpty) {
|
||||
controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines));
|
||||
controllerConnection.TrySend(new InstanceOutputMessage(instanceGuid, lines));
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -38,7 +38,7 @@ sealed class InstancePlayerCountTracker : CancellableBackgroundTask {
|
||||
}
|
||||
|
||||
onlinePlayerCountChanged?.Invoke(this, value?.Online);
|
||||
controllerConnection.Send(new ReportInstancePlayerCountsMessage(instanceGuid, value));
|
||||
controllerConnection.TrySend(new ReportInstancePlayerCountsMessage(instanceGuid, value));
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -3,7 +3,6 @@ using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Actor;
|
||||
@@ -16,18 +15,18 @@ namespace Phantom.Agent.Services.Rpc;
|
||||
public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent> {
|
||||
private static ILogger Logger { get; } = PhantomLogger.Create<ControllerMessageHandlerActor>();
|
||||
|
||||
public readonly record struct Init(RpcConnectionToServer<IMessageToController> Connection, AgentServices Agent, CancellationTokenSource ShutdownTokenSource);
|
||||
public readonly record struct Init(RpcSendChannel<IMessageToController> SendChannel, AgentServices Agent, CancellationTokenSource ShutdownTokenSource);
|
||||
|
||||
public static Props<IMessageToAgent> Factory(Init init) {
|
||||
return Props<IMessageToAgent>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||
}
|
||||
|
||||
private readonly RpcConnectionToServer<IMessageToController> connection;
|
||||
private readonly RpcSendChannel<IMessageToController> sendChannel;
|
||||
private readonly AgentServices agent;
|
||||
private readonly CancellationTokenSource shutdownTokenSource;
|
||||
|
||||
private ControllerMessageHandlerActor(Init init) {
|
||||
this.connection = init.Connection;
|
||||
this.sendChannel = init.SendChannel;
|
||||
this.agent = init.Agent;
|
||||
this.shutdownTokenSource = init.ShutdownTokenSource;
|
||||
|
||||
@@ -37,7 +36,6 @@ public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent
|
||||
ReceiveAndReplyLater<LaunchInstanceMessage, Result<LaunchInstanceResult, InstanceActionFailure>>(HandleLaunchInstance);
|
||||
ReceiveAndReplyLater<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(HandleStopInstance);
|
||||
ReceiveAndReplyLater<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(HandleSendCommandToInstance);
|
||||
Receive<ReplyMessage>(HandleReply);
|
||||
}
|
||||
|
||||
private async Task HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
|
||||
@@ -56,9 +54,7 @@ public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent
|
||||
}
|
||||
}
|
||||
|
||||
connection.SetIsReady();
|
||||
|
||||
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
|
||||
await sendChannel.SendMessage(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All), CancellationToken.None);
|
||||
agent.InstanceTicketManager.RefreshAgentStatus();
|
||||
}
|
||||
|
||||
@@ -94,8 +90,4 @@ public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent
|
||||
private async Task<Result<SendCommandToInstanceResult, InstanceActionFailure>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
|
||||
return await agent.InstanceManager.Request(new InstanceManagerActor.SendCommandToInstanceCommand(message.InstanceGuid, message.Command));
|
||||
}
|
||||
|
||||
private void HandleReply(ReplyMessage message) {
|
||||
connection.Receive(message);
|
||||
}
|
||||
}
|
||||
|
@@ -2,11 +2,15 @@
|
||||
using Phantom.Agent;
|
||||
using Phantom.Agent.Rpc;
|
||||
using Phantom.Agent.Services;
|
||||
using Phantom.Agent.Services.Rpc;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.New;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Phantom.Utils.Threading;
|
||||
|
||||
const int ProtocolVersion = 1;
|
||||
|
||||
@@ -48,41 +52,40 @@ try {
|
||||
|
||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
||||
|
||||
var rpcClient = new RpcClient<IMessageToController, IMessageToAgent>("Controller", controllerHost, controllerPort, "phantom-controller", certificateThumbprint, null);
|
||||
var rpcConnection = await rpcClient.Connect(shutdownCancellationToken);
|
||||
if (rpcConnection == null) {
|
||||
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", controllerHost, controllerPort, "phantom-controller", certificateThumbprint, null, AgentMessageRegistries.Definitions, shutdownCancellationToken);
|
||||
if (rpcClient == null) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
// var rpcConfiguration = new RpcConfiguration("Agent", controllerHost, controllerPort, controllerCertificate);
|
||||
// var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
|
||||
//
|
||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
|
||||
await agentServices.Initialize();
|
||||
|
||||
Task? rpcClientListener = null;
|
||||
try {
|
||||
// var rpcConfiguration = new RpcConfiguration("Agent", controllerHost, controllerPort, controllerCertificate);
|
||||
// var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
|
||||
//
|
||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel));
|
||||
await agentServices.Initialize();
|
||||
|
||||
} finally {
|
||||
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcClient.SendChannel, agentServices, shutdownCancellationTokenSource);
|
||||
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
|
||||
|
||||
rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor);
|
||||
|
||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||
await agentServices.Shutdown();
|
||||
} finally {
|
||||
try {
|
||||
await rpcClient.SendChannel.SendMessage(new UnregisterAgentMessage(), CancellationToken.None);
|
||||
// TODO wait for acknowledgment
|
||||
} catch (Exception e) {
|
||||
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
|
||||
} finally {
|
||||
await rpcClient.Shutdown();
|
||||
|
||||
if (rpcClientListener != null) {
|
||||
await rpcClientListener;
|
||||
}
|
||||
}
|
||||
}
|
||||
//
|
||||
// var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
|
||||
// var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
|
||||
//
|
||||
// var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
||||
// var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageHandlerActor, rpcDisconnectSemaphore, shutdownCancellationToken);
|
||||
// try {
|
||||
// await rpcTask.WaitAsync(shutdownCancellationToken);
|
||||
// } finally {
|
||||
// shutdownCancellationTokenSource.Cancel();
|
||||
// await agentServices.Shutdown();
|
||||
//
|
||||
// rpcDisconnectSemaphore.Release();
|
||||
// await rpcTask;
|
||||
// rpcDisconnectSemaphore.Dispose();
|
||||
//
|
||||
// NetMQConfig.Cleanup();
|
||||
// }
|
||||
|
||||
return 0;
|
||||
} catch (OperationCanceledException) {
|
||||
|
@@ -1,4 +1,4 @@
|
||||
using Phantom.Utils.Rpc.New;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
namespace Phantom.Common.Data;
|
||||
|
||||
|
@@ -1,6 +1,5 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Logging;
|
||||
@@ -12,7 +11,7 @@ public static class AgentMessageRegistries {
|
||||
public static MessageRegistry<IMessageToAgent> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
|
||||
public static MessageRegistry<IMessageToController> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
|
||||
|
||||
public static IMessageDefinitions<IMessageToAgent, IMessageToController, ReplyMessage> Definitions { get; } = new MessageDefinitions();
|
||||
public static IMessageDefinitions<IMessageToController, IMessageToAgent> Definitions { get; } = new MessageDefinitions();
|
||||
|
||||
static AgentMessageRegistries() {
|
||||
ToAgent.Add<RegisterAgentSuccessMessage>(0);
|
||||
@@ -21,7 +20,6 @@ public static class AgentMessageRegistries {
|
||||
ToAgent.Add<LaunchInstanceMessage, Result<LaunchInstanceResult, InstanceActionFailure>>(3);
|
||||
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
|
||||
ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5);
|
||||
ToAgent.Add<ReplyMessage>(127);
|
||||
|
||||
ToController.Add<RegisterAgentMessage>(0);
|
||||
ToController.Add<UnregisterAgentMessage>(1);
|
||||
@@ -32,15 +30,10 @@ public static class AgentMessageRegistries {
|
||||
ToController.Add<ReportAgentStatusMessage>(6);
|
||||
ToController.Add<ReportInstanceEventMessage>(7);
|
||||
ToController.Add<ReportInstancePlayerCountsMessage>(8);
|
||||
ToController.Add<ReplyMessage>(127);
|
||||
}
|
||||
|
||||
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgent, IMessageToController, ReplyMessage> {
|
||||
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToController, IMessageToAgent> {
|
||||
public MessageRegistry<IMessageToAgent> ToClient => ToAgent;
|
||||
public MessageRegistry<IMessageToController> ToServer => ToController;
|
||||
|
||||
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
|
||||
return new ReplyMessage(sequenceId, serializedReply);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,10 +0,0 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.Agent.BiDirectional;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record ReplyMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] byte[] SerializedReply
|
||||
) : IMessageToController, IMessageToAgent, IReply;
|
@@ -1,10 +0,0 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.Web.BiDirectional;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record ReplyMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] byte[] SerializedReply
|
||||
) : IMessageToController, IMessageToWeb, IReply;
|
@@ -7,7 +7,6 @@ using Phantom.Common.Data.Web.AuditLog;
|
||||
using Phantom.Common.Data.Web.EventLog;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Common.Data.Web.Users;
|
||||
using Phantom.Common.Messages.Web.BiDirectional;
|
||||
using Phantom.Common.Messages.Web.ToController;
|
||||
using Phantom.Common.Messages.Web.ToWeb;
|
||||
using Phantom.Utils.Logging;
|
||||
@@ -19,7 +18,7 @@ public static class WebMessageRegistries {
|
||||
public static MessageRegistry<IMessageToController> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
|
||||
public static MessageRegistry<IMessageToWeb> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
|
||||
|
||||
public static IMessageDefinitions<IMessageToWeb, IMessageToController, ReplyMessage> Definitions { get; } = new MessageDefinitions();
|
||||
public static IMessageDefinitions<IMessageToController, IMessageToWeb> Definitions { get; } = new MessageDefinitions();
|
||||
|
||||
static WebMessageRegistries() {
|
||||
ToController.Add<RegisterWebMessage>(0);
|
||||
@@ -42,22 +41,16 @@ public static class WebMessageRegistries {
|
||||
ToController.Add<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(17);
|
||||
ToController.Add<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(18);
|
||||
ToController.Add<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(19);
|
||||
ToController.Add<ReplyMessage>(127);
|
||||
|
||||
ToWeb.Add<RegisterWebResultMessage>(0);
|
||||
ToWeb.Add<RefreshAgentsMessage>(1);
|
||||
ToWeb.Add<RefreshInstancesMessage>(2);
|
||||
ToWeb.Add<InstanceOutputMessage>(3);
|
||||
ToWeb.Add<RefreshUserSessionMessage>(4);
|
||||
ToWeb.Add<ReplyMessage>(127);
|
||||
}
|
||||
|
||||
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWeb, IMessageToController, ReplyMessage> {
|
||||
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToController, IMessageToWeb> {
|
||||
public MessageRegistry<IMessageToWeb> ToClient => ToWeb;
|
||||
public MessageRegistry<IMessageToController> ToServer => ToController;
|
||||
|
||||
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
|
||||
return new ReplyMessage(sequenceId, serializedReply);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -21,7 +21,6 @@ using Phantom.Utils.Actor.Mailbox;
|
||||
using Phantom.Utils.Actor.Tasks;
|
||||
using Phantom.Utils.Collections;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
|
@@ -1,7 +1,6 @@
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
|
@@ -13,7 +13,6 @@ using Phantom.Controller.Minecraft;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
|
@@ -13,7 +13,7 @@ using Phantom.Controller.Services.Rpc;
|
||||
using Phantom.Controller.Services.Users;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime2;
|
||||
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
|
||||
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;
|
||||
|
||||
|
@@ -1,13 +1,11 @@
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Controller.Services.Agents;
|
||||
using Phantom.Controller.Services.Events;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
||||
|
@@ -4,7 +4,7 @@ using Phantom.Controller.Services.Agents;
|
||||
using Phantom.Controller.Services.Events;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime2;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
||||
|
@@ -2,7 +2,8 @@
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Messages.Agent.Handshake;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.New;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
@@ -5,7 +5,6 @@ using Phantom.Common.Messages.Web;
|
||||
using Phantom.Common.Messages.Web.ToWeb;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
||||
|
@@ -7,7 +7,6 @@ using Phantom.Common.Data.Web.AuditLog;
|
||||
using Phantom.Common.Data.Web.EventLog;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Common.Data.Web.Users;
|
||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Common.Messages.Web.ToController;
|
||||
using Phantom.Controller.Minecraft;
|
||||
@@ -17,7 +16,6 @@ using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Controller.Services.Users;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
||||
|
@@ -10,7 +10,7 @@ using Phantom.Controller.Services.Users;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime2;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Utils.Rpc.New;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Controller;
|
||||
|
||||
|
@@ -3,7 +3,8 @@ using Phantom.Utils.Cryptography;
|
||||
using Phantom.Utils.IO;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Monads;
|
||||
using Phantom.Utils.Rpc.New;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller;
|
||||
|
@@ -5,7 +5,7 @@ using Phantom.Controller.Services;
|
||||
using Phantom.Controller.Services.Rpc;
|
||||
using Phantom.Utils.IO;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.New;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Phantom.Utils.Tasks;
|
||||
|
||||
|
46
Utils/Phantom.Utils.Rpc/Frame/IFrame.cs
Normal file
46
Utils/Phantom.Utils.Rpc/Frame/IFrame.cs
Normal file
@@ -0,0 +1,46 @@
|
||||
using Phantom.Utils.Rpc.Frame.Types;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Frame;
|
||||
|
||||
interface IFrame {
|
||||
private const byte TypeMessageId = 1;
|
||||
private const byte TypeReplyId = 2;
|
||||
private const byte TypeErrorId = 3;
|
||||
|
||||
static readonly ReadOnlyMemory<byte> TypeMessage = new ([TypeMessageId]);
|
||||
static readonly ReadOnlyMemory<byte> TypeReply = new ([TypeReplyId]);
|
||||
static readonly ReadOnlyMemory<byte> TypeError = new ([TypeErrorId]);
|
||||
|
||||
internal static async Task ReadFrom(Stream stream, IFrameReader reader, CancellationToken cancellationToken) {
|
||||
byte[] oneByteBuffer = new byte[1];
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested) {
|
||||
await stream.ReadExactlyAsync(oneByteBuffer, cancellationToken);
|
||||
|
||||
switch (oneByteBuffer[0]) {
|
||||
case TypeMessageId:
|
||||
var messageFrame = await MessageFrame.Read(stream, cancellationToken);
|
||||
await reader.OnMessageFrame(messageFrame, stream, cancellationToken);
|
||||
break;
|
||||
|
||||
case TypeReplyId:
|
||||
var replyFrame = await ReplyFrame.Read(stream, cancellationToken);
|
||||
reader.OnReplyFrame(replyFrame);
|
||||
break;
|
||||
|
||||
case TypeErrorId:
|
||||
var errorFrame = await ErrorFrame.Read(stream, cancellationToken);
|
||||
reader.OnErrorFrame(errorFrame);
|
||||
break;
|
||||
|
||||
default:
|
||||
reader.OnUnknownFrameStart(oneByteBuffer[0]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ReadOnlyMemory<byte> Type { get; }
|
||||
|
||||
Task Write(Stream stream, CancellationToken cancellationToken);
|
||||
}
|
10
Utils/Phantom.Utils.Rpc/Frame/IFrameReader.cs
Normal file
10
Utils/Phantom.Utils.Rpc/Frame/IFrameReader.cs
Normal file
@@ -0,0 +1,10 @@
|
||||
using Phantom.Utils.Rpc.Frame.Types;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Frame;
|
||||
|
||||
interface IFrameReader {
|
||||
Task OnMessageFrame(MessageFrame frame, Stream stream, CancellationToken cancellationToken);
|
||||
void OnReplyFrame(ReplyFrame frame);
|
||||
void OnErrorFrame(ErrorFrame frame);
|
||||
void OnUnknownFrameStart(byte id);
|
||||
}
|
19
Utils/Phantom.Utils.Rpc/Frame/Types/ErrorFrame.cs
Normal file
19
Utils/Phantom.Utils.Rpc/Frame/Types/ErrorFrame.cs
Normal file
@@ -0,0 +1,19 @@
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Frame.Types;
|
||||
|
||||
sealed record ErrorFrame(uint ReplyingToMessageId, RpcError Error) : IFrame {
|
||||
public ReadOnlyMemory<byte> Type => IFrame.TypeError;
|
||||
|
||||
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||
await Serialization.WriteUnsignedInt(ReplyingToMessageId, stream, cancellationToken);
|
||||
await Serialization.WriteByte((byte) Error, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static async Task<ErrorFrame> Read(Stream stream, CancellationToken cancellationToken) {
|
||||
var replyingToMessageId = await Serialization.ReadUnsignedInt(stream, cancellationToken);
|
||||
var messageError = (RpcError) await Serialization.ReadByte(stream, cancellationToken);
|
||||
return new ErrorFrame(replyingToMessageId, messageError);
|
||||
}
|
||||
}
|
40
Utils/Phantom.Utils.Rpc/Frame/Types/MessageFrame.cs
Normal file
40
Utils/Phantom.Utils.Rpc/Frame/Types/MessageFrame.cs
Normal file
@@ -0,0 +1,40 @@
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Frame.Types;
|
||||
|
||||
sealed record MessageFrame(uint MessageId, ushort RegistryCode, ReadOnlyMemory<byte> SerializedMessage) : IFrame {
|
||||
public const int MaxMessageBytes = 1024 * 1024 * 8;
|
||||
|
||||
public ReadOnlyMemory<byte> Type => IFrame.TypeMessage;
|
||||
|
||||
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||
int messageLength = SerializedMessage.Length;
|
||||
CheckMessageLength(messageLength);
|
||||
|
||||
await Serialization.WriteUnsignedInt(MessageId, stream, cancellationToken);
|
||||
await Serialization.WriteUnsignedShort(RegistryCode, stream, cancellationToken);
|
||||
await Serialization.WriteSignedInt(messageLength, stream, cancellationToken);
|
||||
await stream.WriteAsync(SerializedMessage, cancellationToken);
|
||||
}
|
||||
|
||||
public static async Task<MessageFrame> Read(Stream stream, CancellationToken cancellationToken) {
|
||||
var messageId = await Serialization.ReadUnsignedInt(stream, cancellationToken);
|
||||
var registryCode = await Serialization.ReadUnsignedShort(stream, cancellationToken);
|
||||
var essageLength = await Serialization.ReadSignedInt(stream, cancellationToken);
|
||||
CheckMessageLength(essageLength);
|
||||
var serializedMessage = await Serialization.ReadBytes(essageLength, stream, cancellationToken);
|
||||
|
||||
return new MessageFrame(messageId, registryCode, serializedMessage);
|
||||
}
|
||||
|
||||
private static void CheckMessageLength(int messageLength) {
|
||||
if (messageLength < 0) {
|
||||
throw new RpcErrorException("Message length is negative", RpcError.InvalidData);
|
||||
}
|
||||
|
||||
if (messageLength > MaxMessageBytes) {
|
||||
throw new RpcErrorException("Message is too large: " + messageLength + " > " + MaxMessageBytes + " bytes", RpcError.MessageTooLarge);
|
||||
}
|
||||
}
|
||||
}
|
38
Utils/Phantom.Utils.Rpc/Frame/Types/ReplyFrame.cs
Normal file
38
Utils/Phantom.Utils.Rpc/Frame/Types/ReplyFrame.cs
Normal file
@@ -0,0 +1,38 @@
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Frame.Types;
|
||||
|
||||
sealed record ReplyFrame(uint ReplyingToMessageId, ReadOnlyMemory<byte> SerializedReply) : IFrame {
|
||||
public const int MaxReplyBytes = 1024 * 1024 * 32;
|
||||
|
||||
public ReadOnlyMemory<byte> Type => IFrame.TypeReply;
|
||||
|
||||
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||
int replyLength = SerializedReply.Length;
|
||||
CheckReplyLength(replyLength);
|
||||
|
||||
await Serialization.WriteUnsignedInt(ReplyingToMessageId, stream, cancellationToken);
|
||||
await Serialization.WriteSignedInt(replyLength, stream, cancellationToken);
|
||||
await stream.WriteAsync(SerializedReply, cancellationToken);
|
||||
}
|
||||
|
||||
public static async Task<ReplyFrame> Read(Stream stream, CancellationToken cancellationToken) {
|
||||
var replyingToMessageId = await Serialization.ReadUnsignedInt(stream, cancellationToken);
|
||||
var replyLength = await Serialization.ReadSignedInt(stream, cancellationToken);
|
||||
CheckReplyLength(replyLength);
|
||||
var reply = await Serialization.ReadBytes(replyLength, stream, cancellationToken);
|
||||
|
||||
return new ReplyFrame(replyingToMessageId, reply);
|
||||
}
|
||||
|
||||
private static void CheckReplyLength(int replyLength) {
|
||||
if (replyLength < 0) {
|
||||
throw new RpcErrorException("Reply length is negative", RpcError.InvalidData);
|
||||
}
|
||||
|
||||
if (replyLength > MaxReplyBytes) {
|
||||
throw new RpcErrorException("Reply is too large: " + replyLength + " > " + MaxReplyBytes + " bytes", RpcError.MessageTooLarge);
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,6 +1,6 @@
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public interface IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> : IReplyMessageFactory<TReplyMessage> where TReplyMessage : TClientMessage, TServerMessage {
|
||||
MessageRegistry<TClientMessage> ToClient { get; }
|
||||
MessageRegistry<TServerMessage> ToServer { get; }
|
||||
public interface IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> {
|
||||
MessageRegistry<TServerToClientMessage> ToClient { get; }
|
||||
MessageRegistry<TClientToServerMessage> ToServer { get; }
|
||||
}
|
||||
|
@@ -1,6 +0,0 @@
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public interface IReply {
|
||||
uint SequenceId { get; }
|
||||
byte[] SerializedReply { get; }
|
||||
}
|
@@ -1,5 +0,0 @@
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public interface IReplyMessageFactory<TReplyMessage> {
|
||||
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
|
||||
}
|
@@ -1,5 +0,0 @@
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
interface IReplySender {
|
||||
Task SendReply(uint sequenceId, byte[] serializedReply);
|
||||
}
|
@@ -1,35 +1,11 @@
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
sealed class MessageHandler<TMessageBase> {
|
||||
private readonly ILogger logger;
|
||||
private readonly ActorRef<TMessageBase> handlerActor;
|
||||
private readonly IReplySender replySender;
|
||||
interface MessageHandler<TMessageBase> {
|
||||
ActorRef<TMessageBase> Actor { get; }
|
||||
|
||||
public MessageHandler(string loggerName, ActorRef<TMessageBase> handlerActor, IReplySender replySender) {
|
||||
this.logger = PhantomLogger.Create("MessageHandler", loggerName);
|
||||
this.handlerActor = handlerActor;
|
||||
this.replySender = replySender;
|
||||
}
|
||||
|
||||
public void Tell(TMessageBase message) {
|
||||
handlerActor.Tell(message);
|
||||
}
|
||||
|
||||
public Task TellAndReply<TMessage, TReply>(TMessage message, uint sequenceId) where TMessage : ICanReply<TReply> {
|
||||
return handlerActor.Request(message).ContinueWith(task => {
|
||||
if (task.IsCompletedSuccessfully) {
|
||||
return replySender.SendReply(sequenceId, MessageSerializer.Serialize(task.Result));
|
||||
}
|
||||
|
||||
if (task.IsFaulted) {
|
||||
logger.Error(task.Exception, "Failed to handle message {Type}.", message.GetType().Name);
|
||||
}
|
||||
|
||||
return task;
|
||||
}, TaskScheduler.Default);
|
||||
}
|
||||
ValueTask OnReply<TMessage, TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) where TMessage : TMessageBase, ICanReply<TReply>;
|
||||
ValueTask OnError(uint messageId, RpcError error, CancellationToken cancellationToken);
|
||||
}
|
||||
|
@@ -1,22 +1,15 @@
|
||||
using System.Buffers;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Frame.Types;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public sealed class MessageRegistry<TMessageBase> {
|
||||
private const int DefaultBufferSize = 512;
|
||||
|
||||
private readonly ILogger logger;
|
||||
public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
||||
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
|
||||
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
|
||||
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TMessageBase>>> codeToHandlerMapping = new ();
|
||||
|
||||
public MessageRegistry(ILogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
private readonly Dictionary<ushort, Func<uint, ReadOnlyMemory<byte>, MessageHandler<TMessageBase>, CancellationToken, Task>> codeToHandlerMapping = new ();
|
||||
|
||||
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
|
||||
if (HasReplyType(typeof(TMessage))) {
|
||||
@@ -44,140 +37,64 @@ public sealed class MessageRegistry<TMessageBase> {
|
||||
return messageType.GetInterfaces().Any(type => type.FullName is {} name && name.StartsWith(replyInterfaceName, StringComparison.Ordinal));
|
||||
}
|
||||
|
||||
internal bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
|
||||
try {
|
||||
var code = MessageSerializer.ReadCode(ref data);
|
||||
return codeToTypeMapping.TryGetValue(code, out type);
|
||||
} catch (Exception) {
|
||||
type = null;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : TMessageBase {
|
||||
if (!GetMessageCode<TMessage>(out var code)) {
|
||||
return default;
|
||||
}
|
||||
|
||||
var buffer = new ArrayBufferWriter<byte>(DefaultBufferSize);
|
||||
|
||||
try {
|
||||
MessageSerializer.WriteCode(buffer, code);
|
||||
MessageSerializer.Serialize(buffer, message);
|
||||
|
||||
CheckWrittenBufferLength<TMessage>(buffer);
|
||||
return buffer.WrittenSpan;
|
||||
} catch (Exception e) {
|
||||
LogWriteFailure<TMessage>(e);
|
||||
return default;
|
||||
}
|
||||
}
|
||||
|
||||
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||
if (!GetMessageCode<TMessage>(out var code)) {
|
||||
return default;
|
||||
}
|
||||
|
||||
var buffer = new ArrayBufferWriter<byte>(DefaultBufferSize);
|
||||
|
||||
try {
|
||||
MessageSerializer.WriteCode(buffer, code);
|
||||
MessageSerializer.WriteSequenceId(buffer, sequenceId);
|
||||
MessageSerializer.Serialize(buffer, message);
|
||||
|
||||
CheckWrittenBufferLength<TMessage>(buffer);
|
||||
return buffer.WrittenSpan;
|
||||
} catch (Exception e) {
|
||||
LogWriteFailure<TMessage>(e);
|
||||
return default;
|
||||
}
|
||||
}
|
||||
|
||||
private bool GetMessageCode<TMessage>(out ushort code) where TMessage : TMessageBase {
|
||||
if (typeToCodeMapping.TryGetValue(typeof(TMessage), out code)) {
|
||||
return true;
|
||||
internal MessageFrame CreateFrame<TMessage>(uint messageId, TMessage message) where TMessage : TMessageBase {
|
||||
if (typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
|
||||
return new MessageFrame(messageId, code, Serialization.Serialize(message));
|
||||
}
|
||||
else {
|
||||
logger.Error("Unknown message type {Type}.", typeof(TMessage));
|
||||
return false;
|
||||
throw new ArgumentException("Unknown message type: " + typeof(TMessage));
|
||||
}
|
||||
}
|
||||
|
||||
private void CheckWrittenBufferLength<TMessage>(ArrayBufferWriter<byte> buffer) where TMessage : TMessageBase {
|
||||
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
|
||||
}
|
||||
}
|
||||
|
||||
private void LogWriteFailure<TMessage>(Exception e) where TMessage : TMessageBase {
|
||||
logger.Error(e, "Failed to serialize message {Type}.", typeof(TMessage).Name);
|
||||
}
|
||||
|
||||
internal bool Read<TMessage>(ReadOnlyMemory<byte> data, out TMessage message) where TMessage : TMessageBase {
|
||||
if (ReadTypeCode(ref data, out ushort code) && codeToTypeMapping.TryGetValue(code, out var expectedType) && expectedType == typeof(TMessage) && ReadMessage(data, out message)) {
|
||||
return true;
|
||||
internal async Task Handle(MessageFrame frame, MessageHandler<TMessageBase> handler, CancellationToken cancellationToken) {
|
||||
uint messageId = frame.MessageId;
|
||||
|
||||
if (codeToHandlerMapping.TryGetValue(frame.RegistryCode, out var action)) {
|
||||
await action(messageId, frame.SerializedMessage, handler, cancellationToken);
|
||||
}
|
||||
else {
|
||||
message = default!;
|
||||
return false;
|
||||
logger.Error("Unknown message code {Code} for message {MessageId}.", frame.RegistryCode, messageId);
|
||||
await handler.OnError(messageId, RpcError.UnknownMessageRegistryCode, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TMessageBase> handler) {
|
||||
if (!ReadTypeCode(ref data, out var code)) {
|
||||
private async Task DeserializationHandler<TMessage>(uint messageId, ReadOnlyMemory<byte> serializedMessage, MessageHandler<TMessageBase> handler, CancellationToken cancellationToken) where TMessage : TMessageBase {
|
||||
TMessage message;
|
||||
try {
|
||||
message = Serialization.Deserialize<TMessage>(serializedMessage);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not deserialize message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||
await handler.OnError(messageId, RpcError.MessageDeserializationError, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!codeToHandlerMapping.TryGetValue(code, out var handle)) {
|
||||
logger.Error("Unknown message code {Code}.", code);
|
||||
handler.Actor.Tell(message);
|
||||
}
|
||||
|
||||
private async Task DeserializationHandler<TMessage, TReply>(uint messageId, ReadOnlyMemory<byte> serializedMessage, MessageHandler<TMessageBase> handler, CancellationToken cancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||
TMessage message;
|
||||
try {
|
||||
message = Serialization.Deserialize<TMessage>(serializedMessage);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not deserialize message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||
await handler.OnError(messageId, RpcError.MessageDeserializationError, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
handle(data, code, handler);
|
||||
}
|
||||
|
||||
private bool ReadTypeCode(ref ReadOnlyMemory<byte> data, out ushort code) {
|
||||
TReply reply;
|
||||
try {
|
||||
code = MessageSerializer.ReadCode(ref data);
|
||||
return true;
|
||||
reply = await handler.Actor.Request(message, cancellationToken);
|
||||
} catch (Exception e) {
|
||||
code = default;
|
||||
logger.Error(e, "Failed to deserialize message code.");
|
||||
return false;
|
||||
logger.Error(e, "Could not handle message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||
await handler.OnError(messageId, RpcError.MessageHandlingError, cancellationToken);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private bool ReadSequenceId<TMessage, TReply>(ref ReadOnlyMemory<byte> data, out uint sequenceId) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||
|
||||
try {
|
||||
sequenceId = MessageSerializer.ReadSequenceId(ref data);
|
||||
return true;
|
||||
await handler.OnReply<TMessage, TReply>(messageId, reply, cancellationToken);
|
||||
} catch (Exception e) {
|
||||
sequenceId = default;
|
||||
logger.Error(e, "Failed to deserialize sequence ID of message {Type}.", typeof(TMessage).Name);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private bool ReadMessage<TMessage>(ReadOnlyMemory<byte> data, out TMessage message) where TMessage : TMessageBase {
|
||||
try {
|
||||
message = MessageSerializer.Deserialize<TMessage>(data);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
message = default!;
|
||||
logger.Error(e, "Failed to deserialize message {Type}.", typeof(TMessage).Name);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TMessageBase> handler) where TMessage : TMessageBase {
|
||||
if (ReadMessage<TMessage>(data, out var message)) {
|
||||
handler.Tell(message);
|
||||
}
|
||||
}
|
||||
|
||||
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TMessageBase> handler) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||
if (ReadSequenceId<TMessage, TReply>(ref data, out var sequenceId) && ReadMessage<TMessage>(data, out var message)) {
|
||||
handler.TellAndReply<TMessage, TReply>(message, sequenceId);
|
||||
logger.Error(e, "Could not reply to message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||
await handler.OnError(messageId, RpcError.MessageHandlingError, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,5 +1,7 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
using Phantom.Utils.Tasks;
|
||||
using Serilog;
|
||||
|
||||
@@ -7,55 +9,57 @@ namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
sealed class MessageReplyTracker {
|
||||
private readonly ILogger logger;
|
||||
private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16);
|
||||
|
||||
private uint lastSequenceId;
|
||||
private readonly ConcurrentDictionary<uint, TaskCompletionSource<ReadOnlyMemory<byte>>> replyTasks = new (concurrencyLevel: 4, capacity: 16);
|
||||
|
||||
internal MessageReplyTracker(string loggerName) {
|
||||
this.logger = PhantomLogger.Create<MessageReplyTracker>(loggerName);
|
||||
}
|
||||
|
||||
public uint RegisterReply() {
|
||||
var sequenceId = Interlocked.Increment(ref lastSequenceId);
|
||||
replyTasks[sequenceId] = AsyncTasks.CreateCompletionSource<byte[]>();
|
||||
return sequenceId;
|
||||
public void RegisterReply(uint messageId) {
|
||||
replyTasks[messageId] = AsyncTasks.CreateCompletionSource<ReadOnlyMemory<byte>>();
|
||||
}
|
||||
|
||||
public async Task<TReply> WaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) {
|
||||
if (!replyTasks.TryGetValue(sequenceId, out var completionSource)) {
|
||||
logger.Warning("No reply callback for id {SequenceId}.", sequenceId);
|
||||
throw new ArgumentException("No reply callback for id: " + sequenceId, nameof(sequenceId));
|
||||
public async Task<TReply> WaitForReply<TReply>(uint messageId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) {
|
||||
if (!replyTasks.TryGetValue(messageId, out var completionSource)) {
|
||||
logger.Warning("No reply callback for id {MessageId}.", messageId);
|
||||
throw new ArgumentException("No reply callback for id: " + messageId, nameof(messageId));
|
||||
}
|
||||
|
||||
try {
|
||||
byte[] replyBytes = await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
|
||||
return MessageSerializer.Deserialize<TReply>(replyBytes);
|
||||
ReadOnlyMemory<byte> serializedReply = await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
|
||||
return Serialization.Deserialize<TReply>(serializedReply);
|
||||
} catch (TimeoutException) {
|
||||
logger.Debug("Timed out waiting for reply with id {SequenceId}.", sequenceId);
|
||||
logger.Debug("Timed out waiting for reply with id {MessageId}.", messageId);
|
||||
throw;
|
||||
} catch (OperationCanceledException) {
|
||||
logger.Debug("Cancelled waiting for reply with id {SequenceId}.", sequenceId);
|
||||
logger.Debug("Cancelled waiting for reply with id {MessageId}.", messageId);
|
||||
throw;
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Error processing reply with id {SequenceId}.", sequenceId);
|
||||
logger.Warning(e, "Error processing reply with id {MessageId}.", messageId);
|
||||
throw;
|
||||
} finally {
|
||||
ForgetReply(sequenceId);
|
||||
ForgetReply(messageId);
|
||||
}
|
||||
}
|
||||
|
||||
public void ForgetReply(uint sequenceId) {
|
||||
if (replyTasks.TryRemove(sequenceId, out var task)) {
|
||||
public void ForgetReply(uint messageId) {
|
||||
if (replyTasks.TryRemove(messageId, out var task)) {
|
||||
task.SetCanceled();
|
||||
}
|
||||
}
|
||||
|
||||
public void ReceiveReply(uint sequenceId, byte[] serializedReply) {
|
||||
if (replyTasks.TryRemove(sequenceId, out var task)) {
|
||||
public void FailReply(uint messageId, RpcErrorException e) {
|
||||
if (replyTasks.TryRemove(messageId, out var task)) {
|
||||
task.SetException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void ReceiveReply(uint messageId, ReadOnlyMemory<byte> serializedReply) {
|
||||
if (replyTasks.TryRemove(messageId, out var task)) {
|
||||
task.SetResult(serializedReply);
|
||||
}
|
||||
else {
|
||||
logger.Warning("Received a reply with id {SequenceId} but no registered callback.", sequenceId);
|
||||
logger.Warning("Received a reply with id {MessageId} but no registered callback.", messageId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,45 +0,0 @@
|
||||
using System.Buffers;
|
||||
using System.Buffers.Binary;
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
static class MessageSerializer {
|
||||
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
|
||||
|
||||
public static byte[] Serialize<T>(T message) {
|
||||
return MemoryPackSerializer.Serialize(message, SerializerOptions);
|
||||
}
|
||||
|
||||
public static void Serialize<T>(IBufferWriter<byte> destination, T message) {
|
||||
MemoryPackSerializer.Serialize(typeof(T), destination, message, SerializerOptions);
|
||||
}
|
||||
|
||||
public static T Deserialize<T>(ReadOnlyMemory<byte> memory) {
|
||||
return MemoryPackSerializer.Deserialize<T>(memory.Span) ?? throw new NullReferenceException();
|
||||
}
|
||||
|
||||
public static void WriteCode(IBufferWriter<byte> destination, ushort value) {
|
||||
Span<byte> buffer = stackalloc byte[2];
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(buffer, value);
|
||||
destination.Write(buffer);
|
||||
}
|
||||
|
||||
public static ushort ReadCode(ref ReadOnlyMemory<byte> memory) {
|
||||
ushort value = BinaryPrimitives.ReadUInt16LittleEndian(memory.Span);
|
||||
memory = memory[2..];
|
||||
return value;
|
||||
}
|
||||
|
||||
public static void WriteSequenceId(IBufferWriter<byte> destination, uint sequenceId) {
|
||||
Span<byte> buffer = stackalloc byte[4];
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(buffer, sequenceId);
|
||||
destination.Write(buffer);
|
||||
}
|
||||
|
||||
public static uint ReadSequenceId(ref ReadOnlyMemory<byte> memory) {
|
||||
uint value = BinaryPrimitives.ReadUInt32LittleEndian(memory.Span);
|
||||
memory = memory[4..];
|
||||
return value;
|
||||
}
|
||||
}
|
@@ -1,17 +0,0 @@
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
sealed class ReplySender<TMessageBase, TReplyMessage> : IReplySender where TReplyMessage : TMessageBase {
|
||||
private readonly RpcConnection<TMessageBase> connection;
|
||||
private readonly IReplyMessageFactory<TReplyMessage> replyMessageFactory;
|
||||
|
||||
public ReplySender(RpcConnection<TMessageBase> connection, IReplyMessageFactory<TReplyMessage> replyMessageFactory) {
|
||||
this.connection = connection;
|
||||
this.replyMessageFactory = replyMessageFactory;
|
||||
}
|
||||
|
||||
public Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||
return connection.Send(replyMessageFactory.CreateReplyMessage(sequenceId, serializedReply));
|
||||
}
|
||||
}
|
@@ -1,100 +0,0 @@
|
||||
using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Security.Authentication;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
|
||||
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage>(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake) {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||
|
||||
private bool loggedCertificateValidationError = false;
|
||||
|
||||
private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) {
|
||||
if (certificate == null || sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNotAvailable)) {
|
||||
logger.Error("Could not establish a secure connection, server did not provide a certificate.");
|
||||
}
|
||||
else if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch)) {
|
||||
logger.Error("Could not establish a secure connection, server certificate has the wrong name: {Name}", certificate.Subject);
|
||||
}
|
||||
else if (!certificateThumbprint.Check(certificate)) {
|
||||
logger.Error("Could not establish a secure connection, server certificate does not match.");
|
||||
}
|
||||
else if (TlsSupport.CheckAlgorithm((X509Certificate2) certificate) is {} error) {
|
||||
logger.Error("Could not establish a secure connection, server certificate rejected because it uses {ActualAlgorithmName} instead of {ExpectedAlgorithmName}.", error.ActualAlgorithmName, error.ExpectedAlgorithmName);
|
||||
}
|
||||
else if ((sslPolicyErrors & ~SslPolicyErrors.RemoteCertificateChainErrors) != SslPolicyErrors.None) {
|
||||
logger.Error("Could not establish a secure connection, server certificate validation failed.");
|
||||
}
|
||||
else {
|
||||
return true;
|
||||
}
|
||||
|
||||
loggedCertificateValidationError = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task<RpcClientConnection<TClientToServerMessage>?> Connect(CancellationToken shutdownToken) {
|
||||
SslClientAuthenticationOptions sslOptions = new () {
|
||||
AllowRenegotiation = false,
|
||||
AllowTlsResume = true,
|
||||
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
|
||||
EnabledSslProtocols = TlsSupport.SupportedProtocols,
|
||||
EncryptionPolicy = EncryptionPolicy.RequireEncryption,
|
||||
RemoteCertificateValidationCallback = ValidateServerCertificate,
|
||||
TargetHost = distinguishedName,
|
||||
};
|
||||
|
||||
try {
|
||||
using var clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
|
||||
|
||||
logger.Information("Connecting to {Host}:{Port}...", host, port);
|
||||
|
||||
try {
|
||||
await clientSocket.ConnectAsync(host, port, shutdownToken);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not connect.");
|
||||
return null;
|
||||
}
|
||||
|
||||
await using var stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
|
||||
|
||||
try {
|
||||
loggedCertificateValidationError = false;
|
||||
await stream.AuthenticateAsClientAsync(sslOptions, shutdownToken);
|
||||
} catch (AuthenticationException e) {
|
||||
if (!loggedCertificateValidationError) {
|
||||
logger.Error(e, "Could not establish a secure connection.");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
logger.Information("Established a secure connection.");
|
||||
|
||||
try {
|
||||
await handshake.AcceptServer(stream, shutdownToken);
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Could not perform application handshake, connection lost.");
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Could not perform application handshake.");
|
||||
return null;
|
||||
}
|
||||
// await stream.WriteAsync(new byte[] { 1, 2, 3 }, shutdownToken);
|
||||
|
||||
byte[] buffer = new byte[1024];
|
||||
int readBytes;
|
||||
while ((readBytes = await stream.ReadAsync(buffer, shutdownToken)) > 0) {}
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Client crashed with uncaught exception.");
|
||||
return null;
|
||||
} finally {
|
||||
logger.Information("Client stopped.");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
@@ -1,30 +0,0 @@
|
||||
using System.Threading.Channels;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
|
||||
public class RpcClientConnection<TClientToServerMessage>(string loggerName, CancellationToken shutdownCancellationToken) : IAsyncDisposable {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcClientConnection<TClientToServerMessage>>(loggerName);
|
||||
|
||||
private readonly Channel<TClientToServerMessage> sendQueue = Channel.CreateBounded<TClientToServerMessage>(new BoundedChannelOptions(500) {
|
||||
AllowSynchronousContinuations = false,
|
||||
FullMode = BoundedChannelFullMode.Wait,
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
});
|
||||
|
||||
public async Task WaitFor() {
|
||||
|
||||
}
|
||||
|
||||
public async Task Send(TClientToServerMessage message, CancellationToken cancellationToken) {
|
||||
if (!sendQueue.Writer.TryWrite(message)) {
|
||||
await sendQueue.Writer.WriteAsync(message, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync() {
|
||||
// TODO release managed resources here
|
||||
}
|
||||
}
|
@@ -1,15 +0,0 @@
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
|
||||
public static class Serialization {
|
||||
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
|
||||
|
||||
public static ValueTask Serialize<T>(T value, Stream stream, CancellationToken cancellationToken) {
|
||||
return MemoryPackSerializer.SerializeAsync(stream, value, SerializerOptions, cancellationToken);
|
||||
}
|
||||
|
||||
public static async ValueTask<T> Deserialize<T>(Stream stream, CancellationToken cancellationToken) {
|
||||
return (await MemoryPackSerializer.DeserializeAsync<T>(stream, SerializerOptions, cancellationToken))!;
|
||||
}
|
||||
}
|
@@ -1,8 +0,0 @@
|
||||
using NetMQ;
|
||||
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||
internal string LoggerName => "Rpc:" + ServiceName;
|
||||
internal string TcpUrl => "tcp://" + Host + ":" + Port;
|
||||
}
|
@@ -1,32 +0,0 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
static class RpcExtensions {
|
||||
public static ReadOnlyMemory<byte> Receive(this ClientSocket socket, CancellationToken cancellationToken) {
|
||||
var msg = new Msg();
|
||||
msg.InitEmpty();
|
||||
|
||||
try {
|
||||
socket.Receive(ref msg, cancellationToken);
|
||||
return msg.SliceAsMemory();
|
||||
} finally {
|
||||
// Only releases references, so the returned ReadOnlyMemory is safe.
|
||||
msg.Close();
|
||||
}
|
||||
}
|
||||
|
||||
public static (uint, ReadOnlyMemory<byte>) Receive(this ServerSocket socket, CancellationToken cancellationToken) {
|
||||
var msg = new Msg();
|
||||
msg.InitEmpty();
|
||||
|
||||
try {
|
||||
socket.Receive(ref msg, cancellationToken);
|
||||
return (msg.RoutingId, msg.SliceAsMemory());
|
||||
} finally {
|
||||
// Only releases references, so the returned ReadOnlyMemory is safe.
|
||||
msg.Close();
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,5 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
interface IRpcConnectionProvider {
|
||||
Task<Stream> GetStream();
|
||||
}
|
98
Utils/Phantom.Utils.Rpc/Runtime/RpcClient.cs
Normal file
98
Utils/Phantom.Utils.Rpc/Runtime/RpcClient.cs
Normal file
@@ -0,0 +1,98 @@
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Frame;
|
||||
using Phantom.Utils.Rpc.Frame.Types;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
|
||||
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
|
||||
RpcClientConnector connector = new RpcClientConnector(loggerName, host, port, distinguishedName, certificateThumbprint, handshake);
|
||||
RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
|
||||
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, messageDefinitions, connector, connection);
|
||||
}
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly MessageRegistry<TServerToClientMessage> serverToClientMessageRegistry;
|
||||
private readonly RpcClientConnection connection;
|
||||
|
||||
public RpcSendChannel<TClientToServerMessage> SendChannel { get; }
|
||||
|
||||
private RpcClient(string loggerName, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientConnector connector, RpcClientConnector.Connection connection) {
|
||||
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||
this.serverToClientMessageRegistry = messageDefinitions.ToClient;
|
||||
|
||||
this.connection = new RpcClientConnection(loggerName, connector, connection);
|
||||
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, this.connection, messageDefinitions.ToServer, sendQueueCapacity: 500);
|
||||
}
|
||||
|
||||
public async Task Listen(ActorRef<TServerToClientMessage> actor) {
|
||||
try {
|
||||
await connection.ReadConnection(stream => Receive(stream, new MessageHandlerImpl(SendChannel, actor)));
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
|
||||
private async Task Receive(Stream stream, MessageHandlerImpl handler) {
|
||||
await IFrame.ReadFrom(stream, new FrameReader(this, handler), CancellationToken.None);
|
||||
}
|
||||
|
||||
private sealed class FrameReader(RpcClient<TClientToServerMessage, TServerToClientMessage> client, MessageHandlerImpl handler) : IFrameReader {
|
||||
public Task OnMessageFrame(MessageFrame frame, Stream stream, CancellationToken cancellationToken) {
|
||||
return client.serverToClientMessageRegistry.Handle(frame, handler, cancellationToken);
|
||||
}
|
||||
|
||||
public void OnReplyFrame(ReplyFrame frame) {
|
||||
client.SendChannel.ReceiveReply(frame);
|
||||
}
|
||||
|
||||
public void OnErrorFrame(ErrorFrame frame) {
|
||||
client.SendChannel.ReceiveError(frame.ReplyingToMessageId, frame.Error);
|
||||
}
|
||||
|
||||
public void OnUnknownFrameStart(byte id) {
|
||||
client.logger.Error("Received unknown frame ID: {Id}", id);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class MessageHandlerImpl(RpcSendChannel<TClientToServerMessage> sendChannel, ActorRef<TServerToClientMessage> actor) : MessageHandler<TServerToClientMessage> {
|
||||
public ActorRef<TServerToClientMessage> Actor => actor;
|
||||
|
||||
public ValueTask OnReply<TMessage, TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) where TMessage : TServerToClientMessage, ICanReply<TReply> {
|
||||
return sendChannel.SendReply(messageId, reply, cancellationToken);
|
||||
}
|
||||
|
||||
public ValueTask OnError(uint messageId, RpcError error, CancellationToken cancellationToken) {
|
||||
return sendChannel.SendError(messageId, error, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task Shutdown() {
|
||||
logger.Information("Shutting down client...");
|
||||
|
||||
try {
|
||||
await SendChannel.Close();
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Caught exception while closing send channel.");
|
||||
}
|
||||
|
||||
try {
|
||||
connection.Close();
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Caught exception while closing connection.");
|
||||
}
|
||||
|
||||
// TODO disconnection handshake?
|
||||
|
||||
logger.Information("Client shut down.");
|
||||
}
|
||||
|
||||
public void Dispose() {
|
||||
connection.Dispose();
|
||||
SendChannel.Dispose();
|
||||
}
|
||||
}
|
85
Utils/Phantom.Utils.Rpc/Runtime/RpcClientConnection.cs
Normal file
85
Utils/Phantom.Utils.Rpc/Runtime/RpcClientConnection.cs
Normal file
@@ -0,0 +1,85 @@
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
sealed class RpcClientConnection(string loggerName, RpcClientConnector connector, RpcClientConnector.Connection initialConnection) : IRpcConnectionProvider, IDisposable {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcClientConnection>(loggerName);
|
||||
|
||||
private readonly SemaphoreSlim semaphore = new (1);
|
||||
private RpcClientConnector.Connection currentConnection = initialConnection;
|
||||
|
||||
private readonly CancellationTokenSource newConnectionCancellationTokenSource = new ();
|
||||
|
||||
public async Task<Stream> GetStream() {
|
||||
return (await GetConnection()).Stream;
|
||||
}
|
||||
|
||||
private async Task<RpcClientConnector.Connection> GetConnection() {
|
||||
CancellationToken cancellationToken = newConnectionCancellationTokenSource.Token;
|
||||
|
||||
await semaphore.WaitAsync(cancellationToken);
|
||||
try {
|
||||
if (!currentConnection.Socket.Connected) {
|
||||
currentConnection = await connector.EstablishNewConnectionWithRetry(cancellationToken);
|
||||
}
|
||||
|
||||
return currentConnection;
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task ReadConnection(Func<Stream, Task> reader) {
|
||||
RpcClientConnector.Connection? connection = null;
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
connection?.Dispose();
|
||||
connection = null;
|
||||
|
||||
try {
|
||||
connection = await GetConnection();
|
||||
} catch (OperationCanceledException) {
|
||||
throw;
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Could not obtain a new connection.");
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await reader(connection.Stream);
|
||||
} catch (OperationCanceledException) {
|
||||
throw;
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Socket was closed.");
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Closing socket due to an exception while reading it.");
|
||||
|
||||
try {
|
||||
await connection.Shutdown();
|
||||
} catch (Exception e2) {
|
||||
logger.Error(e2, "Caught exception closing the socket.");
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
await connection.Disconnect(); // TODO what happens if already disconnected?
|
||||
} finally {
|
||||
connection.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Close() {
|
||||
newConnectionCancellationTokenSource.Cancel();
|
||||
}
|
||||
|
||||
public void Dispose() {
|
||||
semaphore.Dispose();
|
||||
newConnectionCancellationTokenSource.Dispose();
|
||||
}
|
||||
}
|
@@ -1,9 +0,0 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
sealed class RpcClientConnectionClosedEventArgs : EventArgs {
|
||||
internal uint RoutingId { get; }
|
||||
|
||||
internal RpcClientConnectionClosedEventArgs(uint routingId) {
|
||||
RoutingId = routingId;
|
||||
}
|
||||
}
|
173
Utils/Phantom.Utils.Rpc/Runtime/RpcClientConnector.cs
Normal file
173
Utils/Phantom.Utils.Rpc/Runtime/RpcClientConnector.cs
Normal file
@@ -0,0 +1,173 @@
|
||||
using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Security.Authentication;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Collections;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
internal sealed class RpcClientConnector {
|
||||
private static readonly TimeSpan InitialRetryDelay = TimeSpan.FromMilliseconds(100);
|
||||
private static readonly TimeSpan MaximumRetryDelay = TimeSpan.FromSeconds(30);
|
||||
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly string host;
|
||||
private readonly ushort port;
|
||||
private readonly RpcCertificateThumbprint certificateThumbprint;
|
||||
private readonly RpcClientHandshake handshake;
|
||||
private readonly SslClientAuthenticationOptions sslOptions;
|
||||
|
||||
private bool loggedCertificateValidationError = false;
|
||||
|
||||
public RpcClientConnector(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake) {
|
||||
this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName);
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.certificateThumbprint = certificateThumbprint;
|
||||
this.handshake = handshake;
|
||||
|
||||
this.sslOptions = new SslClientAuthenticationOptions {
|
||||
AllowRenegotiation = false,
|
||||
AllowTlsResume = true,
|
||||
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
|
||||
EnabledSslProtocols = TlsSupport.SupportedProtocols,
|
||||
EncryptionPolicy = EncryptionPolicy.RequireEncryption,
|
||||
RemoteCertificateValidationCallback = ValidateServerCertificate,
|
||||
TargetHost = distinguishedName,
|
||||
};
|
||||
}
|
||||
|
||||
internal async Task<Connection> EstablishNewConnectionWithRetry(CancellationToken cancellationToken) {
|
||||
TimeSpan nextAttemptDelay = InitialRetryDelay;
|
||||
|
||||
while (true) {
|
||||
Connection? newConnection;
|
||||
try {
|
||||
newConnection = await EstablishNewConnection(cancellationToken);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Caught unhandled exception while connecting.");
|
||||
newConnection = null;
|
||||
}
|
||||
|
||||
if (newConnection != null) {
|
||||
return newConnection;
|
||||
}
|
||||
|
||||
logger.Warning("Failed to connect to server, trying again in {}.", nextAttemptDelay.TotalSeconds.ToString("F1"));
|
||||
|
||||
await Task.Delay(nextAttemptDelay, cancellationToken);
|
||||
nextAttemptDelay = Comparables.Min(nextAttemptDelay.Multiply(1.5), MaximumRetryDelay);
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<Connection?> EstablishNewConnection(CancellationToken cancellationToken) {
|
||||
logger.Information("Connecting to {Host}:{Port}...", host, port);
|
||||
|
||||
Socket clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
|
||||
try {
|
||||
await clientSocket.ConnectAsync(host, port, cancellationToken);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not connect.");
|
||||
throw;
|
||||
}
|
||||
|
||||
SslStream? stream = null;
|
||||
bool handledException = false;
|
||||
try {
|
||||
stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
|
||||
|
||||
try {
|
||||
loggedCertificateValidationError = false;
|
||||
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
|
||||
} catch (AuthenticationException e) {
|
||||
if (!loggedCertificateValidationError) {
|
||||
logger.Error(e, "Could not establish a secure connection.");
|
||||
}
|
||||
|
||||
handledException = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
logger.Information("Established a secure connection.");
|
||||
|
||||
try {
|
||||
await handshake.AcceptServer(stream, cancellationToken);
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Could not perform application handshake, connection lost.");
|
||||
handledException = true;
|
||||
throw;
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Could not perform application handshake.");
|
||||
handledException = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
return new Connection(clientSocket, stream);
|
||||
} catch (Exception e) {
|
||||
if (!handledException) {
|
||||
logger.Error(e, "Caught unhandled exception.");
|
||||
}
|
||||
|
||||
try {
|
||||
await DisconnectSocket(clientSocket, stream);
|
||||
} finally {
|
||||
clientSocket.Close();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) {
|
||||
if (certificate == null || sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNotAvailable)) {
|
||||
logger.Error("Could not establish a secure connection, server did not provide a certificate.");
|
||||
}
|
||||
else if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch)) {
|
||||
logger.Error("Could not establish a secure connection, server certificate has the wrong name: {Name}", certificate.Subject);
|
||||
}
|
||||
else if (!certificateThumbprint.Check(certificate)) {
|
||||
logger.Error("Could not establish a secure connection, server certificate does not match.");
|
||||
}
|
||||
else if (TlsSupport.CheckAlgorithm((X509Certificate2) certificate) is {} error) {
|
||||
logger.Error("Could not establish a secure connection, server certificate rejected because it uses {ActualAlgorithmName} instead of {ExpectedAlgorithmName}.", error.ActualAlgorithmName, error.ExpectedAlgorithmName);
|
||||
}
|
||||
else if ((sslPolicyErrors & ~SslPolicyErrors.RemoteCertificateChainErrors) != SslPolicyErrors.None) {
|
||||
logger.Error("Could not establish a secure connection, server certificate validation failed.");
|
||||
}
|
||||
else {
|
||||
return true;
|
||||
}
|
||||
|
||||
loggedCertificateValidationError = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
|
||||
if (stream != null) {
|
||||
await stream.DisposeAsync();
|
||||
}
|
||||
|
||||
using CancellationTokenSource timeoutTokenSource = new CancellationTokenSource(DisconnectTimeout);
|
||||
await socket.DisconnectAsync(reuseSocket: false, timeoutTokenSource.Token);
|
||||
}
|
||||
|
||||
internal sealed record Connection(Socket Socket, Stream Stream) : IDisposable {
|
||||
public async Task Disconnect() {
|
||||
await DisconnectSocket(Socket, Stream);
|
||||
}
|
||||
|
||||
public async ValueTask Shutdown() {
|
||||
await Stream.DisposeAsync();
|
||||
Socket.Shutdown(SocketShutdown.Both);
|
||||
}
|
||||
|
||||
public void Dispose() {
|
||||
Stream.Dispose();
|
||||
Socket.Close();
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcClientHandshake {
|
||||
protected internal abstract Task<bool> AcceptServer(Stream stream, CancellationToken cancellationToken);
|
@@ -1,72 +0,0 @@
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcClientRuntime<TClientMessage, TServerMessage, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : TClientMessage, TServerMessage {
|
||||
private readonly RpcConnectionToServer<TServerMessage> connection;
|
||||
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
||||
private readonly ActorRef<TClientMessage> handlerActor;
|
||||
|
||||
private readonly SemaphoreSlim disconnectSemaphore;
|
||||
private readonly CancellationToken receiveCancellationToken;
|
||||
|
||||
protected RpcClientRuntime(RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> socket, ActorRef<TClientMessage> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
|
||||
this.connection = socket.Connection;
|
||||
this.messageDefinitions = socket.MessageDefinitions;
|
||||
this.handlerActor = handlerActor;
|
||||
this.disconnectSemaphore = disconnectSemaphore;
|
||||
this.receiveCancellationToken = receiveCancellationToken;
|
||||
}
|
||||
|
||||
private protected sealed override Task Run(ClientSocket socket) {
|
||||
return RunWithConnection(socket, connection);
|
||||
}
|
||||
|
||||
protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerMessage> connection) {
|
||||
var replySender = new ReplySender<TServerMessage, TReplyMessage>(connection, messageDefinitions);
|
||||
var messageHandler = new MessageHandler<TClientMessage>(LoggerName, handlerActor, replySender);
|
||||
|
||||
try {
|
||||
while (!receiveCancellationToken.IsCancellationRequested) {
|
||||
var data = socket.Receive(receiveCancellationToken);
|
||||
|
||||
LogMessageType(RuntimeLogger, data);
|
||||
|
||||
if (data.Length > 0) {
|
||||
messageDefinitions.ToClient.Handle(data, messageHandler);
|
||||
}
|
||||
}
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
} finally {
|
||||
await handlerActor.Stop();
|
||||
RuntimeLogger.Debug("ZeroMQ client stopped receiving messages.");
|
||||
|
||||
await disconnectSemaphore.WaitAsync(CancellationToken.None);
|
||||
}
|
||||
}
|
||||
|
||||
private protected sealed override async Task Disconnect(ClientSocket socket) {
|
||||
await SendDisconnectMessage(socket, RuntimeLogger);
|
||||
}
|
||||
|
||||
protected abstract Task SendDisconnectMessage(ClientSocket socket, ILogger logger);
|
||||
|
||||
private void LogMessageType(ILogger logger, ReadOnlyMemory<byte> data) {
|
||||
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.Length > 0 && messageDefinitions.ToClient.TryGetType(data, out var type)) {
|
||||
logger.Verbose("Received {MessageType} ({Bytes} B).", type.Name, data.Length);
|
||||
}
|
||||
else {
|
||||
logger.Verbose("Received {Bytes} B message.", data.Length);
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,40 +0,0 @@
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcConnection<TMessageBase> {
|
||||
private readonly MessageRegistry<TMessageBase> messageRegistry;
|
||||
private readonly MessageReplyTracker replyTracker;
|
||||
|
||||
internal RpcConnection(MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) {
|
||||
this.messageRegistry = messageRegistry;
|
||||
this.replyTracker = replyTracker;
|
||||
}
|
||||
|
||||
private protected abstract ValueTask Send(byte[] bytes);
|
||||
|
||||
public async Task Send<TMessage>(TMessage message) where TMessage : TMessageBase {
|
||||
var bytes = messageRegistry.Write(message).ToArray();
|
||||
if (bytes.Length > 0) {
|
||||
await Send(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||
var sequenceId = replyTracker.RegisterReply();
|
||||
|
||||
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
||||
if (bytes.Length == 0) {
|
||||
replyTracker.ForgetReply(sequenceId);
|
||||
throw new ArgumentException("Could not write message.", nameof(message));
|
||||
}
|
||||
|
||||
await Send(bytes);
|
||||
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
|
||||
}
|
||||
|
||||
public void Receive(IReply message) {
|
||||
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
|
||||
}
|
||||
}
|
@@ -1,41 +0,0 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcConnectionToClient<TMessageBase> : RpcConnection<TMessageBase> {
|
||||
private readonly ServerSocket socket;
|
||||
private readonly uint routingId;
|
||||
|
||||
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
|
||||
private bool isClosed;
|
||||
|
||||
internal RpcConnectionToClient(ServerSocket socket, uint routingId, MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) : base(messageRegistry, replyTracker) {
|
||||
this.socket = socket;
|
||||
this.routingId = routingId;
|
||||
}
|
||||
|
||||
public bool IsSame(RpcConnectionToClient<TMessageBase> other) {
|
||||
return this.routingId == other.routingId && this.socket == other.socket;
|
||||
}
|
||||
|
||||
public void Close() {
|
||||
bool hasClosed = false;
|
||||
|
||||
lock (this) {
|
||||
if (!isClosed) {
|
||||
isClosed = true;
|
||||
hasClosed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasClosed) {
|
||||
Closed?.Invoke(this, new RpcClientConnectionClosedEventArgs(routingId));
|
||||
}
|
||||
}
|
||||
|
||||
private protected override ValueTask Send(byte[] bytes) {
|
||||
return socket.SendAsync(routingId, bytes);
|
||||
}
|
||||
}
|
@@ -1,25 +0,0 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Tasks;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcConnectionToServer<TMessageBase> : RpcConnection<TMessageBase> {
|
||||
private readonly ClientSocket socket;
|
||||
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
|
||||
|
||||
public Task IsReady => isReady.Task;
|
||||
|
||||
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TMessageBase> messageRegistry, MessageReplyTracker replyTracker) : base(messageRegistry, replyTracker) {
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
public void SetIsReady() {
|
||||
isReady.TrySetResult();
|
||||
}
|
||||
|
||||
private protected override ValueTask Send(byte[] bytes) {
|
||||
return socket.SendAsync(bytes);
|
||||
}
|
||||
}
|
9
Utils/Phantom.Utils.Rpc/Runtime/RpcError.cs
Normal file
9
Utils/Phantom.Utils.Rpc/Runtime/RpcError.cs
Normal file
@@ -0,0 +1,9 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public enum RpcError : byte {
|
||||
InvalidData = 0,
|
||||
UnknownMessageRegistryCode = 1,
|
||||
MessageDeserializationError = 2,
|
||||
MessageHandlingError = 3,
|
||||
MessageTooLarge = 4,
|
||||
}
|
5
Utils/Phantom.Utils.Rpc/Runtime/RpcErrorException.cs
Normal file
5
Utils/Phantom.Utils.Rpc/Runtime/RpcErrorException.cs
Normal file
@@ -0,0 +1,5 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcErrorException(string message, RpcError error) : Exception(message) {
|
||||
public RpcError Error => error;
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public enum RpcHandshakeResult : byte {
|
||||
UnknownError = 0,
|
5
Utils/Phantom.Utils.Rpc/Runtime/RpcReceiveChannel.cs
Normal file
5
Utils/Phantom.Utils.Rpc/Runtime/RpcReceiveChannel.cs
Normal file
@@ -0,0 +1,5 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
sealed class RpcReceiveChannel {
|
||||
|
||||
}
|
@@ -1,75 +0,0 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
sealed class RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand>, IWithStash where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||
public readonly record struct Init(
|
||||
string LoggerName,
|
||||
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> MessageDefinitions,
|
||||
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> RegistrationHandler,
|
||||
RpcConnectionToClient<TClientMessage> Connection
|
||||
);
|
||||
|
||||
public static Props<ReceiveMessageCommand> Factory(Init init) {
|
||||
return Props<ReceiveMessageCommand>.Create(() => new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(init), new ActorConfiguration {
|
||||
SupervisorStrategy = SupervisorStrategies.Resume,
|
||||
StashCapacity = 100
|
||||
});
|
||||
}
|
||||
|
||||
public IStash Stash { get; set; } = null!;
|
||||
|
||||
private readonly string loggerName;
|
||||
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
||||
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
|
||||
private readonly RpcConnectionToClient<TClientMessage> connection;
|
||||
|
||||
private RpcReceiverActor(Init init) {
|
||||
this.loggerName = init.LoggerName;
|
||||
this.messageDefinitions = init.MessageDefinitions;
|
||||
this.registrationHandler = init.RegistrationHandler;
|
||||
this.connection = init.Connection;
|
||||
|
||||
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
|
||||
}
|
||||
|
||||
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data);
|
||||
|
||||
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
|
||||
if (command.MessageType == typeof(TRegistrationMessage)) {
|
||||
await HandleRegistrationMessage(command);
|
||||
}
|
||||
else if (Stash.IsFull) {
|
||||
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
|
||||
}
|
||||
else {
|
||||
Stash.Stash();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleRegistrationMessage(ReceiveMessageCommand command) {
|
||||
if (!messageDefinitions.ToServer.Read(command.Data, out TRegistrationMessage message)) {
|
||||
return;
|
||||
}
|
||||
|
||||
var props = await registrationHandler.TryRegister(connection, message);
|
||||
if (props == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
var handlerActor = Context.ActorOf(props, "Handler");
|
||||
var replySender = new ReplySender<TClientMessage, TReplyMessage>(connection, messageDefinitions);
|
||||
BecomeAuthorized(new MessageHandler<TServerMessage>(loggerName, handlerActor, replySender));
|
||||
}
|
||||
|
||||
private void BecomeAuthorized(MessageHandler<TServerMessage> handler) {
|
||||
Stash.UnstashAll();
|
||||
|
||||
Become(() => {
|
||||
Receive<ReceiveMessageCommand>(command => messageDefinitions.ToServer.Handle(command.Data, handler));
|
||||
});
|
||||
}
|
||||
}
|
@@ -1,50 +0,0 @@
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using NetMQ;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
|
||||
private readonly TSocket socket;
|
||||
|
||||
private protected string LoggerName { get; }
|
||||
private protected ILogger RuntimeLogger { get; }
|
||||
private protected MessageReplyTracker ReplyTracker { get; }
|
||||
|
||||
protected RpcRuntime(RpcSocket<TSocket> socket) {
|
||||
this.socket = socket.Socket;
|
||||
|
||||
this.LoggerName = socket.Config.LoggerName;
|
||||
this.RuntimeLogger = PhantomLogger.Create(LoggerName);
|
||||
this.ReplyTracker = socket.ReplyTracker;
|
||||
}
|
||||
|
||||
protected async Task Launch() {
|
||||
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
|
||||
async Task RunTask() {
|
||||
try {
|
||||
await Run(socket);
|
||||
} catch (Exception e) {
|
||||
RuntimeLogger.Error(e, "Caught exception in RPC thread.");
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
} finally {
|
||||
await Disconnect(socket);
|
||||
|
||||
socket.Dispose();
|
||||
RuntimeLogger.Information("ZeroMQ runtime stopped.");
|
||||
}
|
||||
}
|
||||
|
||||
private protected abstract Task Run(TSocket socket);
|
||||
|
||||
private protected abstract Task Disconnect(TSocket socket);
|
||||
}
|
128
Utils/Phantom.Utils.Rpc/Runtime/RpcSendChannel.cs
Normal file
128
Utils/Phantom.Utils.Rpc/Runtime/RpcSendChannel.cs
Normal file
@@ -0,0 +1,128 @@
|
||||
using System.Threading.Channels;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Frame;
|
||||
using Phantom.Utils.Rpc.Frame.Types;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Runtime.Utils;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
||||
private readonly IRpcConnectionProvider connectionProvider;
|
||||
private readonly MessageRegistry<TMessageBase> messageRegistry;
|
||||
private readonly MessageReplyTracker messageReplyTracker;
|
||||
|
||||
private readonly Channel<IFrame> sendQueue;
|
||||
private readonly Task sendQueueTask;
|
||||
|
||||
private readonly CancellationTokenSource cancellationTokenSource = new ();
|
||||
|
||||
private uint nextMessageId;
|
||||
|
||||
internal RpcSendChannel(string loggerName, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry, int sendQueueCapacity) {
|
||||
this.connectionProvider = connectionProvider;
|
||||
this.messageRegistry = messageRegistry;
|
||||
this.messageReplyTracker = new MessageReplyTracker(loggerName);
|
||||
|
||||
this.sendQueue = Channel.CreateBounded<IFrame>(new BoundedChannelOptions(sendQueueCapacity) {
|
||||
AllowSynchronousContinuations = false,
|
||||
FullMode = BoundedChannelFullMode.Wait,
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
});
|
||||
|
||||
this.sendQueueTask = ProcessSendQueue();
|
||||
}
|
||||
|
||||
public bool TrySendMessage<TMessage>(TMessage message) where TMessage : TMessageBase {
|
||||
return sendQueue.Writer.TryWrite(NextMessageFrame(message));
|
||||
}
|
||||
|
||||
public async ValueTask SendMessage<TMessage>(TMessage message, CancellationToken cancellationToken) where TMessage : TMessageBase {
|
||||
await SendFrame(NextMessageFrame(message), cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<TReply> SendMessage<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||
MessageFrame frame = NextMessageFrame(message);
|
||||
uint messageId = frame.MessageId;
|
||||
|
||||
messageReplyTracker.RegisterReply(messageId);
|
||||
try {
|
||||
await SendFrame(frame, cancellationToken);
|
||||
} catch (Exception) {
|
||||
messageReplyTracker.ForgetReply(messageId);
|
||||
throw;
|
||||
}
|
||||
|
||||
return await messageReplyTracker.WaitForReply<TReply>(messageId, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
|
||||
internal async ValueTask SendReply<TReply>(uint replyingToMessageId, TReply reply, CancellationToken cancellationToken) {
|
||||
await SendFrame(new ReplyFrame(replyingToMessageId, Serialization.Serialize(reply)), cancellationToken);
|
||||
}
|
||||
|
||||
internal async ValueTask SendError(uint replyingToMessageId, RpcError error, CancellationToken cancellationToken) {
|
||||
await SendFrame(new ErrorFrame(replyingToMessageId, error), cancellationToken);
|
||||
}
|
||||
|
||||
private async ValueTask SendFrame(IFrame frame, CancellationToken cancellationToken) {
|
||||
if (!sendQueue.Writer.TryWrite(frame)) {
|
||||
await sendQueue.Writer.WriteAsync(frame, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private MessageFrame NextMessageFrame<T>(T message) where T : TMessageBase {
|
||||
uint messageId = Interlocked.Increment(ref nextMessageId);
|
||||
return messageRegistry.CreateFrame(messageId, message);
|
||||
}
|
||||
|
||||
private async Task ProcessSendQueue() {
|
||||
CancellationToken cancellationToken = cancellationTokenSource.Token;
|
||||
|
||||
// TODO figure out cancellation
|
||||
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync(cancellationToken)) {
|
||||
while (!cancellationToken.IsCancellationRequested) {
|
||||
Stream stream;
|
||||
try {
|
||||
stream = await connectionProvider.GetStream();
|
||||
} catch (OperationCanceledException) {
|
||||
throw;
|
||||
} catch (Exception) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await stream.WriteAsync(frame.Type, cancellationToken);
|
||||
await frame.Write(stream, cancellationToken);
|
||||
await stream.FlushAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal void ReceiveReply(ReplyFrame frame) {
|
||||
messageReplyTracker.ReceiveReply(frame.ReplyingToMessageId, frame.SerializedReply);
|
||||
}
|
||||
|
||||
internal void ReceiveError(uint messageId, RpcError error) {
|
||||
messageReplyTracker.FailReply(messageId, error switch {
|
||||
RpcError.UnknownMessageRegistryCode => new RpcErrorException("Unknown message registry code", error),
|
||||
RpcError.MessageDeserializationError => new RpcErrorException("Message deserialization error", error),
|
||||
RpcError.MessageHandlingError => new RpcErrorException("Message handling error", error),
|
||||
_ => new RpcErrorException("Unknown error", error),
|
||||
});
|
||||
}
|
||||
|
||||
internal async Task Close() {
|
||||
sendQueue.Writer.TryComplete();
|
||||
|
||||
try {
|
||||
await sendQueueTask;
|
||||
} catch (Exception) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose() {
|
||||
sendQueueTask.Dispose();
|
||||
cancellationTokenSource.Dispose();
|
||||
}
|
||||
}
|
@@ -3,13 +3,13 @@ using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCertificate certificate, RpcServerHandshake handshake) {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcServer>(loggerName);
|
||||
|
||||
private readonly List<Client> clients = [];
|
||||
|
||||
public async Task<bool> Run(CancellationToken shutdownToken) {
|
||||
@@ -73,6 +73,8 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
}
|
||||
|
||||
private sealed class Client {
|
||||
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
public Task Task { get; }
|
||||
|
||||
private readonly ILogger logger;
|
||||
@@ -115,7 +117,16 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
int readBytes;
|
||||
while ((readBytes = await stream.ReadAsync(buffer, shutdownToken)) > 0) {}
|
||||
} finally {
|
||||
socket.Close();
|
||||
try {
|
||||
using var timeoutTokenSource = new CancellationTokenSource(DisconnectTimeout);
|
||||
await socket.DisconnectAsync(reuseSocket: false, timeoutTokenSource.Token);
|
||||
} catch (OperationCanceledException) {
|
||||
logger.Error("Could not disconnect client socket, disconnection timed out.");
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not disconnect client socket.");
|
||||
} finally {
|
||||
socket.Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,7 +1,8 @@
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Monads;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcServerCertificate {
|
||||
public static byte[] CreateAndExport(string commonName) {
|
@@ -1,4 +1,4 @@
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcServerHandshake {
|
||||
protected internal abstract Task<bool> AcceptClient(string remoteAddress, Stream stream, CancellationToken cancellationToken);
|
@@ -1,3 +1,3 @@
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
public sealed record DisallowedAlgorithmError(string ExpectedAlgorithmName, string ActualAlgorithmName);
|
@@ -2,7 +2,7 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
public sealed record RpcCertificateThumbprint {
|
||||
private const int Length = 20;
|
@@ -3,7 +3,7 @@ using System.Security.Cryptography;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Monads;
|
||||
|
||||
namespace Phantom.Utils.Rpc.New;
|
||||
namespace Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
/// <summary>
|
||||
/// <para>
|
22
Utils/Phantom.Utils.Rpc/Runtime/Utils/RentedMemory.cs
Normal file
22
Utils/Phantom.Utils.Rpc/Runtime/Utils/RentedMemory.cs
Normal file
@@ -0,0 +1,22 @@
|
||||
using System.Buffers;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime.Utils;
|
||||
|
||||
readonly record struct RentedMemory<T>(T[] Array, int Length) : IDisposable {
|
||||
public Span<T> AsSpan => Array.AsSpan(..Length);
|
||||
public Memory<T> AsMemory => Array.AsMemory(..Length);
|
||||
|
||||
public void Dispose() {
|
||||
ArrayPool<T>.Shared.Return(Array);
|
||||
}
|
||||
|
||||
public static RentedMemory<T> Rent(int bytes) {
|
||||
T[] buffer = ArrayPool<T>.Shared.Rent(bytes);
|
||||
try {
|
||||
return new RentedMemory<T>(buffer, bytes);
|
||||
} catch (Exception) {
|
||||
ArrayPool<T>.Shared.Return(buffer);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
69
Utils/Phantom.Utils.Rpc/Runtime/Utils/Serialization.cs
Normal file
69
Utils/Phantom.Utils.Rpc/Runtime/Utils/Serialization.cs
Normal file
@@ -0,0 +1,69 @@
|
||||
using System.Buffers;
|
||||
using System.Buffers.Binary;
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime.Utils;
|
||||
|
||||
public static class Serialization {
|
||||
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
|
||||
|
||||
private static async ValueTask WritePrimitive<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
|
||||
using var buffer = RentedMemory<byte>.Rent(size);
|
||||
writer(buffer.AsSpan, value);
|
||||
await stream.WriteAsync(buffer.AsMemory, cancellationToken);
|
||||
}
|
||||
|
||||
private static async ValueTask<T> ReadPrimitive<T>(Func<ReadOnlySpan<byte>, T> reader, int size, Stream stream, CancellationToken cancellationToken) {
|
||||
using var buffer = RentedMemory<byte>.Rent(size);
|
||||
await stream.ReadExactlyAsync(buffer.AsMemory, cancellationToken);
|
||||
return reader(buffer.AsSpan);
|
||||
}
|
||||
|
||||
public static ValueTask WriteByte(byte value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(byte), static (span, value) => span[0] = value, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<byte> ReadByte(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(static span => span[0], sizeof(byte), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteUnsignedShort(ushort value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(ushort), BinaryPrimitives.WriteUInt16LittleEndian, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<ushort> ReadUnsignedShort(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(BinaryPrimitives.ReadUInt16LittleEndian, sizeof(ushort), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteSignedInt(int value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(int), BinaryPrimitives.WriteInt32LittleEndian, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<int> ReadSignedInt(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(BinaryPrimitives.ReadInt32LittleEndian, sizeof(int), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteUnsignedInt(uint value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(uint), BinaryPrimitives.WriteUInt32LittleEndian, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<uint> ReadUnsignedInt(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static async ValueTask<ReadOnlyMemory<byte>> ReadBytes(int length, Stream stream, CancellationToken cancellationToken) {
|
||||
Memory<byte> buffer = new byte[length];
|
||||
await stream.ReadExactlyAsync(buffer, cancellationToken);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public static ReadOnlyMemory<byte> Serialize<T>(T value) {
|
||||
var buffer = new ArrayBufferWriter<byte>();
|
||||
MemoryPackSerializer.Serialize(buffer, value, SerializerOptions);
|
||||
return buffer.WrittenMemory;
|
||||
}
|
||||
|
||||
public static T Deserialize<T>(ReadOnlyMemory<byte> buffer) {
|
||||
return MemoryPackSerializer.Deserialize<T>(buffer.Span, SerializerOptions)!;
|
||||
}
|
||||
}
|
@@ -1,6 +1,6 @@
|
||||
using Phantom.Utils.Actor;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
namespace Phantom.Utils.Rpc.Runtime2;
|
||||
|
||||
public interface IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> where TRegistrationMessage : TServerMessage {
|
||||
Task<Props<TServerMessage>?> TryRegister(RpcConnectionToClient<TClientMessage> connection, TRegistrationMessage message);
|
@@ -3,12 +3,10 @@ using Akka.Actor;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
namespace Phantom.Utils.Rpc.Runtime2;
|
||||
|
||||
public static class RpcServerRuntime {
|
||||
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
|
@@ -1,42 +0,0 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Sockets;
|
||||
|
||||
public static class RpcClientSocket {
|
||||
public static RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> Connect<TClientMessage, TServerMessage, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||
return RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : TClientMessage, TServerMessage {
|
||||
internal static RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : TServerMessage {
|
||||
var socket = new ClientSocket();
|
||||
var options = socket.Options;
|
||||
|
||||
options.CurveServerCertificate = config.ServerCertificate;
|
||||
options.CurveCertificate = new NetMQCertificate();
|
||||
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
|
||||
RpcSocket.SetDefaultSocketOptions(options);
|
||||
|
||||
var url = config.TcpUrl;
|
||||
var logger = PhantomLogger.Create(config.LoggerName);
|
||||
|
||||
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
||||
socket.Connect(url);
|
||||
logger.Information("ZeroMQ client ready.");
|
||||
|
||||
return new RpcClientSocket<TClientMessage, TServerMessage, TReplyMessage>(socket, config, messageDefinitions);
|
||||
}
|
||||
|
||||
public RpcConnectionToServer<TServerMessage> Connection { get; }
|
||||
internal IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> MessageDefinitions { get; }
|
||||
|
||||
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions) : base(socket, config) {
|
||||
MessageDefinitions = messageDefinitions;
|
||||
Connection = new RpcConnectionToServer<TServerMessage>(socket, messageDefinitions.ToServer, ReplyTracker);
|
||||
}
|
||||
}
|
@@ -1,26 +0,0 @@
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Logging;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Sockets;
|
||||
|
||||
sealed class RpcServerSocket : RpcSocket<ServerSocket> {
|
||||
public static RpcServerSocket Connect(RpcConfiguration config) {
|
||||
var socket = new ServerSocket();
|
||||
var options = socket.Options;
|
||||
|
||||
options.CurveServer = true;
|
||||
options.CurveCertificate = config.ServerCertificate;
|
||||
RpcSocket.SetDefaultSocketOptions(options);
|
||||
|
||||
var url = config.TcpUrl;
|
||||
var logger = PhantomLogger.Create(config.LoggerName);
|
||||
|
||||
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
||||
socket.Bind(url);
|
||||
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
|
||||
|
||||
return new RpcServerSocket(socket, config);
|
||||
}
|
||||
|
||||
private RpcServerSocket(ServerSocket socket, RpcConfiguration config) : base(socket, config) {}
|
||||
}
|
@@ -1,25 +0,0 @@
|
||||
using NetMQ;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Sockets;
|
||||
|
||||
static class RpcSocket {
|
||||
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
|
||||
// TODO test behavior when either agent or server are offline for a very long time
|
||||
options.DelayAttachOnConnect = true;
|
||||
options.ReceiveHighWatermark = 10_000;
|
||||
options.SendHighWatermark = 10_000;
|
||||
}
|
||||
}
|
||||
|
||||
public abstract class RpcSocket<TSocket> where TSocket : ThreadSafeSocket {
|
||||
internal TSocket Socket { get; }
|
||||
internal RpcConfiguration Config { get; }
|
||||
internal MessageReplyTracker ReplyTracker { get; }
|
||||
|
||||
protected RpcSocket(TSocket socket, RpcConfiguration config) {
|
||||
Socket = socket;
|
||||
Config = config;
|
||||
ReplyTracker = new MessageReplyTracker(config.LoggerName);
|
||||
}
|
||||
}
|
11
Utils/Phantom.Utils/Collections/Comparables.cs
Normal file
11
Utils/Phantom.Utils/Collections/Comparables.cs
Normal file
@@ -0,0 +1,11 @@
|
||||
namespace Phantom.Utils.Collections;
|
||||
|
||||
public static class Comparables {
|
||||
public static T Min<T>(T first, T second) where T : IComparable<T> {
|
||||
return first.CompareTo(second) < 0 ? first : second;
|
||||
}
|
||||
|
||||
public static T Max<T>(T first, T second) where T : IComparable<T> {
|
||||
return first.CompareTo(second) < 0 ? second : first;
|
||||
}
|
||||
}
|
@@ -1,6 +1,5 @@
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Web.Services.Rpc;
|
||||
|
||||
|
@@ -1,8 +1,6 @@
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Common.Messages.Web.BiDirectional;
|
||||
using Phantom.Common.Messages.Web.ToWeb;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Web.Services.Agents;
|
||||
using Phantom.Web.Services.Authentication;
|
||||
using Phantom.Web.Services.Instances;
|
||||
|
@@ -1,7 +1,6 @@
|
||||
using Akka.Actor;
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Tasks;
|
||||
using Phantom.Web.Services.Agents;
|
||||
using Phantom.Web.Services.Authentication;
|
||||
|
@@ -1,11 +1,7 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Common.Messages.Web.BiDirectional;
|
||||
using Phantom.Common.Messages.Web.ToController;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
using ILogger = Serilog.ILogger;
|
||||
|
||||
namespace Phantom.Web.Services.Rpc;
|
||||
|
@@ -1,6 +1,5 @@
|
||||
using Microsoft.AspNetCore.DataProtection;
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Web.Services;
|
||||
using Serilog;
|
||||
using ILogger = Serilog.ILogger;
|
||||
|
Reference in New Issue
Block a user