1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-04-22 10:15:48 +02:00

Migrate event log to Akka.NET

This commit is contained in:
chylex 2024-03-22 03:10:24 +01:00
parent 7cdb0a1910
commit 02828a91c6
Signed by: chylex
GPG Key ID: 4DE42C8F19A80548
4 changed files with 89 additions and 23 deletions

View File

@ -38,7 +38,7 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
private sealed record FlushChangesCommand : ICommand;
private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) {
this.configurationToStore = command.Configuration;
configurationToStore = command.Configuration;
ScheduleFlush(TimeSpan.FromSeconds(2));
}
@ -72,11 +72,9 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
}
private void ScheduleFlush(TimeSpan delay) {
if (hasScheduledFlush) {
return;
if (!hasScheduledFlush) {
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}

View File

@ -12,9 +12,7 @@ using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;
@ -23,7 +21,6 @@ namespace Phantom.Controller.Services;
public sealed class ControllerServices : IDisposable {
public ActorSystem ActorSystem { get; }
private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; }
@ -51,7 +48,6 @@ public sealed class ControllerServices : IDisposable {
this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions();
@ -65,7 +61,7 @@ public sealed class ControllerServices : IDisposable {
this.UserRoleManager = new UserRoleManager(dbProvider);
this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager);
this.AuditLogManager = new AuditLogManager(dbProvider);
this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken);
this.EventLogManager = new EventLogManager(ActorSystem, dbProvider, shutdownCancellationToken);
this.AgentRegistrationHandler = new AgentRegistrationHandler(AgentManager, InstanceLogManager, EventLogManager);
this.WebRegistrationHandler = new WebRegistrationHandler(webAuthToken, ControllerState, InstanceLogManager, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, MinecraftVersions, EventLogManager);

View File

@ -0,0 +1,77 @@
using Phantom.Common.Data.Web.EventLog;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Events;
sealed class EventLogDatabaseStorageActor : ReceiveActor<EventLogDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<EventLogDatabaseStorageActor>();
public readonly record struct Init(IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new EventLogDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly LinkedList<StoreEventCommand> pendingCommands = new ();
private bool hasScheduledFlush = false;
private EventLogDatabaseStorageActor(Init init) {
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<StoreEventCommand>(StoreEvent);
ReceiveAsync<FlushChangesCommand>(FlushChanges);
}
public interface ICommand {}
public sealed record StoreEventCommand(Guid EventGuid, DateTime UtcTime, Guid? AgentGuid, EventLogEventType EventType, string SubjectId, Dictionary<string, object?>? Extra = null) : ICommand;
private sealed record FlushChangesCommand : ICommand;
private void StoreEvent(StoreEventCommand command) {
pendingCommands.AddLast(command);
ScheduleFlush(TimeSpan.FromMilliseconds(500));
}
private async Task FlushChanges(FlushChangesCommand command) {
hasScheduledFlush = false;
if (pendingCommands.Count == 0) {
return;
}
try {
await using var db = dbProvider.Lazy();
var eventLogRepository = new EventLogRepository(db);
foreach (var (eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra) in pendingCommands) {
eventLogRepository.AddItem(eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
} catch (Exception e) {
ScheduleFlush(TimeSpan.FromSeconds(10));
Logger.Error(e, "Could not store {EventCount} event(s) in database.", pendingCommands.Count);
return;
}
Logger.Information("Stored {EventCount} event(s) in database.", pendingCommands.Count);
pendingCommands.Clear();
}
private void ScheduleFlush(TimeSpan delay) {
if (!hasScheduledFlush) {
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}
}

View File

@ -1,32 +1,27 @@
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data.Web.EventLog;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Tasks;
using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Events;
sealed partial class EventLogManager {
private readonly ActorRef<EventLogDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly IDbContextProvider dbProvider;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
public EventLogManager(IDbContextProvider dbProvider, TaskManager taskManager, CancellationToken cancellationToken) {
public EventLogManager(IActorRefFactory actorSystem, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.databaseStorageActor = actorSystem.ActorOf(EventLogDatabaseStorageActor.Factory(new EventLogDatabaseStorageActor.Init(dbProvider, cancellationToken)), "EventLogDatabaseStorage");
this.dbProvider = dbProvider;
this.taskManager = taskManager;
this.cancellationToken = cancellationToken;
}
public void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
taskManager.Run("Store event log item to database", () => AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra));
private void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
databaseStorageActor.Tell(new EventLogDatabaseStorageActor.StoreEventCommand(eventGuid, utcTime, agentGuid, eventType, subjectId, extra));
}
public async Task AddItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
await using var db = dbProvider.Lazy();
new EventLogRepository(db).AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
public async Task<ImmutableArray<EventLogItem>> GetMostRecentItems(int count) {
await using var db = dbProvider.Lazy();
return await new EventLogRepository(db).GetMostRecentItems(count, cancellationToken);