Compare commits

...

2 Commits

Author SHA1 Message Date
chylex 7def8921b0
Use SIGINT stop signal in Docker Compose example 2023-09-30 23:40:04 +02:00
chylex 383a187358
Migrate from actix-web to hyper 2023-09-30 23:39:39 +02:00
5 changed files with 144 additions and 820 deletions

849
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@ lto = true
codegen-units = 1
[dependencies]
actix-web = "4.4.0"
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
linemux = "0.3.0"
path-slash = "0.2.1"
prometheus-client = "0.21.2"

View File

@ -46,6 +46,7 @@ services:
HTTP_HOST: "0.0.0.0"
ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
stop_signal: SIGINT
restart: "always"
volumes:

View File

@ -1,5 +1,7 @@
use std::env;
use std::net::{IpAddr, SocketAddr};
use std::process::ExitCode;
use std::str::FromStr;
use std::sync::Mutex;
use tokio::signal;
@ -8,7 +10,7 @@ use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics;
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
use crate::log_watcher::watch_logs_task;
use crate::web_server::{create_web_server, run_web_server};
use crate::web_server::WebServer;
mod apache_metrics;
mod log_file_pattern;
@ -51,6 +53,13 @@ fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec
#[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode {
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
let bind_ip = match IpAddr::from_str(&host) {
Ok(addr) => addr,
Err(_) => {
println!("Invalid HTTP host: {}", host);
return ExitCode::FAILURE;
}
};
println!("Initializing exporter...");
@ -64,11 +73,16 @@ async fn main() -> ExitCode {
None => return ExitCode::FAILURE,
};
let server = match WebServer::try_bind(SocketAddr::new(bind_ip, 9240)) {
Some(server) => server,
None => return ExitCode::FAILURE
};
let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry))));
tokio::spawn(server.serve(Mutex::new(metrics_registry)));
drop(shutdown_send);

View File

@ -1,55 +1,81 @@
use std::{fmt, str};
use std::sync::Mutex;
use std::fmt;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use actix_web::{App, HttpResponse, HttpServer, Result, web};
use actix_web::dev::Server;
use hyper::{Body, Error, header, Method, Request, Response, Server, StatusCode};
use hyper::http::Result;
use hyper::server::Builder;
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
//noinspection HttpUrlsUsage
pub fn create_web_server(host: &str, port: u16, metrics_registry: Mutex<Registry>) -> Server {
let metrics_registry = web::Data::new(metrics_registry);
let server = HttpServer::new(move || {
App::new()
.app_data(metrics_registry.clone())
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
});
let server = server.keep_alive(Duration::from_secs(60));
let server = server.shutdown_timeout(0);
let server = server.disable_signals();
let server = server.workers(1);
let server = server.bind((host, port));
const MAX_BUFFER_SIZE: usize = 1024 * 32;
println!("[WebServer] Starting web server on {0}:{1} with metrics endpoint: http://{0}:{1}/metrics", host, port);
server.unwrap().run()
pub struct WebServer {
builder: Builder<AddrIncoming>,
}
pub async fn run_web_server(server: Server) {
if let Err(e) = server.await {
println!("[WebServer] Error running web server: {}", e);
impl WebServer {
//noinspection HttpUrlsUsage
pub fn try_bind(addr: SocketAddr) -> Option<WebServer> {
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
let builder = match Server::try_bind(&addr) {
Ok(builder) => builder,
Err(e) => {
println!("[WebServer] Could not bind to {}: {}", addr, e);
return None;
}
};
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
let builder = builder.http1_only(true);
let builder = builder.http1_keepalive(true);
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
Some(WebServer { builder })
}
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
let metrics_registry = Arc::new(metrics_registry);
let service = make_service_fn(move |_| {
let metrics_registry = Arc::clone(&metrics_registry);
async move {
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
}
});
if let Err(e) = self.builder.serve(service).await {
println!("[WebServer] Error starting web server: {}", e);
}
}
}
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
if req.method() == Method::GET && req.uri().path() == "/metrics" {
metrics_handler(Arc::clone(&metrics_registry)).await
} else {
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
}
}
//noinspection SpellCheckingInspection
async fn metrics_handler(metrics_registry: web::Data<Mutex<Registry>>) -> Result<HttpResponse> {
let response = match encode_metrics(metrics_registry) {
async fn metrics_handler(metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
match encode_metrics(metrics_registry) {
MetricsEncodeResult::Ok(buf) => {
HttpResponse::Ok().content_type("application/openmetrics-text; version=1.0.0; charset=utf-8").body(buf)
Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8").body(Body::from(buf))
}
MetricsEncodeResult::FailedAcquiringRegistryLock => {
println!("[WebServer] Failed acquiring lock on registry.");
HttpResponse::InternalServerError().body("")
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
MetricsEncodeResult::FailedEncodingMetrics(e) => {
println!("[WebServer] Error encoding metrics: {}", e);
HttpResponse::InternalServerError().body("")
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
};
Ok(response)
}
}
enum MetricsEncodeResult {
@ -58,12 +84,12 @@ enum MetricsEncodeResult {
FailedEncodingMetrics(fmt::Error),
}
fn encode_metrics(metrics_registry: web::Data<Mutex<Registry>>) -> MetricsEncodeResult {
fn encode_metrics(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
let mut buf = String::new();
return if let Ok(metrics_registry) = metrics_registry.lock() {
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
} else {
MetricsEncodeResult::FailedAcquiringRegistryLock
}
};
}