From 4c951a5bfeb5ac1ab34a2d06d3e29fec533716e5 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Thu, 25 Jan 2024 17:16:27 +0200 Subject: [PATCH] health draft --- Cargo.lock | 44 +++++++++++++++ agent/Cargo.toml | 2 + agent/src/health.rs | 131 ++++++++++++++++++++++++++++++++++++++++++++ agent/src/main.rs | 119 ++++++++++++---------------------------- 4 files changed, 213 insertions(+), 83 deletions(-) create mode 100644 agent/src/health.rs diff --git a/Cargo.lock b/Cargo.lock index d8524fd..bb1512d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,6 +487,15 @@ dependencies = [ "signatory", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -656,7 +665,9 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats", + "serde", "serde_json", + "sysinfo", "tokio", "tokio-stream", "tracing", @@ -1032,6 +1043,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sysinfo" +version = "0.30.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb4f3438c8f6389c864e61221cbc97e9bca98b4daf39a5beb7bea660f528bb2" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "windows", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -1309,6 +1334,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.0", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index e364e56..63ecf80 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -6,7 +6,9 @@ edition = "2021" [dependencies] anyhow = "1.0.79" async-nats = "0.33.0" +serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" +sysinfo = { version = "0.30.5", default-features = false } tokio = { version = "1.35.1", features = ["full"] } tokio-stream = { version = "0.1.14", default-features = false } tracing = "0.1.40" diff --git a/agent/src/health.rs b/agent/src/health.rs new file mode 100644 index 0000000..cb682e5 --- /dev/null +++ b/agent/src/health.rs @@ -0,0 +1,131 @@ +//! System health information and checking + +use std::sync::Arc; + +use serde::Serialize; +use tokio::sync::watch; + +const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; +const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; +const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; + +pub struct System { + sys: sysinfo::System, + disks: sysinfo::Disks, +} + +impl System { + pub fn new() -> Self { + Self { + sys: sysinfo::System::new(), + disks: sysinfo::Disks::new(), + } + } + + pub fn refresh_resources(&mut self) { + use sysinfo::{CpuRefreshKind, MemoryRefreshKind}; + + self.sys.refresh_specifics( + sysinfo::RefreshKind::new() + .with_memory(MemoryRefreshKind::new().with_ram().with_swap()) + .with_cpu(CpuRefreshKind::new().with_cpu_usage()), + ); + + self.disks.refresh_list(); + } + + pub fn system(&self) -> &sysinfo::System { + &self.sys + } + + pub fn disks(&self) -> &sysinfo::Disks { + &self.disks + } +} + +#[derive(Debug, Default, PartialEq, Serialize)] +pub enum Status { + #[default] + Normal, + MemoryWarning, + MemoryCritical, + CpuWarning, + CpuCritical, + DiskWarning, + DiskCritical, +} + +#[derive(Debug, Serialize)] +pub struct Health { + status: Vec, +} + +#[derive(Clone)] +pub struct HealthMonitor(Arc>); + +impl HealthMonitor { + pub fn new() -> Self { + let (sender, _) = watch::channel(Health { + status: [Status::default()].into(), + }); + + Self(Arc::new(sender)) + } + + pub fn check_system(&self, system: &System) { + let s = system.system(); + + let memory_usage = if s.total_memory() > 0 { + s.used_memory() as f64 * 100.0 / s.total_memory() as f64 + } else { + 0.0 + }; + + let cpu_usage = s.global_cpu_info().cpu_usage(); + + // for d in system.disks().list() { + // let avail = if d.total_space() > 0 { + // (d.available_space() * 100 / d.total_space()) as u8 + // } else { + // 0 as u8 + // }; + // } + + self.0.send_if_modified(|health| { + let cpu_changed = if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD { + health + .status + .iter() + .find(|&it| *it == Status::CpuCritical) + .and(None) + .unwrap_or_else(|| { + health.status.push(Status::CpuCritical); + true + }) + } else { + false + }; + + let mem_changed = if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD { + health + .status + .iter() + .find(|&it| *it == Status::MemoryCritical) + .and(None) + .unwrap_or_else(|| { + health.status.push(Status::MemoryCritical); + true + }) + } else { + false + }; + + dbg!(cpu_changed || mem_changed); + cpu_changed || mem_changed + }); + } + + pub fn monitor(&self) -> watch::Receiver { + self.0.subscribe() + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index c90146c..d0d9f6c 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,7 +1,10 @@ -use async_nats as nats; -use nats::{Client, ConnectOptions}; +use std::time::Duration; + +use async_nats::{Client, ConnectOptions}; use tracing::Level; +mod health; + #[tokio::main] async fn main() -> anyhow::Result<()> { let subscriber = tracing_subscriber::fmt() @@ -20,88 +23,38 @@ async fn main() -> anyhow::Result<()> { .map_err(|err| err)?; tracing::info!("connected to nats server"); - wait_for_commands(client).await; + + init_health_subsystem(client); + + // wait_for_commands(client).await; + + tokio::signal::ctrl_c().await?; Ok(()) } -async fn wait_for_commands(_client: Client) { - // let mut sub = client - // .subscribe("agents.v1.demo_agent.cmd.*") - // .await - // .unwrap(); - // - // let mut command_queue = CommandQueue::default(); - // - // while let Some(msg) = sub.next().await { - // let suffix = msg.subject.trim_start_matches("agents.v1.demo_agent.cmd."); - // - // match suffix { - // "end" => { - // let key = std::str::from_utf8(&msg.payload).unwrap(); - // command_queue.end_command(key); - // } - // key => { - // if let Some(mut receiver) = command_queue.add_command(key) { - // tokio::spawn(async move { - // while let Ok(()) = receiver.changed().await { - // let queue = receiver.borrow(); - // while let Some(cmd) = queue.lock().unwrap().pop_back() { - // handle_command(cmd); - // } - // } - // }); - // } - // } - // } - // } +fn init_health_subsystem(client: Client) { + let system = health::System::new(); + let health_monitor = health::HealthMonitor::new(); + let health_monitor_clone = health_monitor.clone(); + + std::thread::spawn(move || loop { + const REFRESH_INTERVAL: Duration = Duration::from_secs(1); + health_monitor.check_system(&system); + std::thread::sleep(REFRESH_INTERVAL); + }); + + tokio::spawn(async move { + let mut recv = health_monitor_clone.monitor(); + + while let Ok(()) = recv.changed().await { + let payload = serde_json::to_string(&*recv.borrow()).unwrap(); + + tracing::info!(%payload, "sending health event"); + + client + .publish("agents.v1.demo_agent.health", payload.into()) + .await + .unwrap(); + } + }); } -// -// fn handle_command(cmd: Command) { -// println!("{cmd:?}"); -// } -// -// #[derive(Default)] -// struct CommandQueue(HashMap>>>); -// -// impl CommandQueue { -// pub fn add_command(&mut self, key: &str) -> Option>>> { -// match self.0.get_mut(key) { -// Some(sender) => { -// sender.send_modify(|q| q.lock().unwrap().push_back(Command::Foo)); -// None -// } -// None => { -// let (sender, receiver) = watch::channel(Mutex::new(VecDeque::new())); -// sender.send_modify(|q| q.lock().unwrap().push_back(Command::Foo)); -// Some(receiver) -// } -// } -// } -// -// pub fn end_command(&mut self, key: &str) { -// self.0.remove(key); -// } -// } -// -// mod cmd { -// // use std::borrow::Cow; -// -// #[derive(Debug, Clone)] -// pub enum Command { -// Foo, -// } -// -// #[derive(Debug)] -// pub struct UnknownCommand<'a>(&'a str); -// -// impl<'a> TryFrom<&'a str> for Command { -// type Error = UnknownCommand<'a>; -// -// fn try_from(cmd: &'a str) -> Result { -// match cmd { -// "foo" => Ok(Command::Foo), -// _ => Err(UnknownCommand(cmd)), -// } -// } -// } -// }