mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-08-16 21:31:45 +02:00
.config
.run
.workdir
Agent
Common
Controller
Phantom.Controller
Phantom.Controller.Database
Phantom.Controller.Database.Postgres
Phantom.Controller.Minecraft
Phantom.Controller.Services
Agents
Events
EventLogDatabaseStorageActor.cs
EventLogManager.InstanceEventVisitor.cs
EventLogManager.cs
Instances
Rpc
Users
ControllerServices.cs
ControllerState.cs
Phantom.Controller.Services.csproj
Docker
Utils
Web
.dockerignore
.gitattributes
.gitignore
AddMigration.bat
AddMigration.sh
Directory.Build.props
Directory.Build.targets
Dockerfile
LICENSE
Packages.props
PhantomPanel.sln
README.md
global.json
78 lines
2.6 KiB
C#
78 lines
2.6 KiB
C#
using Phantom.Common.Data.Web.EventLog;
|
|
using Phantom.Controller.Database;
|
|
using Phantom.Controller.Database.Repositories;
|
|
using Phantom.Utils.Actor;
|
|
using Phantom.Utils.Logging;
|
|
using Serilog;
|
|
|
|
namespace Phantom.Controller.Services.Events;
|
|
|
|
sealed class EventLogDatabaseStorageActor : ReceiveActor<EventLogDatabaseStorageActor.ICommand> {
|
|
private static readonly ILogger Logger = PhantomLogger.Create<EventLogDatabaseStorageActor>();
|
|
|
|
public readonly record struct Init(IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
|
|
|
public static Props<ICommand> Factory(Init init) {
|
|
return Props<ICommand>.Create(() => new EventLogDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
|
}
|
|
|
|
private readonly IDbContextProvider dbProvider;
|
|
private readonly CancellationToken cancellationToken;
|
|
|
|
private readonly LinkedList<StoreEventCommand> pendingCommands = new ();
|
|
private bool hasScheduledFlush = false;
|
|
|
|
private EventLogDatabaseStorageActor(Init init) {
|
|
this.dbProvider = init.DbProvider;
|
|
this.cancellationToken = init.CancellationToken;
|
|
|
|
Receive<StoreEventCommand>(StoreEvent);
|
|
ReceiveAsync<FlushChangesCommand>(FlushChanges);
|
|
}
|
|
|
|
public interface ICommand {}
|
|
|
|
public sealed record StoreEventCommand(Guid EventGuid, DateTime UtcTime, Guid? AgentGuid, EventLogEventType EventType, string SubjectId, Dictionary<string, object?>? Extra = null) : ICommand;
|
|
|
|
private sealed record FlushChangesCommand : ICommand;
|
|
|
|
private void StoreEvent(StoreEventCommand command) {
|
|
pendingCommands.AddLast(command);
|
|
ScheduleFlush(TimeSpan.FromMilliseconds(500));
|
|
}
|
|
|
|
private async Task FlushChanges(FlushChangesCommand command) {
|
|
hasScheduledFlush = false;
|
|
|
|
if (pendingCommands.Count == 0) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await using var db = dbProvider.Lazy();
|
|
var eventLogRepository = new EventLogRepository(db);
|
|
|
|
foreach (var (eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra) in pendingCommands) {
|
|
eventLogRepository.AddItem(eventGuid, dateTime, agentGuid, eventLogEventType, subjectId, extra);
|
|
}
|
|
|
|
await db.Ctx.SaveChangesAsync(cancellationToken);
|
|
} catch (Exception e) {
|
|
ScheduleFlush(TimeSpan.FromSeconds(10));
|
|
Logger.Error(e, "Could not store {EventCount} event(s) in database.", pendingCommands.Count);
|
|
return;
|
|
}
|
|
|
|
Logger.Information("Stored {EventCount} event(s) in database.", pendingCommands.Count);
|
|
|
|
pendingCommands.Clear();
|
|
}
|
|
|
|
private void ScheduleFlush(TimeSpan delay) {
|
|
if (!hasScheduledFlush) {
|
|
hasScheduledFlush = true;
|
|
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
|
|
}
|
|
}
|
|
}
|