1
0
Fork 0
Brotli-Builder/BrotliCalc/Helpers/FileWorker.cs

130 lines
4.4 KiB
C#

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
namespace BrotliCalc.Helpers{
sealed class FileWorker<T> where T : BrotliFile{
private const int WriterSleepMillis = 200;
public delegate IEnumerable<object?[]> WorkFunc(BrotliFileGroup group, T file);
public delegate IEnumerable<object?[]> ErrorFunc(BrotliFileGroup group, T file, Exception ex);
private static readonly List<object?[]> ErrorEntry = new List<object?[]>();
public Func<T, string>? Name { get; set; }
public WorkFunc? Work { get; set; }
public ErrorFunc? Error { get; set; }
public Result Start(Table output, IEnumerable<(BrotliFileGroup group, T file)> input){
int errors = 0;
var name = Name ?? (item => item.ToString());
var work = Work ?? throw new InvalidOperationException("Cannot start worker without a defined Work function.");
var error = Error;
var items = input.ToArray();
var count = items.Length;
var results = new List<object?[]>?[count];
using var progress = new Progress(count);
var writerToken = new CancellationTokenSource();
var writerInfo = new ResultWriter(output, results, writerToken.Token);
var writerThread = new Thread(ResultWriterThread){
Name = "ResultWriter",
Priority = ThreadPriority.BelowNormal,
IsBackground = true
};
writerThread.Start(writerInfo);
items.WithIndex().Parallelize().ForAll(entry => {
var (index, (group, file)) = entry;
try{
progress.Start($"Processing {name(file)}");
results[index] = work(group, file).ToList();
}catch(Exception e){
results[index] = error == null ? ErrorEntry : error(group, file, e).ToList();
Interlocked.Increment(ref errors);
progress.Print(ConsoleColor.Red, $"Error processing {name(file)}: {e.Message}");
Debug.WriteLine(e.ToString());
}finally{
progress.Finish($"Completed {name(file)}");
}
});
writerToken.Cancel();
writerThread.Join();
if (writerInfo.MissingEntries.Count > 0){
foreach(int index in writerInfo.MissingEntries){
progress.Print(ConsoleColor.Red, $"Missing result entry for file {items[index].file}");
}
progress.Print(ConsoleColor.Red, "");
}
return new Result(count, errors);
}
public class Result{
public int TotalProcessed { get; }
public int TotalErrors { get; }
public Result(int totalProcessed, int totalErrors){
this.TotalProcessed = totalProcessed;
this.TotalErrors = totalErrors;
}
}
private class ResultWriter{
public Table Output { get; }
public List<object?[]>?[] Results { get; }
public CancellationToken Token { get; }
public List<int> MissingEntries { get; } = new List<int>();
public ResultWriter(Table output, List<object?[]>?[] results, CancellationToken token){
this.Output = output;
this.Results = results;
this.Token = token;
}
}
private void ResultWriterThread(object? obj){
var info = (ResultWriter)obj!;
var output = info.Output;
var results = info.Results;
var token = info.Token;
int index = 0;
int count = results.Length;
while(index < count){
var result = results[index];
if (result == null){
if (token.IsCancellationRequested){
info.MissingEntries.Add(index++);
}
else{
Thread.Sleep(WriterSleepMillis);
}
}
else{
foreach(var row in result){
output.AddRow(row.ToArray());
}
++index;
}
}
}
}
}