diff --git a/Cargo.lock b/Cargo.lock
index 88c170e..4dba285 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -22,6 +22,7 @@ name = "apache_prometheus_exporter"
 version = "0.1.0"
 dependencies = [
  "hyper",
+ "notify",
  "path-slash",
  "prometheus-client",
  "tokio",
 ]
@@ -54,6 +55,12 @@ version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
+[[package]]
+name = "bitflags"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
+
 [[package]]
 name = "bytes"
 version = "1.2.1"
@@ -78,6 +85,18 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
 
+[[package]]
+name = "filetime"
+version = "0.2.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall 0.3.5",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -180,12 +199,52 @@ dependencies = [
  "want",
 ]
 
+[[package]]
+name = "inotify"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
+dependencies = [
+ "bitflags 1.3.2",
+ "inotify-sys",
+ "libc",
+]
+
+[[package]]
+name = "inotify-sys"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
 
+[[package]]
+name = "kqueue"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
+dependencies = [
+ "kqueue-sys",
+ "libc",
+]
+
+[[package]]
+name = "kqueue-sys"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+]
+
 [[package]]
 name = "libc"
 version = "0.2.148"
@@ -202,6 +261,12 @@ dependencies = [
  "scopeguard",
 ]
 
+[[package]]
+name = "log"
+version = "0.4.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
+
 [[package]]
 name = "memchr"
 version = "2.5.0"
@@ -224,10 +289,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
 dependencies = [
  "libc",
+ "log",
  "wasi",
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "notify"
+version = "6.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
+dependencies = [
+ "bitflags 2.4.0",
+ "filetime",
+ "inotify",
+ "kqueue",
+ "libc",
+ "log",
+ "mio",
+ "walkdir",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "object"
 version = "0.32.1"
@@ -261,7 +344,7 @@ checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
 dependencies = [
  "cfg-if",
  "libc",
- "redox_syscall",
+ "redox_syscall 0.2.16",
  "smallvec",
  "windows-sys 0.36.1",
 ]
@@ -331,7 +414,16 @@ version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
+]
+
+[[package]]
+name = "redox_syscall"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
+dependencies = [
+ "bitflags 1.3.2",
 ]
 
 [[package]]
@@ -340,6 +432,15 @@ version = "0.1.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
 
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
@@ -458,6 +559,16 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
 
+[[package]]
+name = "walkdir"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee"
+dependencies = [
+ "same-file",
+ "winapi-util",
+]
+
 [[package]]
 name = "want"
 version = "0.3.1"
@@ -489,6 +600,15 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
 
+[[package]]
+name = "winapi-util"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
+dependencies = [
+ "winapi",
+]
+
 [[package]]
 name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"
diff --git a/Cargo.toml b/Cargo.toml
index 95389b9..06ab33c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ codegen-units = 1
 
 [dependencies]
 hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
+notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
 path-slash = "0.2.1"
 prometheus-client = "0.21.2"
-tokio = { version = "1.32.0", features = ["io-util", "process", "rt", "macros", "signal"] }
+tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }
diff --git a/README.md b/README.md
index 3f9b052..080932e 100644
--- a/README.md
+++ b/README.md
@@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
 
 #### Notes
 
-> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error.
-
-> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
+> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
 
 ## 4. Launch the Exporter
 
 Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
 
-Press `Ctrl-C` to stop the exporter.
+If no errors are shown, the exporter will begin reading the discovered log files from the end and printing each line to standard output. When a log file is rotated, the exporter will begin reading it again from the beginning.
 
-**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation.
+Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
+
+#### Notes
+
+> The exporter is designed for, and tested with, the `rotatelogs` tool in a Linux container. Other tools and operating systems are unsupported.
+
+> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching that file and print the error to standard output.
 
 ## 5. Collect Prometheus Metrics
 
diff --git a/src/fs_watcher.rs b/src/fs_watcher.rs
new file mode 100644
index 0000000..b69ad95
--- /dev/null
+++ b/src/fs_watcher.rs
@@ -0,0 +1,62 @@
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+
+use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
+use tokio::sync::mpsc::Sender;
+use tokio::sync::Mutex;
+
+pub struct FsWatcher {
+	watcher: Mutex<RecommendedWatcher>,
+}
+
+impl FsWatcher {
+	pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
+		let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
+		let watcher = Mutex::new(watcher);
+
+		Ok(Self { watcher })
+	}
+
+	pub async fn watch(&self, path: &Path) -> Result<()> {
+		let mut watcher = self.watcher.lock().await;
+
+		if let Err(e) = watcher.unwatch(path) {
+			if !matches!(e.kind, ErrorKind::WatchNotFound) {
+				return Err(e);
+			}
+		}
+
+		watcher.watch(path, RecursiveMode::NonRecursive)
+	}
+}
+
+pub struct FsEventCallbacks {
+	senders: HashMap<PathBuf, Sender<Event>>,
+}
+
+impl FsEventCallbacks {
+	pub fn new() -> Self {
+		Self { senders: HashMap::new() }
+	}
+
+	pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
+		self.senders.insert(path.to_path_buf(), sender);
+	}
+
+	fn handle_event(&self, event: Result<Event>) {
+		match event {
+			Ok(event) => {
+				for path in &event.paths {
+					if let Some(sender) = self.senders.get(path) {
+						if let Err(e) = sender.try_send(event.clone()) {
+							println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
+						}
+					}
+				}
+			}
+			Err(e) => {
+				println!("[FsWatcher] Error receiving filesystem event: {}", e);
+			}
+		}
+	}
+}
diff --git a/src/log_watcher.rs b/src/log_watcher.rs
index 90f509a..3d662d2 100644
--- a/src/log_watcher.rs
+++ b/src/log_watcher.rs
@@ -1,10 +1,16 @@
+use std::cmp::max;
 use std::path::PathBuf;
-use std::process::Stdio;
+use std::sync::Arc;
 
-use tokio::io::{AsyncBufReadExt, BufReader};
-use tokio::process::Command;
+use notify::{Event, EventKind};
+use notify::event::{CreateKind, ModifyKind};
+use tokio::fs::File;
+use tokio::io::{AsyncBufReadExt, BufReader, Lines};
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::Receiver;
 
 use crate::ApacheMetrics;
+use crate::fs_watcher::{FsEventCallbacks, FsWatcher};
 use crate::log_file_pattern::LogFilePath;
 
 #[derive(Copy, Clone, PartialEq)]
@@ -13,11 +19,6 @@ enum LogFileKind {
 	Access,
 	Error,
 }
 
-struct LogFile {
-	pub path: PathBuf,
-	pub metadata: LogFileMetadata,
-}
-
 struct LogFileMetadata {
 	pub kind: LogFileKind,
 	pub label: String,
 }
@@ -30,7 +31,7 @@ impl LogFileMetadata {
 	}
 }
 
 pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> bool {
-	let mut watcher = LogWatcher::new();
+	let mut watcher = LogWatcherConfiguration::new();
 
 	for log_file in access_log_files.into_iter() {
 		watcher.add_file(log_file, LogFileKind::Access);
@@ -43,24 +44,24 @@
 	watcher.start(&metrics).await
 }
 
-struct LogWatcher {
-	files: Vec<LogFile>,
+struct LogWatcherConfiguration {
+	files: Vec<(PathBuf, LogFileMetadata)>,
 }
 
-impl LogWatcher {
-	fn new() -> LogWatcher {
-		LogWatcher { files: Vec::new() }
+impl LogWatcherConfiguration {
+	fn new() -> LogWatcherConfiguration {
+		LogWatcherConfiguration { files: Vec::new() }
 	}
 
 	fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
-		return self.files.iter().filter(|info| info.metadata.kind == kind).count();
+		return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
 	}
 
 	fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
 		let path = log_file.path;
 		let label = log_file.label;
 		let metadata = LogFileMetadata { kind, label };
-		self.files.push(LogFile { path, metadata });
+		self.files.push((path, metadata));
 	}
 
 	async fn start(self, metrics: &ApacheMetrics) -> bool {
@@ -71,68 +72,221 @@ impl LogWatcher {
 
 		println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
 
-		for file in self.files.into_iter() {
-			let metadata = file.metadata;
-			let label_set = metadata.get_label_set();
+		struct PreparedFile {
+			path: PathBuf,
+			metadata: LogFileMetadata,
+			fs_event_receiver: Receiver<Event>,
+		}
+
+		let mut prepared_files = Vec::new();
+		let mut fs_callbacks = FsEventCallbacks::new();
+
+		for (path, metadata) in self.files {
+			let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
+			fs_callbacks.register(&path, fs_event_sender);
+			prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
+		}
+
+		let fs_watcher = match FsWatcher::new(fs_callbacks) {
+			Ok(fs_watcher) => fs_watcher,
+			Err(e) => {
+				println!("[LogWatcher] Error creating filesystem watcher: {}", e);
+				return false;
+			}
+		};
+
+		for file in &prepared_files {
+			let file_path = &file.path;
+			if !file_path.is_absolute() {
+				println!("[LogWatcher] Error creating filesystem watcher, path is not absolute: {}", file_path.to_string_lossy());
+				return false;
+			}
+
+			let parent_path = if let Some(parent) = file_path.parent() {
+				parent
+			} else {
+				println!("[LogWatcher] Error creating filesystem watcher for parent directory of file \"{}\", parent directory does not exist", file_path.to_string_lossy());
+				return false;
+			};
+
+			if let Err(e) = fs_watcher.watch(parent_path).await {
+				println!("[LogWatcher] Error creating filesystem watcher for directory \"{}\": {}", parent_path.to_string_lossy(), e);
+				return false;
+			}
+		}
+
+		let fs_watcher = Arc::new(fs_watcher);
+
+		for file in prepared_files {
+			let label_set = file.metadata.get_label_set();
 			let _ = metrics.requests_total.get_or_create(&label_set);
 			let _ = metrics.errors_total.get_or_create(&label_set);
 
-			let command = Command::new("tail")
-				.arg("-q") // Don't print file names.
-				.arg("-F") // Follow rotations.
-				.arg("-n").arg("0") // Start from end.
-				.arg(&file.path)
-				.env_clear()
-				.stdin(Stdio::null())
-				.stdout(Stdio::piped())
-				.stderr(Stdio::null())
-				.spawn();
-
-			let mut process = match command {
-				Ok(process) => process,
-				Err(error) => {
-					println!("[LogWatcher] Error spawning tail process for file \"{}\": {}", file.path.to_string_lossy(), error);
-					return false;
-				}
+			let log_watcher = match LogWatcher::create(file.path, file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver).await {
+				Some(log_watcher) => log_watcher,
+				None => return false,
 			};
 
-			let stdout = match process.stdout.take() {
-				Some(stdout) => stdout,
-				None => {
-					println!("[LogWatcher] No output handle in tail process for file: {}", file.path.to_string_lossy());
-					return false;
-				}
-			};
-
-			let mut output_reader = BufReader::new(stdout).lines();
-			let metrics = metrics.clone();
-
-			tokio::spawn(async move {
-				loop {
-					match output_reader.next_line().await {
-						Ok(maybe_line) => match maybe_line {
-							Some(line) => handle_line(&metadata, line, &metrics),
-							None => break,
-						},
-						Err(e) => {
-							println!("[LogWatcher] Error reading from file \"{}\": {}", metadata.label, e);
-							break;
-						}
-					}
-				}
-			});
+			tokio::spawn(log_watcher.watch());
 		}
 
 		true
 	}
 }
 
-fn handle_line(metadata: &LogFileMetadata, line: String, metrics: &ApacheMetrics) {
-	let (kind, family) = match metadata.kind {
-		LogFileKind::Access => ("access log", &metrics.requests_total),
-		LogFileKind::Error => ("error log", &metrics.errors_total),
-	};
-
-	println!("[LogWatcher] Received {} line from \"{}\": {}", kind, metadata.label, line);
-	family.get_or_create(&metadata.get_label_set()).inc();
+struct LogWatcher {
+	state: LogWatchingState,
+	processor: LogLineProcessor,
+	fs_event_receiver: Receiver<Event>,
+}
+
+impl LogWatcher {
+	async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: ApacheMetrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> {
+		let state = match LogWatchingState::initialize(path.clone(), fs_watcher).await {
+			Some(state) => state,
+			None => return None,
+		};
+
+		let processor = LogLineProcessor { path, metadata, metrics };
+		Some(LogWatcher { state, processor, fs_event_receiver })
+	}
+
+	async fn watch(mut self) {
+		while let Ok(Some(_)) = self.state.lines.next_line().await {
+			// Skip lines that already existed.
+		}
+
+		let path = &self.processor.path;
+
+		'read_loop:
+		loop {
+			if !self.processor.process_lines(&mut self.state.lines).await {
+				break 'read_loop;
+			}
+
+			'event_loop:
+			loop {
+				let mut next_event = CoalescedFsEvent::None;
+
+				match self.fs_event_receiver.recv().await {
+					None => break 'read_loop,
+					Some(event) => {
+						next_event = next_event.merge(event);
+
+						while let Ok(event) = self.fs_event_receiver.try_recv() {
+							next_event = next_event.merge(event);
+						}
+					}
+				}
+
+				match next_event {
+					CoalescedFsEvent::None => continue 'event_loop,
+					CoalescedFsEvent::NewData => continue 'read_loop,
+					CoalescedFsEvent::NewFile => {
+						println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
+
+						if !self.processor.process_lines(&mut self.state.lines).await {
+							break 'read_loop;
+						}
+
+						self.state = match self.state.reinitialize().await {
+							Some(state) => state,
+							None => break 'read_loop,
+						};
+
+						continue 'read_loop;
+					}
+				}
+			}
+		}
+
+		println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
+	}
+}
+
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
+enum CoalescedFsEvent {
+	None = 0,
+	NewData = 1,
+	NewFile = 2,
+}
+
+impl CoalescedFsEvent {
+	fn merge(self, event: Event) -> CoalescedFsEvent {
+		match event.kind {
+			EventKind::Modify(ModifyKind::Data(_)) => {
+				max(self, CoalescedFsEvent::NewData)
+			}
+
+			EventKind::Create(CreateKind::File) => {
+				max(self, CoalescedFsEvent::NewFile)
+			}
+
+			_ => self
+		}
+	}
+}
+
+struct LogWatchingState {
+	path: PathBuf,
+	lines: Lines<BufReader<File>>,
+	fs_watcher: Arc<FsWatcher>,
+}
+
+impl LogWatchingState {
+	const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
+
+	async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Option<LogWatchingState> {
+		if let Err(e) = fs_watcher.watch(&path).await {
+			println!("[LogWatcher] Error creating filesystem watcher for file \"{}\": {}", path.to_string_lossy(), e);
+			return None;
+		}
+
+		let lines = match File::open(&path).await {
+			Ok(file) => BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines(),
+			Err(e) => {
+				println!("[LogWatcher] Error opening file \"{}\": {}", path.to_string_lossy(), e);
+				return None;
+			}
+		};
+
+		Some(LogWatchingState { path, lines, fs_watcher })
+	}
+
+	async fn reinitialize(self) -> Option<LogWatchingState> {
+		LogWatchingState::initialize(self.path, self.fs_watcher).await
+	}
+}
+
+struct LogLineProcessor {
+	path: PathBuf,
+	metadata: LogFileMetadata,
+	metrics: ApacheMetrics,
+}
+
+impl LogLineProcessor {
+	async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
+		loop {
+			match reader.next_line().await {
+				Ok(maybe_line) => match maybe_line {
+					Some(line) => self.handle_line(line),
+					None => return true,
+				},
+				Err(e) => {
+					println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
+					return false;
+				}
+			}
+		}
+	}
+
+	fn handle_line(&self, line: String) {
+		let (kind, family) = match self.metadata.kind {
+			LogFileKind::Access => ("access log", &self.metrics.requests_total),
+			LogFileKind::Error => ("error log", &self.metrics.errors_total),
+		};
+
+		println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
+		family.get_or_create(&self.metadata.get_label_set()).inc();
+	}
 }
diff --git a/src/main.rs b/src/main.rs
index a305442..ad919ee 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,6 +12,7 @@ use crate::log_watcher::start_log_watcher;
 use crate::web_server::WebServer;
 
 mod apache_metrics;
+mod fs_watcher;
 mod log_file_pattern;
 mod log_parser;
 mod log_watcher;
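The new `src/fs_watcher.rs` module is built around a single pattern: a `notify` watcher whose callback runs on the library's own thread and forwards events into a tokio channel that async tasks can await. The following is a minimal, self-contained sketch of that bridge, assuming notify 6.x and a tokio runtime with the sync primitives enabled; the watched directory, channel capacity, and printed output are illustrative only and are not part of the patch.

```rust
use std::path::Path;

use notify::{recommended_watcher, Event, RecursiveMode, Watcher};
use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() -> notify::Result<()> {
	let (tx, mut rx) = mpsc::channel::<Event>(20);

	// The callback runs on notify's watcher thread, so it only forwards events
	// into the channel instead of touching any async state directly.
	let mut watcher = recommended_watcher(move |result: notify::Result<Event>| {
		if let Ok(event) = result {
			let _ = tx.try_send(event); // Drop events if the receiver falls behind.
		}
	})?;

	// Watching the parent directory (non-recursively) means a log file that is
	// deleted and re-created during rotation still produces events for this watcher.
	watcher.watch(Path::new("/var/log/apache2"), RecursiveMode::NonRecursive)?;

	while let Some(event) = rx.recv().await {
		println!("filesystem event: {:?} for {:?}", event.kind, event.paths);
	}

	Ok(())
}
```

Watching the parent directories as well as the files themselves is what lets `LogWatcher::watch` notice a re-created file: it drains the remaining lines from the old handle, then reopens the path via `LogWatchingState::reinitialize`.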