From bbc416b8d3feeb39603cb88770f175ea1a677b5d Mon Sep 17 00:00:00 2001 From: chylex <contact@chylex.com> Date: Wed, 4 Oct 2023 13:40:59 +0200 Subject: [PATCH] Refactor module structure and file names --- .../access_log_parser.rs} | 0 .../filesystem_watcher.rs} | 0 src/{ => logs}/log_file_pattern.rs | 2 +- .../log_file_watcher.rs} | 34 ++++-------- src/logs/mod.rs | 52 +++++++++++++++++++ src/main.rs | 52 ++++--------------- src/{apache_metrics.rs => metrics.rs} | 13 ++--- src/web/metrics_endpoint.rs | 42 +++++++++++++++ src/{web_server.rs => web/mod.rs} | 41 ++------------- 9 files changed, 123 insertions(+), 113 deletions(-) rename src/{log_parser.rs => logs/access_log_parser.rs} (100%) rename src/{fs_watcher.rs => logs/filesystem_watcher.rs} (100%) rename src/{ => logs}/log_file_pattern.rs (98%) rename src/{log_watcher.rs => logs/log_file_watcher.rs} (88%) create mode 100644 src/logs/mod.rs rename src/{apache_metrics.rs => metrics.rs} (68%) create mode 100644 src/web/metrics_endpoint.rs rename src/{web_server.rs => web/mod.rs} (55%) diff --git a/src/log_parser.rs b/src/logs/access_log_parser.rs similarity index 100% rename from src/log_parser.rs rename to src/logs/access_log_parser.rs diff --git a/src/fs_watcher.rs b/src/logs/filesystem_watcher.rs similarity index 100% rename from src/fs_watcher.rs rename to src/logs/filesystem_watcher.rs diff --git a/src/log_file_pattern.rs b/src/logs/log_file_pattern.rs similarity index 98% rename from src/log_file_pattern.rs rename to src/logs/log_file_pattern.rs index 6c20289..a9b545a 100644 --- a/src/log_file_pattern.rs +++ b/src/logs/log_file_pattern.rs @@ -178,7 +178,7 @@ impl LogFilePath { #[cfg(test)] mod tests { - use crate::log_file_pattern::{LogFilePattern, parse_log_file_pattern_from_str}; + use super::{LogFilePattern, parse_log_file_pattern_from_str}; #[test] fn empty_path() { diff --git a/src/log_watcher.rs b/src/logs/log_file_watcher.rs similarity index 88% rename from src/log_watcher.rs rename to src/logs/log_file_watcher.rs index 3d662d2..1dab437 100644 --- a/src/log_watcher.rs +++ b/src/logs/log_file_watcher.rs @@ -9,12 +9,12 @@ use tokio::io::{AsyncBufReadExt, BufReader, Lines}; use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; -use crate::ApacheMetrics; -use crate::fs_watcher::{FsEventCallbacks, FsWatcher}; -use crate::log_file_pattern::LogFilePath; +use crate::logs::filesystem_watcher::{FsEventCallbacks, FsWatcher}; +use crate::logs::log_file_pattern::LogFilePath; +use crate::metrics::Metrics; #[derive(Copy, Clone, PartialEq)] -enum LogFileKind { +pub enum LogFileKind { Access, Error, } @@ -30,26 +30,12 @@ impl LogFileMetadata { } } -pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> bool { - let mut watcher = LogWatcherConfiguration::new(); - - for log_file in access_log_files.into_iter() { - watcher.add_file(log_file, LogFileKind::Access); - } - - for log_file in error_log_files.into_iter() { - watcher.add_file(log_file, LogFileKind::Error); - } - - watcher.start(&metrics).await -} - -struct LogWatcherConfiguration { +pub struct LogWatcherConfiguration { files: Vec<(PathBuf, LogFileMetadata)>, } impl LogWatcherConfiguration { - fn new() -> LogWatcherConfiguration { + pub fn new() -> LogWatcherConfiguration { LogWatcherConfiguration { files: Vec::new() } } @@ -57,14 +43,14 @@ impl LogWatcherConfiguration { return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count(); } - fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) { + pub fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) { let path = log_file.path; let label = log_file.label; let metadata = LogFileMetadata { kind, label }; self.files.push((path, metadata)); } - async fn start(self, metrics: &ApacheMetrics) -> bool { + pub async fn start(self, metrics: &Metrics) -> bool { if self.files.is_empty() { println!("[LogWatcher] No log files provided."); return false; @@ -141,7 +127,7 @@ struct LogWatcher { } impl LogWatcher { - async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: ApacheMetrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> { + async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: Metrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> { let state = match LogWatchingState::initialize(path.clone(), fs_watcher).await { Some(state) => state, None => return None, @@ -261,7 +247,7 @@ impl LogWatchingState { struct LogLineProcessor { path: PathBuf, metadata: LogFileMetadata, - metrics: ApacheMetrics, + metrics: Metrics, } impl LogLineProcessor { diff --git a/src/logs/mod.rs b/src/logs/mod.rs new file mode 100644 index 0000000..ee5a320 --- /dev/null +++ b/src/logs/mod.rs @@ -0,0 +1,52 @@ +use log_file_watcher::{LogFileKind, LogWatcherConfiguration}; + +use crate::logs::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; +use crate::metrics::Metrics; + +mod access_log_parser; +mod filesystem_watcher; +mod log_file_pattern; +mod log_file_watcher; + +pub fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> { + let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) { + Ok(pattern) => pattern, + Err(error) => { + println!("Error: {}", error); + return None; + } + }; + + let log_files = match log_file_pattern.search() { + Ok(files) => files, + Err(error) => { + println!("Error searching {} files: {}", log_kind, error); + return None; + } + }; + + if log_files.is_empty() { + println!("Found no matching {} files.", log_kind); + return None; + } + + for log_file in &log_files { + println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label); + } + + Some(log_files) +} + +pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: Metrics) -> bool { + let mut watcher = LogWatcherConfiguration::new(); + + for log_file in access_log_files.into_iter() { + watcher.add_file(log_file, LogFileKind::Access); + } + + for log_file in error_log_files.into_iter() { + watcher.add_file(log_file, LogFileKind::Error); + } + + watcher.start(&metrics).await +} diff --git a/src/main.rs b/src/main.rs index ad919ee..de107b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,50 +6,16 @@ use std::sync::Mutex; use tokio::signal; -use crate::apache_metrics::ApacheMetrics; -use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; -use crate::log_watcher::start_log_watcher; -use crate::web_server::WebServer; +use crate::metrics::Metrics; +use crate::web::WebServer; -mod apache_metrics; -mod fs_watcher; -mod log_file_pattern; -mod log_parser; -mod log_watcher; -mod web_server; +mod logs; +mod metrics; +mod web; const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN"; const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN"; -fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> { - let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) { - Ok(pattern) => pattern, - Err(error) => { - println!("Error: {}", error); - return None; - } - }; - - let log_files = match log_file_pattern.search() { - Ok(files) => files, - Err(error) => { - println!("Error searching {} files: {}", log_kind, error); - return None; - } - }; - - if log_files.is_empty() { - println!("Found no matching {} files.", log_kind); - return None; - } - - for log_file in &log_files { - println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label); - } - - Some(log_files) -} - #[tokio::main(flavor = "current_thread")] async fn main() -> ExitCode { let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1")); @@ -63,12 +29,12 @@ async fn main() -> ExitCode { println!("Initializing exporter..."); - let access_log_files = match find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") { + let access_log_files = match logs::find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") { Some(files) => files, None => return ExitCode::FAILURE, }; - let error_log_files = match find_log_files(ERROR_LOG_FILE_PATTERN, "error log") { + let error_log_files = match logs::find_log_files(ERROR_LOG_FILE_PATTERN, "error log") { Some(files) => files, None => return ExitCode::FAILURE, }; @@ -78,9 +44,9 @@ async fn main() -> ExitCode { None => return ExitCode::FAILURE }; - let (metrics_registry, metrics) = ApacheMetrics::new(); + let (metrics_registry, metrics) = Metrics::new(); - if !start_log_watcher(access_log_files, error_log_files, metrics).await { + if !logs::start_log_watcher(access_log_files, error_log_files, metrics).await { return ExitCode::FAILURE; } diff --git a/src/apache_metrics.rs b/src/metrics.rs similarity index 68% rename from src/apache_metrics.rs rename to src/metrics.rs index 8e9d0c9..b45133a 100644 --- a/src/apache_metrics.rs +++ b/src/metrics.rs @@ -4,20 +4,17 @@ use prometheus_client::registry::Registry; type SingleLabel = [(&'static str, String); 1]; -#[derive(Clone)] -pub struct ApacheMetrics { +#[derive(Clone, Default)] +pub struct Metrics { pub requests_total: Family<SingleLabel, Counter>, pub errors_total: Family<SingleLabel, Counter> } -impl ApacheMetrics { - pub fn new() -> (Registry, ApacheMetrics) { +impl Metrics { + pub fn new() -> (Registry, Metrics) { let mut registry = <Registry>::default(); - let metrics = ApacheMetrics { - requests_total: Family::<SingleLabel, Counter>::default(), - errors_total: Family::<SingleLabel, Counter>::default() - }; + let metrics = Metrics::default(); registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone()); registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone()); diff --git a/src/web/metrics_endpoint.rs b/src/web/metrics_endpoint.rs new file mode 100644 index 0000000..9e31cc5 --- /dev/null +++ b/src/web/metrics_endpoint.rs @@ -0,0 +1,42 @@ +use std::fmt; +use std::sync::{Arc, Mutex}; + +use hyper::{Body, http, Response, StatusCode}; +use hyper::header::CONTENT_TYPE; +use prometheus_client::encoding::text::encode; +use prometheus_client::registry::Registry; + +//noinspection SpellCheckingInspection +const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8"; + +pub async fn handle(metrics_registry: Arc<Mutex<Registry>>) -> http::Result<Response<Body>> { + match try_encode(metrics_registry) { + MetricsEncodeResult::Ok(buf) => { + Response::builder().status(StatusCode::OK).header(CONTENT_TYPE, METRICS_CONTENT_TYPE).body(Body::from(buf)) + } + MetricsEncodeResult::FailedAcquiringRegistryLock => { + println!("[WebServer] Failed acquiring lock on registry."); + Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()) + } + MetricsEncodeResult::FailedEncodingMetrics(e) => { + println!("[WebServer] Error encoding metrics: {}", e); + Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()) + } + } +} + +enum MetricsEncodeResult { + Ok(String), + FailedAcquiringRegistryLock, + FailedEncodingMetrics(fmt::Error), +} + +fn try_encode(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult { + let mut buf = String::new(); + + return if let Ok(metrics_registry) = metrics_registry.lock() { + encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf)) + } else { + MetricsEncodeResult::FailedAcquiringRegistryLock + }; +} diff --git a/src/web_server.rs b/src/web/mod.rs similarity index 55% rename from src/web_server.rs rename to src/web/mod.rs index 64aa7ce..5fb2048 100644 --- a/src/web_server.rs +++ b/src/web/mod.rs @@ -1,16 +1,16 @@ -use std::fmt; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::Duration; -use hyper::{Body, Error, header, Method, Request, Response, Server, StatusCode}; +use hyper::{Body, Error, Method, Request, Response, Server, StatusCode}; use hyper::http::Result; use hyper::server::Builder; use hyper::server::conn::AddrIncoming; use hyper::service::{make_service_fn, service_fn}; -use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; +mod metrics_endpoint; + const MAX_BUFFER_SIZE: usize = 1024 * 32; pub struct WebServer { @@ -55,41 +55,8 @@ impl WebServer { async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> { if req.method() == Method::GET && req.uri().path() == "/metrics" { - metrics_handler(Arc::clone(&metrics_registry)).await + metrics_endpoint::handle(Arc::clone(&metrics_registry)).await } else { Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()) } } - -//noinspection SpellCheckingInspection -async fn metrics_handler(metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> { - match encode_metrics(metrics_registry) { - MetricsEncodeResult::Ok(buf) => { - Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8").body(Body::from(buf)) - } - MetricsEncodeResult::FailedAcquiringRegistryLock => { - println!("[WebServer] Failed acquiring lock on registry."); - Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()) - } - MetricsEncodeResult::FailedEncodingMetrics(e) => { - println!("[WebServer] Error encoding metrics: {}", e); - Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()) - } - } -} - -enum MetricsEncodeResult { - Ok(String), - FailedAcquiringRegistryLock, - FailedEncodingMetrics(fmt::Error), -} - -fn encode_metrics(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult { - let mut buf = String::new(); - - return if let Ok(metrics_registry) = metrics_registry.lock() { - encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf)) - } else { - MetricsEncodeResult::FailedAcquiringRegistryLock - }; -}