mirror of
https://github.com/chylex/Apache-Prometheus-Exporter.git
synced 2025-04-10 20:15:45 +02:00
Implement own log file tailing
This commit is contained in:
parent
ec099185c3
commit
f0e1447ae5
124
Cargo.lock
generated
124
Cargo.lock
generated
@ -22,6 +22,7 @@ name = "apache_prometheus_exporter"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"hyper",
|
||||
"notify",
|
||||
"path-slash",
|
||||
"prometheus-client",
|
||||
"tokio",
|
||||
@ -54,6 +55,12 @@ version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.2.1"
|
||||
@ -78,6 +85,18 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
|
||||
|
||||
[[package]]
|
||||
name = "filetime"
|
||||
version = "0.2.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall 0.3.5",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@ -180,12 +199,52 @@ dependencies = [
|
||||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify-sys"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
|
||||
|
||||
[[package]]
|
||||
name = "kqueue"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
|
||||
dependencies = [
|
||||
"kqueue-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue-sys"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.148"
|
||||
@ -202,6 +261,12 @@ dependencies = [
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.5.0"
|
||||
@ -224,10 +289,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"wasi",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "6.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
|
||||
dependencies = [
|
||||
"bitflags 2.4.0",
|
||||
"filetime",
|
||||
"inotify",
|
||||
"kqueue",
|
||||
"libc",
|
||||
"log",
|
||||
"mio",
|
||||
"walkdir",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.32.1"
|
||||
@ -261,7 +344,7 @@ checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"redox_syscall 0.2.16",
|
||||
"smallvec",
|
||||
"windows-sys 0.36.1",
|
||||
]
|
||||
@ -331,7 +414,16 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -340,6 +432,15 @@ version = "0.1.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
|
||||
|
||||
[[package]]
|
||||
name = "same-file"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
|
||||
dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@ -458,6 +559,16 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
|
||||
|
||||
[[package]]
|
||||
name = "walkdir"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee"
|
||||
dependencies = [
|
||||
"same-file",
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "want"
|
||||
version = "0.3.1"
|
||||
@ -489,6 +600,15 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
|
@ -14,6 +14,7 @@ codegen-units = 1
|
||||
|
||||
[dependencies]
|
||||
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
|
||||
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
|
||||
path-slash = "0.2.1"
|
||||
prometheus-client = "0.21.2"
|
||||
tokio = { version = "1.32.0", features = ["io-util", "process", "rt", "macros", "signal"] }
|
||||
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }
|
||||
|
14
README.md
14
README.md
@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
|
||||
|
||||
#### Notes
|
||||
|
||||
> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error.
|
||||
|
||||
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
|
||||
> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
|
||||
|
||||
## 4. Launch the Exporter
|
||||
|
||||
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
|
||||
|
||||
Press `Ctrl-C` to stop the exporter.
|
||||
If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
|
||||
|
||||
**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation.
|
||||
Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
|
||||
|
||||
#### Notes
|
||||
|
||||
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
|
||||
|
||||
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
|
||||
|
||||
## 5. Collect Prometheus Metrics
|
||||
|
||||
|
62
src/fs_watcher.rs
Normal file
62
src/fs_watcher.rs
Normal file
@ -0,0 +1,62 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
pub struct FsWatcher {
|
||||
watcher: Mutex<RecommendedWatcher>,
|
||||
}
|
||||
|
||||
impl FsWatcher {
|
||||
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
|
||||
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
|
||||
let watcher = Mutex::new(watcher);
|
||||
|
||||
Ok(Self { watcher })
|
||||
}
|
||||
|
||||
pub async fn watch(&self, path: &Path) -> Result<()> {
|
||||
let mut watcher = self.watcher.lock().await;
|
||||
|
||||
if let Err(e) = watcher.unwatch(path) {
|
||||
if !matches!(e.kind, ErrorKind::WatchNotFound) {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
watcher.watch(path, RecursiveMode::NonRecursive)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FsEventCallbacks {
|
||||
senders: HashMap<PathBuf, Sender<Event>>,
|
||||
}
|
||||
|
||||
impl FsEventCallbacks {
|
||||
pub fn new() -> Self {
|
||||
Self { senders: HashMap::new() }
|
||||
}
|
||||
|
||||
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
|
||||
self.senders.insert(path.to_path_buf(), sender);
|
||||
}
|
||||
|
||||
fn handle_event(&self, event: Result<Event>) {
|
||||
match event {
|
||||
Ok(event) => {
|
||||
for path in &event.paths {
|
||||
if let Some(sender) = self.senders.get(path) {
|
||||
if let Err(e) = sender.try_send(event.clone()) {
|
||||
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("[FsWatcher] Error receiving filesystem event: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,10 +1,16 @@
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::process::Command;
|
||||
use notify::{Event, EventKind};
|
||||
use notify::event::{CreateKind, ModifyKind};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use crate::ApacheMetrics;
|
||||
use crate::fs_watcher::{FsEventCallbacks, FsWatcher};
|
||||
use crate::log_file_pattern::LogFilePath;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq)]
|
||||
@ -13,11 +19,6 @@ enum LogFileKind {
|
||||
Error,
|
||||
}
|
||||
|
||||
struct LogFile {
|
||||
pub path: PathBuf,
|
||||
pub metadata: LogFileMetadata,
|
||||
}
|
||||
|
||||
struct LogFileMetadata {
|
||||
pub kind: LogFileKind,
|
||||
pub label: String,
|
||||
@ -30,7 +31,7 @@ impl LogFileMetadata {
|
||||
}
|
||||
|
||||
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> bool {
|
||||
let mut watcher = LogWatcher::new();
|
||||
let mut watcher = LogWatcherConfiguration::new();
|
||||
|
||||
for log_file in access_log_files.into_iter() {
|
||||
watcher.add_file(log_file, LogFileKind::Access);
|
||||
@ -43,24 +44,24 @@ pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_fil
|
||||
watcher.start(&metrics).await
|
||||
}
|
||||
|
||||
struct LogWatcher {
|
||||
files: Vec<LogFile>,
|
||||
struct LogWatcherConfiguration {
|
||||
files: Vec<(PathBuf, LogFileMetadata)>,
|
||||
}
|
||||
|
||||
impl LogWatcher {
|
||||
fn new() -> LogWatcher {
|
||||
LogWatcher { files: Vec::new() }
|
||||
impl LogWatcherConfiguration {
|
||||
fn new() -> LogWatcherConfiguration {
|
||||
LogWatcherConfiguration { files: Vec::new() }
|
||||
}
|
||||
|
||||
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
|
||||
return self.files.iter().filter(|info| info.metadata.kind == kind).count();
|
||||
return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
|
||||
}
|
||||
|
||||
fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
|
||||
let path = log_file.path;
|
||||
let label = log_file.label;
|
||||
let metadata = LogFileMetadata { kind, label };
|
||||
self.files.push(LogFile { path, metadata });
|
||||
self.files.push((path, metadata));
|
||||
}
|
||||
|
||||
async fn start(self, metrics: &ApacheMetrics) -> bool {
|
||||
@ -71,68 +72,221 @@ impl LogWatcher {
|
||||
|
||||
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
|
||||
|
||||
for file in self.files.into_iter() {
|
||||
let metadata = file.metadata;
|
||||
let label_set = metadata.get_label_set();
|
||||
struct PreparedFile {
|
||||
path: PathBuf,
|
||||
metadata: LogFileMetadata,
|
||||
fs_event_receiver: Receiver<Event>,
|
||||
}
|
||||
|
||||
let mut prepared_files = Vec::new();
|
||||
let mut fs_callbacks = FsEventCallbacks::new();
|
||||
|
||||
for (path, metadata) in self.files {
|
||||
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
|
||||
fs_callbacks.register(&path, fs_event_sender);
|
||||
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
|
||||
}
|
||||
|
||||
let fs_watcher = match FsWatcher::new(fs_callbacks) {
|
||||
Ok(fs_watcher) => fs_watcher,
|
||||
Err(e) => {
|
||||
println!("[LogWatcher] Error creating filesystem watcher: {}", e);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
for file in &prepared_files {
|
||||
let file_path = &file.path;
|
||||
if !file_path.is_absolute() {
|
||||
println!("[LogWatcher] Error creating filesystem watcher, path is not absolute: {}", file_path.to_string_lossy());
|
||||
return false;
|
||||
}
|
||||
|
||||
let parent_path = if let Some(parent) = file_path.parent() {
|
||||
parent
|
||||
} else {
|
||||
println!("[LogWatcher] Error creating filesystem watcher for parent directory of file \"{}\", parent directory does not exist", file_path.to_string_lossy());
|
||||
return false;
|
||||
};
|
||||
|
||||
if let Err(e) = fs_watcher.watch(parent_path).await {
|
||||
println!("[LogWatcher] Error creating filesystem watcher for directory \"{}\": {}", parent_path.to_string_lossy(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let fs_watcher = Arc::new(fs_watcher);
|
||||
|
||||
for file in prepared_files {
|
||||
let label_set = file.metadata.get_label_set();
|
||||
let _ = metrics.requests_total.get_or_create(&label_set);
|
||||
let _ = metrics.errors_total.get_or_create(&label_set);
|
||||
|
||||
let command = Command::new("tail")
|
||||
.arg("-q") // Don't print file names.
|
||||
.arg("-F") // Follow rotations.
|
||||
.arg("-n").arg("0") // Start from end.
|
||||
.arg(&file.path)
|
||||
.env_clear()
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::null())
|
||||
.spawn();
|
||||
|
||||
let mut process = match command {
|
||||
Ok(process) => process,
|
||||
Err(error) => {
|
||||
println!("[LogWatcher] Error spawning tail process for file \"{}\": {}", file.path.to_string_lossy(), error);
|
||||
return false;
|
||||
}
|
||||
let log_watcher = match LogWatcher::create(file.path, file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver).await {
|
||||
Some(log_watcher) => log_watcher,
|
||||
None => return false,
|
||||
};
|
||||
|
||||
let stdout = match process.stdout.take() {
|
||||
Some(stdout) => stdout,
|
||||
None => {
|
||||
println!("[LogWatcher] No output handle in tail process for file: {}", file.path.to_string_lossy());
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let mut output_reader = BufReader::new(stdout).lines();
|
||||
let metrics = metrics.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match output_reader.next_line().await {
|
||||
Ok(maybe_line) => match maybe_line {
|
||||
Some(line) => handle_line(&metadata, line, &metrics),
|
||||
None => break,
|
||||
},
|
||||
Err(e) => {
|
||||
println!("[LogWatcher] Error reading from file \"{}\": {}", metadata.label, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
tokio::spawn(log_watcher.watch());
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_line(metadata: &LogFileMetadata, line: String, metrics: &ApacheMetrics) {
|
||||
let (kind, family) = match metadata.kind {
|
||||
LogFileKind::Access => ("access log", &metrics.requests_total),
|
||||
LogFileKind::Error => ("error log", &metrics.errors_total),
|
||||
};
|
||||
|
||||
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, metadata.label, line);
|
||||
family.get_or_create(&metadata.get_label_set()).inc();
|
||||
struct LogWatcher {
|
||||
state: LogWatchingState,
|
||||
processor: LogLineProcessor,
|
||||
fs_event_receiver: Receiver<Event>,
|
||||
}
|
||||
|
||||
impl LogWatcher {
|
||||
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: ApacheMetrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> {
|
||||
let state = match LogWatchingState::initialize(path.clone(), fs_watcher).await {
|
||||
Some(state) => state,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let processor = LogLineProcessor { path, metadata, metrics };
|
||||
Some(LogWatcher { state, processor, fs_event_receiver })
|
||||
}
|
||||
|
||||
async fn watch(mut self) {
|
||||
while let Ok(Some(_)) = self.state.lines.next_line().await {
|
||||
// Skip lines that already existed.
|
||||
}
|
||||
|
||||
let path = &self.processor.path;
|
||||
|
||||
'read_loop:
|
||||
loop {
|
||||
if !self.processor.process_lines(&mut self.state.lines).await {
|
||||
break 'read_loop;
|
||||
}
|
||||
|
||||
'event_loop:
|
||||
loop {
|
||||
let mut next_event = CoalescedFsEvent::None;
|
||||
|
||||
match self.fs_event_receiver.recv().await {
|
||||
None => break 'read_loop,
|
||||
Some(event) => {
|
||||
next_event = next_event.merge(event);
|
||||
|
||||
while let Ok(event) = self.fs_event_receiver.try_recv() {
|
||||
next_event = next_event.merge(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match next_event {
|
||||
CoalescedFsEvent::None => continue 'event_loop,
|
||||
CoalescedFsEvent::NewData => continue 'read_loop,
|
||||
CoalescedFsEvent::NewFile => {
|
||||
println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
|
||||
|
||||
if !self.processor.process_lines(&mut self.state.lines).await {
|
||||
break 'read_loop;
|
||||
}
|
||||
|
||||
self.state = match self.state.reinitialize().await {
|
||||
Some(state) => state,
|
||||
None => break 'read_loop,
|
||||
};
|
||||
|
||||
continue 'read_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
enum CoalescedFsEvent {
|
||||
None = 0,
|
||||
NewData = 1,
|
||||
NewFile = 2,
|
||||
}
|
||||
|
||||
impl CoalescedFsEvent {
|
||||
fn merge(self, event: Event) -> CoalescedFsEvent {
|
||||
match event.kind {
|
||||
EventKind::Modify(ModifyKind::Data(_)) => {
|
||||
max(self, CoalescedFsEvent::NewData)
|
||||
}
|
||||
|
||||
EventKind::Create(CreateKind::File) => {
|
||||
max(self, CoalescedFsEvent::NewFile)
|
||||
}
|
||||
|
||||
_ => self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LogWatchingState {
|
||||
path: PathBuf,
|
||||
lines: Lines<BufReader<File>>,
|
||||
fs_watcher: Arc<FsWatcher>,
|
||||
}
|
||||
|
||||
impl LogWatchingState {
|
||||
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
|
||||
|
||||
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Option<LogWatchingState> {
|
||||
if let Err(e) = fs_watcher.watch(&path).await {
|
||||
println!("[LogWatcher] Error creating filesystem watcher for file \"{}\": {}", path.to_string_lossy(), e);
|
||||
return None;
|
||||
}
|
||||
|
||||
let lines = match File::open(&path).await {
|
||||
Ok(file) => BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines(),
|
||||
Err(e) => {
|
||||
println!("[LogWatcher] Error opening file \"{}\": {}", path.to_string_lossy(), e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some(LogWatchingState { path, lines, fs_watcher })
|
||||
}
|
||||
|
||||
async fn reinitialize(self) -> Option<LogWatchingState> {
|
||||
LogWatchingState::initialize(self.path, self.fs_watcher).await
|
||||
}
|
||||
}
|
||||
|
||||
struct LogLineProcessor {
|
||||
path: PathBuf,
|
||||
metadata: LogFileMetadata,
|
||||
metrics: ApacheMetrics,
|
||||
}
|
||||
|
||||
impl LogLineProcessor {
|
||||
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
|
||||
loop {
|
||||
match reader.next_line().await {
|
||||
Ok(maybe_line) => match maybe_line {
|
||||
Some(line) => self.handle_line(line),
|
||||
None => return true,
|
||||
},
|
||||
Err(e) => {
|
||||
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_line(&self, line: String) {
|
||||
let (kind, family) = match self.metadata.kind {
|
||||
LogFileKind::Access => ("access log", &self.metrics.requests_total),
|
||||
LogFileKind::Error => ("error log", &self.metrics.errors_total),
|
||||
};
|
||||
|
||||
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
|
||||
family.get_or_create(&self.metadata.get_label_set()).inc();
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ use crate::log_watcher::start_log_watcher;
|
||||
use crate::web_server::WebServer;
|
||||
|
||||
mod apache_metrics;
|
||||
mod fs_watcher;
|
||||
mod log_file_pattern;
|
||||
mod log_parser;
|
||||
mod log_watcher;
|
||||
|
Loading…
Reference in New Issue
Block a user