1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-05-05 21:34:05 +02:00

Refactor sequence IDs in message replies

This commit is contained in:
chylex 2023-01-24 00:48:07 +01:00
parent c8a2a539e8
commit f4aec6f11d
Signed by: chylex
GPG Key ID: 4DE42C8F19A80548
17 changed files with 107 additions and 69 deletions

View File

@ -15,29 +15,24 @@ public sealed class RpcServerConnection {
this.replyTracker = replyTracker;
}
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToServer<TReply> {
return MessageRegistries.ToServer.Write<TMessage, TReply>(message).ToArray();
}
internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
var bytes = WriteBytes<TMessage, NoReply>(message);
var bytes = MessageRegistries.ToServer.Write(message).ToArray();
if (bytes.Length > 0) {
await socket.SendAsync(bytes);
}
}
internal async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
internal async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
var sequenceId = replyTracker.RegisterReply();
var message = messageFactory(sequenceId);
var bytes = WriteBytes<TMessage, TReply>(message);
var bytes = MessageRegistries.ToServer.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId);
return null;
}
await socket.SendAsync(bytes);
return await replyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
}
public void Receive(ReplyMessage message) {

View File

@ -28,7 +28,7 @@ public static class ServerMessaging {
return CurrentConnectionOrThrow.Send(message);
}
public static Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
return CurrentConnectionOrThrow.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
}
}

View File

@ -4,6 +4,4 @@ namespace Phantom.Common.Messages;
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}
public interface IMessageToAgent : IMessageToAgent<NoReply> {
uint IMessage<IMessageToAgentListener, NoReply>.SequenceId => 0;
}
public interface IMessageToAgent : IMessageToAgent<NoReply> {}

View File

@ -1,9 +1,7 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages;
namespace Phantom.Common.Messages;
public interface IMessageToServer<TReply> : IMessage<IMessageToServerListener, TReply> {}
public interface IMessageToServer : IMessageToServer<NoReply> {
uint IMessage<IMessageToServerListener, NoReply>.SequenceId => 0;
}
public interface IMessageToServer : IMessageToServer<NoReply> {}

View File

@ -12,20 +12,20 @@ public static class MessageRegistries {
public static MessageRegistry<IMessageToServerListener> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
static MessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage, NoReply>(0);
ToAgent.Add<RegisterAgentFailureMessage, NoReply>(1);
ToAgent.Add<RegisterAgentSuccessMessage>(0);
ToAgent.Add<RegisterAgentFailureMessage>(1);
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
ToAgent.Add<ReplyMessage, NoReply>(127);
ToAgent.Add<ReplyMessage>(127);
ToServer.Add<RegisterAgentMessage, NoReply>(0);
ToServer.Add<UnregisterAgentMessage, NoReply>(1);
ToServer.Add<AgentIsAliveMessage, NoReply>(2);
ToServer.Add<AdvertiseJavaRuntimesMessage, NoReply>(3);
ToServer.Add<ReportInstanceStatusMessage, NoReply>(4);
ToServer.Add<InstanceOutputMessage, NoReply>(5);
ToServer.Add<ReplyMessage, NoReply>(127);
ToServer.Add<RegisterAgentMessage>(0);
ToServer.Add<UnregisterAgentMessage>(1);
ToServer.Add<AgentIsAliveMessage>(2);
ToServer.Add<AdvertiseJavaRuntimesMessage>(3);
ToServer.Add<ReportInstanceStatusMessage>(4);
ToServer.Add<InstanceOutputMessage>(5);
ToServer.Add<ReplyMessage>(127);
}
}

View File

@ -6,8 +6,7 @@ namespace Phantom.Common.Messages.ToAgent;
[MemoryPackable]
public sealed partial record ConfigureInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleConfigureInstance(this);

View File

@ -5,8 +5,7 @@ namespace Phantom.Common.Messages.ToAgent;
[MemoryPackable]
public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] Guid InstanceGuid
[property: MemoryPackOrder(0)] Guid InstanceGuid
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleLaunchInstance(this);

View File

@ -5,9 +5,8 @@ namespace Phantom.Common.Messages.ToAgent;
[MemoryPackable]
public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] string Command
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] string Command
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleSendCommandToInstance(this);

View File

@ -6,9 +6,8 @@ namespace Phantom.Common.Messages.ToAgent;
[MemoryPackable]
public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] MinecraftStopStrategy StopStrategy
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] MinecraftStopStrategy StopStrategy
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleStopInstance(this);

View File

@ -34,29 +34,32 @@ public sealed class RpcClientConnection {
}
}
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<TReply> {
return isClosed ? Array.Empty<byte>() : MessageRegistries.ToAgent.Write<TMessage, TReply>(message).ToArray();
}
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
var bytes = WriteBytes<TMessage, NoReply>(message);
if (isClosed) {
return;
}
var bytes = MessageRegistries.ToAgent.Write(message).ToArray();
if (bytes.Length > 0) {
await socket.SendAsync(routingId, bytes);
}
}
public async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
var sequenceId = messageReplyTracker.RegisterReply();
var message = messageFactory(sequenceId);
public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
if (isClosed) {
return null;
}
var bytes = WriteBytes<TMessage, TReply>(message);
var sequenceId = messageReplyTracker.RegisterReply();
var bytes = MessageRegistries.ToAgent.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
messageReplyTracker.ForgetReply(sequenceId);
return null;
}
await socket.SendAsync(routingId, bytes);
return await messageReplyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
return await messageReplyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
}
public void Receive(ReplyMessage message) {

View File

@ -22,7 +22,7 @@ sealed class AgentConnection {
return connection.Send(message);
}
public Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
}
}

View File

@ -113,14 +113,14 @@ public sealed class AgentManager {
}
}
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
if (connection == null) {
// TODO handle missing agent?
return null;
}
return await connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
}
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {

View File

@ -3,6 +3,7 @@ using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToAgent;
using Phantom.Common.Minecraft;
using Phantom.Server.Database;
@ -67,7 +68,7 @@ public sealed class InstanceManager {
var agentName = agent.Name;
var reply = (await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, sequenceId => new ConfigureInstanceMessage(sequenceId, configuration), TimeSpan.FromSeconds(10))).DidNotReplyIfNull();
var reply = (await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, new ConfigureInstanceMessage(configuration), TimeSpan.FromSeconds(10))).DidNotReplyIfNull();
if (reply.Is(ConfigureInstanceResult.Success)) {
using (var scope = databaseProvider.CreateScope()) {
InstanceEntity entity = scope.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
@ -119,6 +120,11 @@ public sealed class InstanceManager {
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(Instance instance, TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
var reply = await agentManager.SendMessage<TMessage, InstanceActionResult<TReply>>(instance.Configuration.AgentGuid, message, TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
}
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid instanceGuid) {
var instance = GetInstance(instanceGuid);
if (instance == null) {
@ -127,8 +133,7 @@ public sealed class InstanceManager {
await SetInstanceShouldLaunchAutomatically(instanceGuid, true);
var reply = await agentManager.SendMessage<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new LaunchInstanceMessage(sequenceId, instanceGuid), TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
return await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(instance, new LaunchInstanceMessage(instanceGuid));
}
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
@ -139,8 +144,7 @@ public sealed class InstanceManager {
await SetInstanceShouldLaunchAutomatically(instanceGuid, false);
var reply = await agentManager.SendMessage<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new StopInstanceMessage(sequenceId, instanceGuid, stopStrategy), TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
return await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(instance, new StopInstanceMessage(instanceGuid, stopStrategy));
}
private async Task SetInstanceShouldLaunchAutomatically(Guid instanceGuid, bool shouldLaunchAutomatically) {
@ -162,8 +166,7 @@ public sealed class InstanceManager {
return InstanceActionResult.General<SendCommandToInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
var reply = await agentManager.SendMessage<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new SendCommandToInstanceMessage(sequenceId, instanceGuid, command), TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
return await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(instance, new SendCommandToInstanceMessage(instanceGuid, command));
}
internal ImmutableArray<InstanceConfiguration> GetInstanceConfigurationsForAgent(Guid agentGuid) {

View File

@ -1,6 +1,5 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessage<TListener, TReply> {
public uint SequenceId { get; }
Task<TReply> Accept(TListener listener);
}

View File

@ -17,22 +17,22 @@ public abstract class MessageHandler<TListener> {
this.cancellationToken = cancellationToken;
}
internal void Enqueue<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
cancellationToken.ThrowIfCancellationRequested();
taskManager.Run(async () => {
try {
await Handle<TMessage, TReply>(message);
await Handle<TMessage, TReply>(sequenceId, message);
} catch (Exception e) {
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
}
});
}
private async Task Handle<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply = await message.Accept(Listener);
if (reply is not NoReply) {
await SendReply(message.SequenceId, MessageSerializer.Serialize(reply));
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
}
}

View File

@ -17,10 +17,23 @@ public sealed class MessageRegistry<TListener> {
this.logger = logger;
}
public void Add<TMessage>(ushort code) where TMessage : IMessage<TListener, NoReply> {
AddTypeCodeMapping<TMessage, NoReply>(code);
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage>);
}
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
if (typeof(TReply) == typeof(NoReply)) {
throw new InvalidOperationException("This overload of Add must not be used with NoReply as the reply type!");
}
AddTypeCodeMapping<TMessage, TReply>(code);
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage, TReply>);
}
private void AddTypeCodeMapping<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
typeToCodeMapping.Add(typeof(TMessage), code);
codeToTypeMapping.Add(code, typeof(TMessage));
codeToHandlerMapping.Add(code, HandleInternal<TMessage, TReply>);
}
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
@ -34,10 +47,10 @@ public sealed class MessageRegistry<TListener> {
}
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return Write<TMessage, NoReply>(message);
return Write<TMessage, NoReply>(0, message);
}
public ReadOnlySpan<byte> Write<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
logger.Error("Unknown message type {Type}.", typeof(TMessage));
return default;
@ -47,6 +60,11 @@ public sealed class MessageRegistry<TListener> {
try {
MessageSerializer.WriteCode(buffer, code);
if (typeof(TReply) != typeof(NoReply)) {
MessageSerializer.WriteSequenceId(buffer, sequenceId);
}
MessageSerializer.Serialize(buffer, message);
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
@ -77,7 +95,23 @@ public sealed class MessageRegistry<TListener> {
handle(data, code, handler);
}
private void HandleInternal<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, NoReply> {
DeserializeAndEnqueueMessage<TMessage, NoReply>(data, code, handler, 0);
}
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
uint sequenceId;
try {
sequenceId = MessageSerializer.ReadSequenceId(ref data);
} catch (Exception e) {
logger.Error(e, "Failed to deserialize sequence ID of message with code {Code}.", code);
return;
}
DeserializeAndEnqueueMessage<TMessage, TReply>(data, code, handler, sequenceId);
}
private void DeserializeAndEnqueueMessage<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler, uint sequenceId) where TMessage : IMessage<TListener, TReply> {
TMessage message;
try {
message = MessageSerializer.Deserialize<TMessage>(data);
@ -86,6 +120,6 @@ public sealed class MessageRegistry<TListener> {
return;
}
handler.Enqueue<TMessage, TReply>(message);
handler.Enqueue<TMessage, TReply>(sequenceId, message);
}
}

View File

@ -30,4 +30,16 @@ static class MessageSerializer {
memory = memory[2..];
return value;
}
public static void WriteSequenceId(IBufferWriter<byte> destination, uint sequenceId) {
Span<byte> buffer = stackalloc byte[4];
BinaryPrimitives.WriteUInt32LittleEndian(buffer, sequenceId);
destination.Write(buffer);
}
public static uint ReadSequenceId(ref ReadOnlyMemory<byte> memory) {
uint value = BinaryPrimitives.ReadUInt32LittleEndian(memory.Span);
memory = memory[4..];
return value;
}
}