health draft
This commit is contained in:
		
							parent
							
								
									221b4348d8
								
							
						
					
					
						commit
						4c951a5bfe
					
				
					 4 changed files with 213 additions and 83 deletions
				
			
		
							
								
								
									
										44
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										44
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| 
						 | 
				
			
			@ -487,6 +487,15 @@ dependencies = [
 | 
			
		|||
 "signatory",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "ntapi"
 | 
			
		||||
version = "0.4.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "winapi",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "nu-ansi-term"
 | 
			
		||||
version = "0.46.0"
 | 
			
		||||
| 
						 | 
				
			
			@ -656,7 +665,9 @@ version = "0.1.0"
 | 
			
		|||
dependencies = [
 | 
			
		||||
 "anyhow",
 | 
			
		||||
 "async-nats",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "serde_json",
 | 
			
		||||
 "sysinfo",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tokio-stream",
 | 
			
		||||
 "tracing",
 | 
			
		||||
| 
						 | 
				
			
			@ -1032,6 +1043,20 @@ dependencies = [
 | 
			
		|||
 "unicode-ident",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "sysinfo"
 | 
			
		||||
version = "0.30.5"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "1fb4f3438c8f6389c864e61221cbc97e9bca98b4daf39a5beb7bea660f528bb2"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "cfg-if",
 | 
			
		||||
 "core-foundation-sys",
 | 
			
		||||
 "libc",
 | 
			
		||||
 "ntapi",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "windows",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "thiserror"
 | 
			
		||||
version = "1.0.56"
 | 
			
		||||
| 
						 | 
				
			
			@ -1309,6 +1334,25 @@ version = "0.4.0"
 | 
			
		|||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "windows"
 | 
			
		||||
version = "0.52.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "windows-core",
 | 
			
		||||
 "windows-targets 0.52.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "windows-core"
 | 
			
		||||
version = "0.52.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "windows-targets 0.52.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "windows-sys"
 | 
			
		||||
version = "0.48.0"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,7 +6,9 @@ edition = "2021"
 | 
			
		|||
[dependencies]
 | 
			
		||||
anyhow = "1.0.79"
 | 
			
		||||
async-nats = "0.33.0"
 | 
			
		||||
serde = { version = "1.0.195", features = ["derive"] }
 | 
			
		||||
serde_json = "1.0.111"
 | 
			
		||||
sysinfo = { version = "0.30.5", default-features = false }
 | 
			
		||||
tokio = { version = "1.35.1", features = ["full"] }
 | 
			
		||||
tokio-stream = { version = "0.1.14", default-features = false }
 | 
			
		||||
tracing = "0.1.40"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										131
									
								
								agent/src/health.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										131
									
								
								agent/src/health.rs
									
									
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,131 @@
 | 
			
		|||
//! System health information and checking
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
use serde::Serialize;
 | 
			
		||||
use tokio::sync::watch;
 | 
			
		||||
 | 
			
		||||
/// Memory usage, in percent of total memory, above which memory is reported as critical.
const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;

/// Global CPU usage, in percent, above which the CPU is reported as critical.
const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;

/// Disk usage threshold in percent.
// NOTE(review): no code visible in this module reads this constant yet.
const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
 | 
			
		||||
 | 
			
		||||
pub struct System {
 | 
			
		||||
    sys: sysinfo::System,
 | 
			
		||||
    disks: sysinfo::Disks,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl System {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            sys: sysinfo::System::new(),
 | 
			
		||||
            disks: sysinfo::Disks::new(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn refresh_resources(&mut self) {
 | 
			
		||||
        use sysinfo::{CpuRefreshKind, MemoryRefreshKind};
 | 
			
		||||
 | 
			
		||||
        self.sys.refresh_specifics(
 | 
			
		||||
            sysinfo::RefreshKind::new()
 | 
			
		||||
                .with_memory(MemoryRefreshKind::new().with_ram().with_swap())
 | 
			
		||||
                .with_cpu(CpuRefreshKind::new().with_cpu_usage()),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        self.disks.refresh_list();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn system(&self) -> &sysinfo::System {
 | 
			
		||||
        &self.sys
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn disks(&self) -> &sysinfo::Disks {
 | 
			
		||||
        &self.disks
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Default, PartialEq, Serialize)]
 | 
			
		||||
pub enum Status {
 | 
			
		||||
    #[default]
 | 
			
		||||
    Normal,
 | 
			
		||||
    MemoryWarning,
 | 
			
		||||
    MemoryCritical,
 | 
			
		||||
    CpuWarning,
 | 
			
		||||
    CpuCritical,
 | 
			
		||||
    DiskWarning,
 | 
			
		||||
    DiskCritical,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Serialize)]
 | 
			
		||||
pub struct Health {
 | 
			
		||||
    status: Vec<Status>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct HealthMonitor(Arc<watch::Sender<Health>>);
 | 
			
		||||
 | 
			
		||||
impl HealthMonitor {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        let (sender, _) = watch::channel(Health {
 | 
			
		||||
            status: [Status::default()].into(),
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        Self(Arc::new(sender))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_system(&self, system: &System) {
 | 
			
		||||
        let s = system.system();
 | 
			
		||||
 | 
			
		||||
        let memory_usage = if s.total_memory() > 0 {
 | 
			
		||||
            s.used_memory() as f64 * 100.0 / s.total_memory() as f64
 | 
			
		||||
        } else {
 | 
			
		||||
            0.0
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let cpu_usage = s.global_cpu_info().cpu_usage();
 | 
			
		||||
 | 
			
		||||
        // for d in system.disks().list() {
 | 
			
		||||
        //     let avail = if d.total_space() > 0 {
 | 
			
		||||
        //         (d.available_space() * 100 / d.total_space()) as u8
 | 
			
		||||
        //     } else {
 | 
			
		||||
        //         0 as u8
 | 
			
		||||
        //     };
 | 
			
		||||
        // }
 | 
			
		||||
 | 
			
		||||
        self.0.send_if_modified(|health| {
 | 
			
		||||
            let cpu_changed = if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD {
 | 
			
		||||
                health
 | 
			
		||||
                    .status
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .find(|&it| *it == Status::CpuCritical)
 | 
			
		||||
                    .and(None)
 | 
			
		||||
                    .unwrap_or_else(|| {
 | 
			
		||||
                        health.status.push(Status::CpuCritical);
 | 
			
		||||
                        true
 | 
			
		||||
                    })
 | 
			
		||||
            } else {
 | 
			
		||||
                false
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            let mem_changed = if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD {
 | 
			
		||||
                health
 | 
			
		||||
                    .status
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .find(|&it| *it == Status::MemoryCritical)
 | 
			
		||||
                    .and(None)
 | 
			
		||||
                    .unwrap_or_else(|| {
 | 
			
		||||
                        health.status.push(Status::MemoryCritical);
 | 
			
		||||
                        true
 | 
			
		||||
                    })
 | 
			
		||||
            } else {
 | 
			
		||||
                false
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            dbg!(cpu_changed || mem_changed);
 | 
			
		||||
            cpu_changed || mem_changed
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn monitor(&self) -> watch::Receiver<Health> {
 | 
			
		||||
        self.0.subscribe()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,7 +1,10 @@
 | 
			
		|||
use async_nats as nats;
 | 
			
		||||
use nats::{Client, ConnectOptions};
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
 | 
			
		||||
use async_nats::{Client, ConnectOptions};
 | 
			
		||||
use tracing::Level;
 | 
			
		||||
 | 
			
		||||
mod health;
 | 
			
		||||
 | 
			
		||||
#[tokio::main]
 | 
			
		||||
async fn main() -> anyhow::Result<()> {
 | 
			
		||||
    let subscriber = tracing_subscriber::fmt()
 | 
			
		||||
| 
						 | 
				
			
			@ -20,88 +23,38 @@ async fn main() -> anyhow::Result<()> {
 | 
			
		|||
        .map_err(|err| err)?;
 | 
			
		||||
 | 
			
		||||
    tracing::info!("connected to nats server");
 | 
			
		||||
    wait_for_commands(client).await;
 | 
			
		||||
 | 
			
		||||
    init_health_subsystem(client);
 | 
			
		||||
 | 
			
		||||
    // wait_for_commands(client).await;
 | 
			
		||||
 | 
			
		||||
    tokio::signal::ctrl_c().await?;
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn wait_for_commands(_client: Client) {
 | 
			
		||||
    //    let mut sub = client
 | 
			
		||||
    //        .subscribe("agents.v1.demo_agent.cmd.*")
 | 
			
		||||
    //        .await
 | 
			
		||||
    //        .unwrap();
 | 
			
		||||
    //
 | 
			
		||||
    //    let mut command_queue = CommandQueue::default();
 | 
			
		||||
    //
 | 
			
		||||
    //    while let Some(msg) = sub.next().await {
 | 
			
		||||
    //        let suffix = msg.subject.trim_start_matches("agents.v1.demo_agent.cmd.");
 | 
			
		||||
    //
 | 
			
		||||
    //        match suffix {
 | 
			
		||||
    //            "end" => {
 | 
			
		||||
    //                let key = std::str::from_utf8(&msg.payload).unwrap();
 | 
			
		||||
    //                command_queue.end_command(key);
 | 
			
		||||
    //            }
 | 
			
		||||
    //            key => {
 | 
			
		||||
    //                if let Some(mut receiver) = command_queue.add_command(key) {
 | 
			
		||||
    //                    tokio::spawn(async move {
 | 
			
		||||
    //                        while let Ok(()) = receiver.changed().await {
 | 
			
		||||
    //                            let queue = receiver.borrow();
 | 
			
		||||
    //                            while let Some(cmd) = queue.lock().unwrap().pop_back() {
 | 
			
		||||
    //                                handle_command(cmd);
 | 
			
		||||
    //                            }
 | 
			
		||||
    //                        }
 | 
			
		||||
    //                    });
 | 
			
		||||
    //                }
 | 
			
		||||
    //            }
 | 
			
		||||
    //        }
 | 
			
		||||
    //    }
 | 
			
		||||
fn init_health_subsystem(client: Client) {
 | 
			
		||||
    let system = health::System::new();
 | 
			
		||||
    let health_monitor = health::HealthMonitor::new();
 | 
			
		||||
    let health_monitor_clone = health_monitor.clone();
 | 
			
		||||
 | 
			
		||||
    std::thread::spawn(move || loop {
 | 
			
		||||
        const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
 | 
			
		||||
        health_monitor.check_system(&system);
 | 
			
		||||
        std::thread::sleep(REFRESH_INTERVAL);
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let mut recv = health_monitor_clone.monitor();
 | 
			
		||||
 | 
			
		||||
        while let Ok(()) = recv.changed().await {
 | 
			
		||||
            let payload = serde_json::to_string(&*recv.borrow()).unwrap();
 | 
			
		||||
 | 
			
		||||
            tracing::info!(%payload, "sending health event");
 | 
			
		||||
 | 
			
		||||
            client
 | 
			
		||||
                .publish("agents.v1.demo_agent.health", payload.into())
 | 
			
		||||
                .await
 | 
			
		||||
                .unwrap();
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
//
 | 
			
		||||
// fn handle_command(cmd: Command) {
 | 
			
		||||
//     println!("{cmd:?}");
 | 
			
		||||
// }
 | 
			
		||||
//
 | 
			
		||||
// #[derive(Default)]
 | 
			
		||||
// struct CommandQueue(HashMap<String, watch::Sender<Mutex<VecDeque<Command>>>>);
 | 
			
		||||
//
 | 
			
		||||
// impl CommandQueue {
 | 
			
		||||
//     pub fn add_command(&mut self, key: &str) -> Option<watch::Receiver<Mutex<VecDeque<Command>>>> {
 | 
			
		||||
//         match self.0.get_mut(key) {
 | 
			
		||||
//             Some(sender) => {
 | 
			
		||||
//                 sender.send_modify(|q| q.lock().unwrap().push_back(Command::Foo));
 | 
			
		||||
//                 None
 | 
			
		||||
//             }
 | 
			
		||||
//             None => {
 | 
			
		||||
//                 let (sender, receiver) = watch::channel(Mutex::new(VecDeque::new()));
 | 
			
		||||
//                 sender.send_modify(|q| q.lock().unwrap().push_back(Command::Foo));
 | 
			
		||||
//                 Some(receiver)
 | 
			
		||||
//             }
 | 
			
		||||
//         }
 | 
			
		||||
//     }
 | 
			
		||||
//
 | 
			
		||||
//     pub fn end_command(&mut self, key: &str) {
 | 
			
		||||
//         self.0.remove(key);
 | 
			
		||||
//     }
 | 
			
		||||
// }
 | 
			
		||||
//
 | 
			
		||||
// mod cmd {
 | 
			
		||||
//     // use std::borrow::Cow;
 | 
			
		||||
//
 | 
			
		||||
//     #[derive(Debug, Clone)]
 | 
			
		||||
//     pub enum Command {
 | 
			
		||||
//         Foo,
 | 
			
		||||
//     }
 | 
			
		||||
//
 | 
			
		||||
//     #[derive(Debug)]
 | 
			
		||||
//     pub struct UnknownCommand<'a>(&'a str);
 | 
			
		||||
//
 | 
			
		||||
//     impl<'a> TryFrom<&'a str> for Command {
 | 
			
		||||
//         type Error = UnknownCommand<'a>;
 | 
			
		||||
//
 | 
			
		||||
//         fn try_from(cmd: &'a str) -> Result<Self, Self::Error> {
 | 
			
		||||
//             match cmd {
 | 
			
		||||
//                 "foo" => Ok(Command::Foo),
 | 
			
		||||
//                 _ => Err(UnknownCommand(cmd)),
 | 
			
		||||
//             }
 | 
			
		||||
//         }
 | 
			
		||||
//     }
 | 
			
		||||
// }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue