diff --git a/Controller/Phantom.Controller.Services/Agents/AgentDatabaseStorageActor.cs b/Controller/Phantom.Controller.Services/Agents/AgentDatabaseStorageActor.cs index 2fe95a1..fdd0483 100644 --- a/Controller/Phantom.Controller.Services/Agents/AgentDatabaseStorageActor.cs +++ b/Controller/Phantom.Controller.Services/Agents/AgentDatabaseStorageActor.cs @@ -38,7 +38,7 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor. private sealed record FlushChangesCommand : ICommand; private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) { - this.configurationToStore = command.Configuration; + configurationToStore = command.Configuration; ScheduleFlush(TimeSpan.FromSeconds(2)); } @@ -72,11 +72,9 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor. } private void ScheduleFlush(TimeSpan delay) { - if (hasScheduledFlush) { - return; + if (!hasScheduledFlush) { + hasScheduledFlush = true; + Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self); } - - hasScheduledFlush = true; - Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self); } } diff --git a/Controller/Phantom.Controller.Services/ControllerServices.cs b/Controller/Phantom.Controller.Services/ControllerServices.cs index e3839ad..7e9abc2 100644 --- a/Controller/Phantom.Controller.Services/ControllerServices.cs +++ b/Controller/Phantom.Controller.Services/ControllerServices.cs @@ -12,9 +12,7 @@ using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Rpc; using Phantom.Controller.Services.Users; using Phantom.Utils.Actor; -using Phantom.Utils.Logging; using Phantom.Utils.Rpc.Runtime; -using Phantom.Utils.Tasks; using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController; using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController; @@ -23,7 +21,6 @@ namespace Phantom.Controller.Services; public sealed class ControllerServices : IDisposable { public ActorSystem ActorSystem { get; } - private TaskManager TaskManager { get; } private ControllerState ControllerState { get; } private MinecraftVersions MinecraftVersions { get; } @@ -51,7 +48,6 @@ public sealed class ControllerServices : IDisposable { this.ActorSystem = ActorSystemFactory.Create("Controller"); - this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>()); this.ControllerState = new ControllerState(); this.MinecraftVersions = new MinecraftVersions(); @@ -65,7 +61,7 @@ public sealed class ControllerServices : IDisposable { this.UserRoleManager = new UserRoleManager(dbProvider); this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager); this.AuditLogManager = new AuditLogManager(dbProvider); - this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken); + this.EventLogManager = new EventLogManager(ActorSystem, dbProvider, shutdownCancellationToken); this.AgentRegistrationHandler = new AgentRegistrationHandler(AgentManager, InstanceLogManager, EventLogManager); this.WebRegistrationHandler = new WebRegistrationHandler(webAuthToken, ControllerState, InstanceLogManager, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, MinecraftVersions, EventLogManager); diff --git a/Controller/Phantom.Controller.Services/Events/EventLogDatabaseStorageActor.cs b/Controller/Phantom.Controller.Services/Events/EventLogDatabaseStorageActor.cs new file mode 100644 index 0000000..ba83f25 --- /dev/null +++ b/Controller/Phantom.Controller.Services/Events/EventLogDatabaseStorageActor.cs @@ -0,0 +1,77 @@ +using Phantom.Common.Data.Web.EventLog; +using Phantom.Controller.Database; +using Phantom.Controller.Database.Repositories; +using Phantom.Utils.Actor; +using Phantom.Utils.Logging; +using Serilog; + +namespace Phantom.Controller.Services.Events; + +sealed class EventLogDatabaseStorageActor : ReceiveActor<EventLogDatabaseStorageActor.ICommand> { + private static readonly ILogger Logger = PhantomLogger.Create<EventLogDatabaseStorageActor>(); + + public readonly record struct Init(IDbContextProvider DbProvider, CancellationToken CancellationToken); + + public static Props<ICommand> Factory(Init init) { + return Props<ICommand>.Create(() => new EventLogDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume }); + } + + private readonly IDbContextProvider dbProvider; + private readonly CancellationToken cancellationToken; + + private readonly LinkedList<StoreEventCommand> pendingCommands = new (); + private bool hasScheduledFlush = false; + + private EventLogDatabaseStorageActor(Init init) { + this.dbProvider = init.DbProvider; + this.cancellationToken = init.CancellationToken; + + Receive<StoreEventCommand>(StoreEvent); + ReceiveAsync<FlushChangesCommand>(FlushChanges); + } + + public interface ICommand {} + + public sealed record StoreEventCommand(Guid EventGuid, DateTime UtcTime, Guid? AgentGuid, EventLogEventType EventType, string SubjectId, Dictionary<string, object?>? Extra = null) : ICommand; + + private sealed record FlushChangesCommand : ICommand; + + private void StoreEvent(StoreEventCommand command) { + pendingCommands.AddLast(command); + ScheduleFlush(TimeSpan.FromMilliseconds(500)); + } + + private async Task FlushChanges(FlushChangesCommand command) { + hasScheduledFlush = false; + + if (pendingCommands.Count == 0) { + return; + } + + try { + await using var db = dbProvider.Lazy(); + var eventLogRepository = new EventLogRepository(db); + + foreach (var (eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra) in pendingCommands) { + eventLogRepository.AddItem(eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra); + } + + await db.Ctx.SaveChangesAsync(cancellationToken); + } catch (Exception e) { + ScheduleFlush(TimeSpan.FromSeconds(10)); + Logger.Error(e, "Could not store {EventCount} event(s) in database.", pendingCommands.Count); + return; + } + + Logger.Information("Stored {EventCount} event(s) in database.", pendingCommands.Count); + + pendingCommands.Clear(); + } + + private void ScheduleFlush(TimeSpan delay) { + if (!hasScheduledFlush) { + hasScheduledFlush = true; + Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self); + } + } +} diff --git a/Controller/Phantom.Controller.Services/Events/EventLogManager.cs b/Controller/Phantom.Controller.Services/Events/EventLogManager.cs index a6d74ee..7bad97e 100644 --- a/Controller/Phantom.Controller.Services/Events/EventLogManager.cs +++ b/Controller/Phantom.Controller.Services/Events/EventLogManager.cs @@ -1,32 +1,27 @@ using System.Collections.Immutable; +using Akka.Actor; using Phantom.Common.Data.Web.EventLog; using Phantom.Controller.Database; using Phantom.Controller.Database.Repositories; -using Phantom.Utils.Tasks; +using Phantom.Utils.Actor; namespace Phantom.Controller.Services.Events; sealed partial class EventLogManager { + private readonly ActorRef<EventLogDatabaseStorageActor.ICommand> databaseStorageActor; private readonly IDbContextProvider dbProvider; - private readonly TaskManager taskManager; private readonly CancellationToken cancellationToken; - public EventLogManager(IDbContextProvider dbProvider, TaskManager taskManager, CancellationToken cancellationToken) { + public EventLogManager(IActorRefFactory actorSystem, IDbContextProvider dbProvider, CancellationToken cancellationToken) { + this.databaseStorageActor = actorSystem.ActorOf(EventLogDatabaseStorageActor.Factory(new EventLogDatabaseStorageActor.Init(dbProvider, cancellationToken)), "EventLogDatabaseStorage"); this.dbProvider = dbProvider; - this.taskManager = taskManager; this.cancellationToken = cancellationToken; } - public void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { - taskManager.Run("Store event log item to database", () => AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra)); + private void EnqueueItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { + databaseStorageActor.Tell(new EventLogDatabaseStorageActor.StoreEventCommand(eventGuid, utcTime, agentGuid, eventType, subjectId, extra)); } - public async Task AddItem(Guid eventGuid, DateTime utcTime, Guid? agentGuid, EventLogEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { - await using var db = dbProvider.Lazy(); - new EventLogRepository(db).AddItem(eventGuid, utcTime, agentGuid, eventType, subjectId, extra); - await db.Ctx.SaveChangesAsync(cancellationToken); - } - public async Task<ImmutableArray<EventLogItem>> GetMostRecentItems(int count) { await using var db = dbProvider.Lazy(); return await new EventLogRepository(db).GetMostRecentItems(count, cancellationToken);