Compare commits

...

2 Commits

Author SHA1 Message Date
chylex f0e1447ae5
Implement own log file tailing 2023-10-02 08:49:36 +02:00
chylex ec099185c3
Replace linemux with reading output of 'tail' processes 2023-10-01 18:55:45 +02:00
6 changed files with 371 additions and 223 deletions

181
Cargo.lock generated
View File

@ -22,7 +22,7 @@ name = "apache_prometheus_exporter"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"hyper", "hyper",
"linemux", "notify",
"path-slash", "path-slash",
"prometheus-client", "prometheus-client",
"tokio", "tokio",
@ -55,6 +55,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.2.1" version = "1.2.1"
@ -73,26 +79,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]] [[package]]
name = "dtoa" name = "dtoa"
version = "1.0.3" version = "1.0.3"
@ -101,14 +87,14 @@ checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
[[package]] [[package]]
name = "filetime" name = "filetime"
version = "0.2.17" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c" checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall 0.3.5",
"windows-sys 0.36.1", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -148,7 +134,6 @@ dependencies = [
"futures-task", "futures-task",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]
@ -220,7 +205,7 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
"inotify-sys", "inotify-sys",
"libc", "libc",
] ]
@ -242,9 +227,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
[[package]] [[package]]
name = "kqueue" name = "kqueue"
version = "1.0.6" version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [ dependencies = [
"kqueue-sys", "kqueue-sys",
"libc", "libc",
@ -252,11 +237,11 @@ dependencies = [
[[package]] [[package]]
name = "kqueue-sys" name = "kqueue-sys"
version = "1.0.3" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
"libc", "libc",
] ]
@ -266,18 +251,6 @@ version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "linemux"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb035c7806bd7982a317d8d66815021e91f7ab14a5fbedee22b06f608f11b43"
dependencies = [
"futures-util",
"notify",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.8" version = "0.4.8"
@ -290,12 +263,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
@ -326,19 +296,19 @@ dependencies = [
[[package]] [[package]]
name = "notify" name = "notify"
version = "5.2.0" version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486" checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.4.0",
"crossbeam-channel",
"filetime", "filetime",
"inotify", "inotify",
"kqueue", "kqueue",
"libc", "libc",
"log",
"mio", "mio",
"walkdir", "walkdir",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -374,7 +344,7 @@ checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall 0.2.16",
"smallvec", "smallvec",
"windows-sys 0.36.1", "windows-sys 0.36.1",
] ]
@ -444,7 +414,16 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
] ]
[[package]] [[package]]
@ -477,15 +456,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.9.0" version = "1.9.0"
@ -591,12 +561,11 @@ checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.2" version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee"
dependencies = [ dependencies = [
"same-file", "same-file",
"winapi",
"winapi-util", "winapi-util",
] ]
@ -633,9 +602,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]] [[package]]
name = "winapi-util" name = "winapi-util"
version = "0.1.5" version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
dependencies = [ dependencies = [
"winapi", "winapi",
] ]
@ -659,37 +628,13 @@ dependencies = [
"windows_x86_64_msvc 0.36.1", "windows_x86_64_msvc 0.36.1",
] ]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [ dependencies = [
"windows-targets 0.48.5", "windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
] ]
[[package]] [[package]]
@ -698,21 +643,15 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm 0.48.5", "windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.48.5", "windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5", "windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5", "windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5", "windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm 0.48.5", "windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.48.5", "windows_x86_64_msvc 0.48.5",
] ]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.48.5" version = "0.48.5"
@ -725,12 +664,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.5" version = "0.48.5"
@ -743,12 +676,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.5" version = "0.48.5"
@ -761,12 +688,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.5" version = "0.48.5"
@ -779,24 +700,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.5" version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.48.5" version = "0.48.5"
@ -809,12 +718,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.48.5"

View File

@ -14,7 +14,7 @@ codegen-units = 1
[dependencies] [dependencies]
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] } hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
linemux = "0.3.0" notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
path-slash = "0.2.1" path-slash = "0.2.1"
prometheus-client = "0.21.2" prometheus-client = "0.21.2"
tokio = { version = "1.32.0", features = ["rt", "macros", "signal"] } tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }

View File

@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
#### Notes #### Notes
> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error. > The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
## 4. Launch the Exporter ## 4. Launch the Exporter
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL. Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
Press `Ctrl-C` to stop the exporter. If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation. Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
#### Notes
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
## 5. Collect Prometheus Metrics ## 5. Collect Prometheus Metrics

62
src/fs_watcher.rs Normal file
View File

@ -0,0 +1,62 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FsWatcher {
watcher: Mutex<RecommendedWatcher>,
}
impl FsWatcher {
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
let watcher = Mutex::new(watcher);
Ok(Self { watcher })
}
pub async fn watch(&self, path: &Path) -> Result<()> {
let mut watcher = self.watcher.lock().await;
if let Err(e) = watcher.unwatch(path) {
if !matches!(e.kind, ErrorKind::WatchNotFound) {
return Err(e);
}
}
watcher.watch(path, RecursiveMode::NonRecursive)
}
}
pub struct FsEventCallbacks {
senders: HashMap<PathBuf, Sender<Event>>,
}
impl FsEventCallbacks {
pub fn new() -> Self {
Self { senders: HashMap::new() }
}
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
self.senders.insert(path.to_path_buf(), sender);
}
fn handle_event(&self, event: Result<Event>) {
match event {
Ok(event) => {
for path in &event.paths {
if let Some(sender) = self.senders.get(path) {
if let Err(e) = sender.try_send(event.clone()) {
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
}
}
}
}
Err(e) => {
println!("[FsWatcher] Error receiving filesystem event: {}", e);
}
}
}
}

View File

@ -1,12 +1,16 @@
use std::collections::HashMap; use std::cmp::max;
use std::io;
use std::io::{Error, ErrorKind};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use linemux::{Line, MuxedLines}; use notify::{Event, EventKind};
use tokio::sync::mpsc::UnboundedSender; use notify::event::{CreateKind, ModifyKind};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use crate::ApacheMetrics; use crate::ApacheMetrics;
use crate::fs_watcher::{FsEventCallbacks, FsWatcher};
use crate::log_file_pattern::LogFilePath; use crate::log_file_pattern::LogFilePath;
#[derive(Copy, Clone, PartialEq)] #[derive(Copy, Clone, PartialEq)]
@ -15,98 +19,274 @@ enum LogFileKind {
Error, Error,
} }
struct LogFileInfo<'a> { struct LogFileMetadata {
pub kind: LogFileKind, pub kind: LogFileKind,
pub label: &'a String, pub label: String,
} }
impl<'a> LogFileInfo<'a> { impl LogFileMetadata {
fn get_label_set(&self) -> [(&'static str, String); 1] { fn get_label_set(&self) -> [(&'static str, String); 1] {
[("file", self.label.clone())] [("file", self.label.clone())]
} }
} }
pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) { pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> bool {
if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await { let mut watcher = LogWatcherConfiguration::new();
println!("[LogWatcher] Error reading logs: {}", error);
shutdown_send.send(()).unwrap(); for log_file in access_log_files.into_iter() {
watcher.add_file(log_file, LogFileKind::Access);
} }
for log_file in error_log_files.into_iter() {
watcher.add_file(log_file, LogFileKind::Error);
}
watcher.start(&metrics).await
} }
struct LogWatcher<'a> { struct LogWatcherConfiguration {
reader: MuxedLines, files: Vec<(PathBuf, LogFileMetadata)>,
files: HashMap<PathBuf, LogFileInfo<'a>>,
} }
impl<'a> LogWatcher<'a> { impl LogWatcherConfiguration {
fn new() -> io::Result<LogWatcher<'a>> { fn new() -> LogWatcherConfiguration {
Ok(LogWatcher { LogWatcherConfiguration { files: Vec::new() }
reader: MuxedLines::new()?,
files: HashMap::new(),
})
} }
fn count_files_of_kind(&self, kind: LogFileKind) -> usize { fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
return self.files.values().filter(|info| info.kind == kind).count(); return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
} }
async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> { fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
let lookup_key = self.reader.add_file(&log_file.path).await?; let path = log_file.path;
self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label }); let label = log_file.label;
Ok(()) let metadata = LogFileMetadata { kind, label };
self.files.push((path, metadata));
} }
async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> { async fn start(self, metrics: &ApacheMetrics) -> bool {
if self.files.is_empty() { if self.files.is_empty() {
println!("[LogWatcher] No log files provided."); println!("[LogWatcher] No log files provided.");
return Err(Error::from(ErrorKind::Unsupported)); return false;
} }
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error)); println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
for metadata in self.files.values() { struct PreparedFile {
let label_set = metadata.get_label_set(); path: PathBuf,
let _ = metrics.requests_total.get_or_create(&label_set); metadata: LogFileMetadata,
let _ = metrics.errors_total.get_or_create(&label_set); fs_event_receiver: Receiver<Event>,
} }
loop { let mut prepared_files = Vec::new();
if let Some(event) = self.reader.next_line().await? { let mut fs_callbacks = FsEventCallbacks::new();
self.handle_line(event, metrics);
for (path, metadata) in self.files {
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
fs_callbacks.register(&path, fs_event_sender);
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
}
let fs_watcher = match FsWatcher::new(fs_callbacks) {
Ok(fs_watcher) => fs_watcher,
Err(e) => {
println!("[LogWatcher] Error creating filesystem watcher: {}", e);
return false;
}
};
for file in &prepared_files {
let file_path = &file.path;
if !file_path.is_absolute() {
println!("[LogWatcher] Error creating filesystem watcher, path is not absolute: {}", file_path.to_string_lossy());
return false;
}
let parent_path = if let Some(parent) = file_path.parent() {
parent
} else {
println!("[LogWatcher] Error creating filesystem watcher for parent directory of file \"{}\", parent directory does not exist", file_path.to_string_lossy());
return false;
};
if let Err(e) = fs_watcher.watch(parent_path).await {
println!("[LogWatcher] Error creating filesystem watcher for directory \"{}\": {}", parent_path.to_string_lossy(), e);
return false;
} }
} }
let fs_watcher = Arc::new(fs_watcher);
for file in prepared_files {
let label_set = file.metadata.get_label_set();
let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set);
let log_watcher = match LogWatcher::create(file.path, file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver).await {
Some(log_watcher) => log_watcher,
None => return false,
};
tokio::spawn(log_watcher.watch());
}
true
}
}
struct LogWatcher {
state: LogWatchingState,
processor: LogLineProcessor,
fs_event_receiver: Receiver<Event>,
}
impl LogWatcher {
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: ApacheMetrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> {
let state = match LogWatchingState::initialize(path.clone(), fs_watcher).await {
Some(state) => state,
None => return None,
};
let processor = LogLineProcessor { path, metadata, metrics };
Some(LogWatcher { state, processor, fs_event_receiver })
} }
fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) { async fn watch(mut self) {
match self.files.get(event.source()) { while let Ok(Some(_)) = self.state.lines.next_line().await {
Some(metadata) => { // Skip lines that already existed.
let label = metadata.label; }
let (kind, family) = match metadata.kind {
LogFileKind::Access => ("access log", &metrics.requests_total), let path = &self.processor.path;
LogFileKind::Error => ("error log", &metrics.errors_total),
}; 'read_loop:
loop {
if !self.processor.process_lines(&mut self.state.lines).await {
break 'read_loop;
}
'event_loop:
loop {
let mut next_event = CoalescedFsEvent::None;
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line()); match self.fs_event_receiver.recv().await {
family.get_or_create(&metadata.get_label_set()).inc(); None => break 'read_loop,
Some(event) => {
next_event = next_event.merge(event);
while let Ok(event) = self.fs_event_receiver.try_recv() {
next_event = next_event.merge(event);
}
}
}
match next_event {
CoalescedFsEvent::None => continue 'event_loop,
CoalescedFsEvent::NewData => continue 'read_loop,
CoalescedFsEvent::NewFile => {
println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
if !self.processor.process_lines(&mut self.state.lines).await {
break 'read_loop;
}
self.state = match self.state.reinitialize().await {
Some(state) => state,
None => break 'read_loop,
};
continue 'read_loop;
}
}
} }
None => { }
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
enum CoalescedFsEvent {
None = 0,
NewData = 1,
NewFile = 2,
}
impl CoalescedFsEvent {
fn merge(self, event: Event) -> CoalescedFsEvent {
match event.kind {
EventKind::Modify(ModifyKind::Data(_)) => {
max(self, CoalescedFsEvent::NewData)
} }
EventKind::Create(CreateKind::File) => {
max(self, CoalescedFsEvent::NewFile)
}
_ => self
} }
} }
} }
async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> { struct LogWatchingState {
let mut watcher = LogWatcher::new()?; path: PathBuf,
lines: Lines<BufReader<File>>,
for log_file in &access_log_files { fs_watcher: Arc<FsWatcher>,
watcher.add_file(log_file, LogFileKind::Access).await?; }
}
impl LogWatchingState {
for log_file in &error_log_files { const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
watcher.add_file(log_file, LogFileKind::Error).await?;
} async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Option<LogWatchingState> {
if let Err(e) = fs_watcher.watch(&path).await {
watcher.start_watching(&metrics).await?; println!("[LogWatcher] Error creating filesystem watcher for file \"{}\": {}", path.to_string_lossy(), e);
Ok(()) return None;
}
let lines = match File::open(&path).await {
Ok(file) => BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines(),
Err(e) => {
println!("[LogWatcher] Error opening file \"{}\": {}", path.to_string_lossy(), e);
return None;
}
};
Some(LogWatchingState { path, lines, fs_watcher })
}
async fn reinitialize(self) -> Option<LogWatchingState> {
LogWatchingState::initialize(self.path, self.fs_watcher).await
}
}
struct LogLineProcessor {
path: PathBuf,
metadata: LogFileMetadata,
metrics: ApacheMetrics,
}
impl LogLineProcessor {
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
loop {
match reader.next_line().await {
Ok(maybe_line) => match maybe_line {
Some(line) => self.handle_line(line),
None => return true,
},
Err(e) => {
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
return false;
}
}
}
}
fn handle_line(&self, line: String) {
let (kind, family) = match self.metadata.kind {
LogFileKind::Access => ("access log", &self.metrics.requests_total),
LogFileKind::Error => ("error log", &self.metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
family.get_or_create(&self.metadata.get_label_set()).inc();
}
} }

View File

@ -5,14 +5,14 @@ use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::signal; use tokio::signal;
use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics; use crate::apache_metrics::ApacheMetrics;
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
use crate::log_watcher::watch_logs_task; use crate::log_watcher::start_log_watcher;
use crate::web_server::WebServer; use crate::web_server::WebServer;
mod apache_metrics; mod apache_metrics;
mod fs_watcher;
mod log_file_pattern; mod log_file_pattern;
mod log_parser; mod log_parser;
mod log_watcher; mod log_watcher;
@ -79,22 +79,21 @@ async fn main() -> ExitCode {
}; };
let (metrics_registry, metrics) = ApacheMetrics::new(); let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone())); if !start_log_watcher(access_log_files, error_log_files, metrics).await {
tokio::spawn(server.serve(Mutex::new(metrics_registry))); return ExitCode::FAILURE;
drop(shutdown_send);
tokio::select! {
_ = signal::ctrl_c() => {
println!("Received CTRL-C, shutting down...")
}
_ = shutdown_recv.recv() => {
println!("Shutting down...");
}
} }
ExitCode::SUCCESS tokio::spawn(server.serve(Mutex::new(metrics_registry)));
match signal::ctrl_c().await {
Ok(_) => {
println!("Received CTRL-C, shutting down...");
ExitCode::SUCCESS
}
Err(e) => {
println!("Error registering CTRL-C handler: {}", e);
ExitCode::FAILURE
}
}
} }