diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 3830eb3..1184df6 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -44,7 +44,8 @@ services: - logs:/logs environment: HTTP_HOST: "0.0.0.0" - LOG_FILE_PATTERN: "/logs/*.access.log" + ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log" + ERROR_LOG_FILE_PATTERN: "/logs/*.error.log" restart: "always" volumes: diff --git a/src/apache_metrics.rs b/src/apache_metrics.rs index 7b982eb..4421041 100644 --- a/src/apache_metrics.rs +++ b/src/apache_metrics.rs @@ -2,22 +2,26 @@ use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::registry::Registry; +type SingleLabel = (&'static str, String); + #[derive(Clone)] pub struct ApacheMetrics { - pub requests_total: Family<(&'static str, String), Counter> + pub requests_total: Family<SingleLabel, Counter>, + pub errors_total: Family<SingleLabel, Counter> } impl ApacheMetrics { pub fn new() -> (Registry, ApacheMetrics) { let mut registry = <Registry>::default(); - let requests_total = Family::<(&'static str, String), Counter>::default(); - registry.register("apache_requests", "Number of received requests", Box::new(requests_total.clone())); - let metrics = ApacheMetrics { - requests_total + requests_total: Family::<SingleLabel, Counter>::default(), + errors_total: Family::<SingleLabel, Counter>::default() }; + registry.register("apache_requests", "Number of received requests", Box::new(metrics.requests_total.clone())); + registry.register("apache_errors", "Number of logged errors", Box::new(metrics.errors_total.clone())); + return (registry, metrics); } } diff --git a/src/log_file_pattern.rs b/src/log_file_pattern.rs index 54e05fd..ef65deb 100644 --- a/src/log_file_pattern.rs +++ b/src/log_file_pattern.rs @@ -6,24 +6,22 @@ use std::path::{Path, PathBuf}; use path_slash::PathExt; -/// Environment variable that determines the path and file name pattern of log files. +/// Reads and parses an environment variable that determines the path and file name pattern of log files. /// /// Supports 3 pattern types: /// /// 1. A simple path to a file. /// 2. A path with a wildcard anywhere in the file name. /// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name). -pub const LOG_FILE_PATTERN: &'static str = "LOG_FILE_PATTERN"; - -pub fn parse_log_file_pattern_from_env() -> Result<LogFilePattern, String> { - return match env::var(LOG_FILE_PATTERN) { +pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> { + return match env::var(variable_name) { Ok(str) => { - let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", LOG_FILE_PATTERN))?; + let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?; parse_log_file_pattern_from_str(&pattern_str) } Err(err) => match err { - VarError::NotPresent => Err(format!("Environment variable {} must be set.", LOG_FILE_PATTERN)), - VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", LOG_FILE_PATTERN)) + VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)), + VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name)) } }; } diff --git a/src/log_watcher.rs b/src/log_watcher.rs index bd93f3e..d453560 100644 --- a/src/log_watcher.rs +++ b/src/log_watcher.rs @@ -3,47 +3,98 @@ use std::io; use std::io::{Error, ErrorKind}; use std::path::PathBuf; -use linemux::MuxedLines; +use linemux::{Line, MuxedLines}; use tokio::sync::mpsc::UnboundedSender; use crate::ApacheMetrics; use crate::log_file_pattern::LogFilePath; -pub async fn read_logs_task(log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) { - if let Err(error) = read_logs(log_files, metrics).await { +#[derive(Copy, Clone, PartialEq)] +enum LogFileKind { + Access, + Error, +} + +struct LogFileInfo<'a> { + pub kind: LogFileKind, + pub label: &'a String, +} + +pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) { + if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await { println!("[LogWatcher] Error reading logs: {}", error); shutdown_send.send(()).unwrap(); } } -async fn read_logs(log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> { - let mut file_reader = MuxedLines::new()?; - let mut label_lookup: HashMap<PathBuf, &String> = HashMap::new(); - - for log_file in &log_files { - let lookup_key = file_reader.add_file(&log_file.path).await?; - label_lookup.insert(lookup_key, &log_file.label); +struct LogWatcher<'a> { + reader: MuxedLines, + files: HashMap<PathBuf, LogFileInfo<'a>>, +} + +impl<'a> LogWatcher<'a> { + fn new() -> io::Result<LogWatcher<'a>> { + return Ok(LogWatcher { + reader: MuxedLines::new()?, + files: HashMap::new(), + }); } - if log_files.is_empty() { - println!("[LogWatcher] No log files provided."); - return Err(Error::from(ErrorKind::Unsupported)); + fn count_files_of_kind(&self, kind: LogFileKind) -> usize { + return self.files.values().filter(|info| info.kind == kind).count(); } - println!("[LogWatcher] Watching {} log file(s).", log_files.len()); + async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> { + let lookup_key = self.reader.add_file(&log_file.path).await?; + self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label }); + Ok(()) + } - loop { - let event_result = file_reader.next_line().await?; - if let Some(event) = event_result { - match label_lookup.get(event.source()) { - Some(&label) => { - println!("[LogWatcher] Received line from \"{}\": {}", label, event.line()); - metrics.requests_total.get_or_create(&("file", label.clone())).inc(); - } - None => { - println!("[LogWatcher] Received line from unknown file: {}", event.source().display()); - } + async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> { + if self.files.is_empty() { + println!("[LogWatcher] No log files provided."); + return Err(Error::from(ErrorKind::Unsupported)); + } + + println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error)); + + loop { + if let Some(event) = self.reader.next_line().await? { + self.handle_line(event, metrics); + } + } + } + + fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) { + match self.files.get(event.source()) { + Some(metadata) => { + let label = metadata.label; + let (kind, family) = match metadata.kind { + LogFileKind::Access => ("access log", &metrics.requests_total), + LogFileKind::Error => ("error log", &metrics.errors_total), + }; + + println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line()); + family.get_or_create(&("file", label.clone())).inc(); + } + None => { + println!("[LogWatcher] Received line from unknown file: {}", event.source().display()); } } } } + +async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> { + let mut watcher = LogWatcher::new()?; + + for log_file in &access_log_files { + watcher.add_file(log_file, LogFileKind::Access).await?; + } + + for log_file in &error_log_files { + watcher.add_file(log_file, LogFileKind::Error).await?; + } + + watcher.start_watching(&metrics).await?; + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 21fbe46..1b769e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,13 @@ use std::env; +use std::process::ExitCode; use std::sync::Mutex; use tokio::signal; use tokio::sync::mpsc; use crate::apache_metrics::ApacheMetrics; -use crate::log_file_pattern::parse_log_file_pattern_from_env; -use crate::log_watcher::read_logs_task; +use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; +use crate::log_watcher::watch_logs_task; use crate::web_server::{create_web_server, run_web_server}; mod log_file_pattern; @@ -14,41 +15,58 @@ mod log_watcher; mod apache_metrics; mod web_server; -#[tokio::main(flavor = "current_thread")] -async fn main() { - let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1")); - - println!("Initializing exporter..."); - - let log_file_pattern = match parse_log_file_pattern_from_env() { +const ACCESS_LOG_FILE_PATTERN: &'static str = "ACCESS_LOG_FILE_PATTERN"; +const ERROR_LOG_FILE_PATTERN: &'static str = "ERROR_LOG_FILE_PATTERN"; + +fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> { + let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) { Ok(pattern) => pattern, Err(error) => { println!("Error: {}", error); - return; + return None; } }; let log_files = match log_file_pattern.search() { Ok(files) => files, Err(error) => { - println!("Error searching log files: {}", error); - return; + println!("Error searching {} files: {}", log_kind, error); + return None; } }; if log_files.is_empty() { - println!("Found no matching log files."); - return; + println!("Found no matching {} files.", log_kind); + return None; } for log_file in &log_files { - println!("Found log file: {} (label \"{}\")", log_file.path.display(), log_file.label); + println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label); } + return Some(log_files); +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> ExitCode { + let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1")); + + println!("Initializing exporter..."); + + let access_log_files = match find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") { + Some(files) => files, + None => return ExitCode::FAILURE, + }; + + let error_log_files = match find_log_files(ERROR_LOG_FILE_PATTERN, "error log") { + Some(files) => files, + None => return ExitCode::FAILURE, + }; + let (metrics_registry, metrics) = ApacheMetrics::new(); let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel(); - tokio::spawn(read_logs_task(log_files, metrics.clone(), shutdown_send.clone())); + tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone())); tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry)))); drop(shutdown_send); @@ -62,4 +80,6 @@ async fn main() { println!("Shutting down..."); } } + + ExitCode::SUCCESS }