Compare commits

...

2 Commits

5 changed files with 141 additions and 55 deletions

View File

@ -44,7 +44,8 @@ services:
- logs:/logs - logs:/logs
environment: environment:
HTTP_HOST: "0.0.0.0" HTTP_HOST: "0.0.0.0"
LOG_FILE_PATTERN: "/logs/*.access.log" ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
restart: "always" restart: "always"
volumes: volumes:

View File

@ -2,22 +2,26 @@ use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family; use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
type SingleLabel = (&'static str, String);
#[derive(Clone)] #[derive(Clone)]
pub struct ApacheMetrics { pub struct ApacheMetrics {
pub requests_total: Family<(&'static str, String), Counter> pub requests_total: Family<SingleLabel, Counter>,
pub errors_total: Family<SingleLabel, Counter>
} }
impl ApacheMetrics { impl ApacheMetrics {
pub fn new() -> (Registry, ApacheMetrics) { pub fn new() -> (Registry, ApacheMetrics) {
let mut registry = <Registry>::default(); let mut registry = <Registry>::default();
let requests_total = Family::<(&'static str, String), Counter>::default();
registry.register("apache_requests", "Number of received requests", Box::new(requests_total.clone()));
let metrics = ApacheMetrics { let metrics = ApacheMetrics {
requests_total requests_total: Family::<SingleLabel, Counter>::default(),
errors_total: Family::<SingleLabel, Counter>::default()
}; };
registry.register("apache_requests", "Number of received requests", Box::new(metrics.requests_total.clone()));
registry.register("apache_errors", "Number of logged errors", Box::new(metrics.errors_total.clone()));
return (registry, metrics); return (registry, metrics);
} }
} }

View File

@ -6,24 +6,22 @@ use std::path::{Path, PathBuf};
use path_slash::PathExt; use path_slash::PathExt;
/// Environment variable that determines the path and file name pattern of log files. /// Reads and parses an environment variable that determines the path and file name pattern of log files.
/// ///
/// Supports 3 pattern types: /// Supports 3 pattern types:
/// ///
/// 1. A simple path to a file. /// 1. A simple path to a file.
/// 2. A path with a wildcard anywhere in the file name. /// 2. A path with a wildcard anywhere in the file name.
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name). /// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
pub const LOG_FILE_PATTERN: &'static str = "LOG_FILE_PATTERN"; pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> {
return match env::var(variable_name) {
pub fn parse_log_file_pattern_from_env() -> Result<LogFilePattern, String> {
return match env::var(LOG_FILE_PATTERN) {
Ok(str) => { Ok(str) => {
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", LOG_FILE_PATTERN))?; let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?;
parse_log_file_pattern_from_str(&pattern_str) parse_log_file_pattern_from_str(&pattern_str)
} }
Err(err) => match err { Err(err) => match err {
VarError::NotPresent => Err(format!("Environment variable {} must be set.", LOG_FILE_PATTERN)), VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)),
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", LOG_FILE_PATTERN)) VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name))
} }
}; };
} }

View File

@ -3,47 +3,110 @@ use std::io;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use std::path::PathBuf; use std::path::PathBuf;
use linemux::MuxedLines; use linemux::{Line, MuxedLines};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use crate::ApacheMetrics; use crate::ApacheMetrics;
use crate::log_file_pattern::LogFilePath; use crate::log_file_pattern::LogFilePath;
pub async fn read_logs_task(log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) { #[derive(Copy, Clone, PartialEq)]
if let Err(error) = read_logs(log_files, metrics).await { enum LogFileKind {
Access,
Error,
}
struct LogFileInfo<'a> {
pub kind: LogFileKind,
pub label: &'a String,
}
impl<'a> LogFileInfo<'a> {
fn get_label_set(&self) -> (&'static str, String) {
return ("file", self.label.clone());
}
}
pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await {
println!("[LogWatcher] Error reading logs: {}", error); println!("[LogWatcher] Error reading logs: {}", error);
shutdown_send.send(()).unwrap(); shutdown_send.send(()).unwrap();
} }
} }
async fn read_logs(log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> { struct LogWatcher<'a> {
let mut file_reader = MuxedLines::new()?; reader: MuxedLines,
let mut label_lookup: HashMap<PathBuf, &String> = HashMap::new(); files: HashMap<PathBuf, LogFileInfo<'a>>,
}
for log_file in &log_files {
let lookup_key = file_reader.add_file(&log_file.path).await?; impl<'a> LogWatcher<'a> {
label_lookup.insert(lookup_key, &log_file.label); fn new() -> io::Result<LogWatcher<'a>> {
return Ok(LogWatcher {
reader: MuxedLines::new()?,
files: HashMap::new(),
});
} }
if log_files.is_empty() { fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
println!("[LogWatcher] No log files provided."); return self.files.values().filter(|info| info.kind == kind).count();
return Err(Error::from(ErrorKind::Unsupported));
} }
println!("[LogWatcher] Watching {} log file(s).", log_files.len()); async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> {
let lookup_key = self.reader.add_file(&log_file.path).await?;
self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label });
Ok(())
}
loop { async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> {
let event_result = file_reader.next_line().await?; if self.files.is_empty() {
if let Some(event) = event_result { println!("[LogWatcher] No log files provided.");
match label_lookup.get(event.source()) { return Err(Error::from(ErrorKind::Unsupported));
Some(&label) => { }
println!("[LogWatcher] Received line from \"{}\": {}", label, event.line());
metrics.requests_total.get_or_create(&("file", label.clone())).inc(); println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
}
None => { for metadata in self.files.values() {
println!("[LogWatcher] Received line from unknown file: {}", event.source().display()); let label_set = metadata.get_label_set();
} let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set);
}
loop {
if let Some(event) = self.reader.next_line().await? {
self.handle_line(event, metrics);
}
}
}
fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) {
match self.files.get(event.source()) {
Some(metadata) => {
let label = metadata.label;
let (kind, family) = match metadata.kind {
LogFileKind::Access => ("access log", &metrics.requests_total),
LogFileKind::Error => ("error log", &metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line());
family.get_or_create(&metadata.get_label_set()).inc();
}
None => {
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
} }
} }
} }
} }
async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
let mut watcher = LogWatcher::new()?;
for log_file in &access_log_files {
watcher.add_file(log_file, LogFileKind::Access).await?;
}
for log_file in &error_log_files {
watcher.add_file(log_file, LogFileKind::Error).await?;
}
watcher.start_watching(&metrics).await?;
Ok(())
}

View File

@ -1,12 +1,13 @@
use std::env; use std::env;
use std::process::ExitCode;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::signal; use tokio::signal;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics; use crate::apache_metrics::ApacheMetrics;
use crate::log_file_pattern::parse_log_file_pattern_from_env; use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
use crate::log_watcher::read_logs_task; use crate::log_watcher::watch_logs_task;
use crate::web_server::{create_web_server, run_web_server}; use crate::web_server::{create_web_server, run_web_server};
mod log_file_pattern; mod log_file_pattern;
@ -14,41 +15,58 @@ mod log_watcher;
mod apache_metrics; mod apache_metrics;
mod web_server; mod web_server;
#[tokio::main(flavor = "current_thread")] const ACCESS_LOG_FILE_PATTERN: &'static str = "ACCESS_LOG_FILE_PATTERN";
async fn main() { const ERROR_LOG_FILE_PATTERN: &'static str = "ERROR_LOG_FILE_PATTERN";
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> {
println!("Initializing exporter..."); let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) {
let log_file_pattern = match parse_log_file_pattern_from_env() {
Ok(pattern) => pattern, Ok(pattern) => pattern,
Err(error) => { Err(error) => {
println!("Error: {}", error); println!("Error: {}", error);
return; return None;
} }
}; };
let log_files = match log_file_pattern.search() { let log_files = match log_file_pattern.search() {
Ok(files) => files, Ok(files) => files,
Err(error) => { Err(error) => {
println!("Error searching log files: {}", error); println!("Error searching {} files: {}", log_kind, error);
return; return None;
} }
}; };
if log_files.is_empty() { if log_files.is_empty() {
println!("Found no matching log files."); println!("Found no matching {} files.", log_kind);
return; return None;
} }
for log_file in &log_files { for log_file in &log_files {
println!("Found log file: {} (label \"{}\")", log_file.path.display(), log_file.label); println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
} }
return Some(log_files);
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode {
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
println!("Initializing exporter...");
let access_log_files = match find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") {
Some(files) => files,
None => return ExitCode::FAILURE,
};
let error_log_files = match find_log_files(ERROR_LOG_FILE_PATTERN, "error log") {
Some(files) => files,
None => return ExitCode::FAILURE,
};
let (metrics_registry, metrics) = ApacheMetrics::new(); let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel(); let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
tokio::spawn(read_logs_task(log_files, metrics.clone(), shutdown_send.clone())); tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry)))); tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry))));
drop(shutdown_send); drop(shutdown_send);
@ -62,4 +80,6 @@ async fn main() {
println!("Shutting down..."); println!("Shutting down...");
} }
} }
ExitCode::SUCCESS
} }