1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-03-02 07:22:29 +01:00
This commit is contained in:
chylex 2023-10-31 08:47:00 +01:00
parent cd332a6571
commit f14d7f5590
Signed by: chylex
GPG Key ID: 4DE42C8F19A80548
4 changed files with 30 additions and 11 deletions
Agent/Phantom.Agent.Rpc
Controller/Phantom.Controller.Rpc
Utils/Phantom.Utils.Rpc

View File

@ -18,8 +18,4 @@ public sealed class ControllerConnection {
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.Send(message);
}
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
}

View File

@ -66,7 +66,7 @@ public sealed class RpcConnectionToClient<TListener> {
}
await socket.SendAsync(routingId, bytes);
return await messageReplyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
return await messageReplyTracker.TryWaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}
public void Receive(IReply message) {

View File

@ -20,26 +20,36 @@ public sealed class MessageReplyTracker {
return sequenceId;
}
public async Task<TReply?> WaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TReply : class {
public async Task<TReply> WaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) {
if (!replyTasks.TryGetValue(sequenceId, out var completionSource)) {
logger.Warning("No reply callback for id {SequenceId}.", sequenceId);
return null;
throw new ArgumentException("No reply callback for id: " + sequenceId, nameof(sequenceId));
}
try {
byte[] replyBytes = await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
return MessageSerializer.Deserialize<TReply>(replyBytes);
} catch (TimeoutException) {
return null;
logger.Debug("Timed out waiting for reply with id {SequenceId}.", sequenceId);
throw;
} catch (OperationCanceledException) {
return null;
logger.Debug("Cancelled waiting for reply with id {SequenceId}.", sequenceId);
throw;
} catch (Exception e) {
logger.Warning(e, "Error processing reply with id {SequenceId}.", sequenceId);
return null;
throw;
} finally {
ForgetReply(sequenceId);
}
}
public async Task<TReply?> TryWaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TReply : class {
try {
return await WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
} catch (Exception) {
return null;
}
}
public void ForgetReply(uint sequenceId) {
if (replyTasks.TryRemove(sequenceId, out var task)) {

View File

@ -22,7 +22,7 @@ public sealed class RpcConnectionToServer<TListener> {
}
}
public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
public async Task<TReply?> TrySend<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
@ -31,6 +31,19 @@ public sealed class RpcConnectionToServer<TListener> {
return null;
}
await socket.SendAsync(bytes);
return await replyTracker.TryWaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}
public async Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId);
throw new ArgumentException("Could not write message.", nameof(message));
}
await socket.SendAsync(bytes);
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}