diff --git a/Cargo.lock b/Cargo.lock index bb1512d..0c8d302 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -665,9 +665,11 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats", + "bytes", "serde", "serde_json", "sysinfo", + "thiserror", "tokio", "tokio-stream", "tracing", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 63ecf80..04104ad 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -6,9 +6,11 @@ edition = "2021" [dependencies] anyhow = "1.0.79" async-nats = "0.33.0" +bytes = "1.5.0" serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" sysinfo = { version = "0.30.5", default-features = false } +thiserror = "1.0.56" tokio = { version = "1.35.1", features = ["full"] } tokio-stream = { version = "0.1.14", default-features = false } tracing = "0.1.40" diff --git a/agent/src/health.rs b/agent/src/health.rs index cb682e5..3f5f7e0 100644 --- a/agent/src/health.rs +++ b/agent/src/health.rs @@ -23,15 +23,15 @@ impl System { } pub fn refresh_resources(&mut self) { - use sysinfo::{CpuRefreshKind, MemoryRefreshKind}; + use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind}; self.sys.refresh_specifics( - sysinfo::RefreshKind::new() + RefreshKind::new() .with_memory(MemoryRefreshKind::new().with_ram().with_swap()) - .with_cpu(CpuRefreshKind::new().with_cpu_usage()), + .with_cpu(CpuRefreshKind::everything()), ); - self.disks.refresh_list(); + // self.disks.refresh_list(); } pub fn system(&self) -> &sysinfo::System { @@ -47,17 +47,14 @@ impl System { pub enum Status { #[default] Normal, - MemoryWarning, - MemoryCritical, - CpuWarning, - CpuCritical, - DiskWarning, - DiskCritical, + Critical, } -#[derive(Debug, Serialize)] +#[derive(Debug, Default, Serialize)] pub struct Health { - status: Vec, + cpu_status: Status, + memory_status: Status, + disk_status: Status, } #[derive(Clone)] @@ -65,26 +62,24 @@ pub struct HealthMonitor(Arc>); impl HealthMonitor { pub fn new() -> Self { - let (sender, _) = watch::channel(Health { - status: [Status::default()].into(), - }); + let (sender, _) = watch::channel(Health::default()); Self(Arc::new(sender)) } pub fn check_system(&self, system: &System) { - let s = system.system(); + let sys = system.system(); - let memory_usage = if s.total_memory() > 0 { - s.used_memory() as f64 * 100.0 / s.total_memory() as f64 + let memory_usage = if sys.total_memory() > 0 { + sys.used_memory() as f64 * 100.0 / sys.total_memory() as f64 } else { 0.0 }; - let cpu_usage = s.global_cpu_info().cpu_usage(); + let cpu_usage = sys.global_cpu_info().cpu_usage(); // for d in system.disks().list() { - // let avail = if d.total_space() > 0 { + // let _avail = if d.total_space() > 0 { // (d.available_space() * 100 / d.total_space()) as u8 // } else { // 0 as u8 @@ -92,36 +87,31 @@ impl HealthMonitor { // } self.0.send_if_modified(|health| { - let cpu_changed = if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD { - health - .status - .iter() - .find(|&it| *it == Status::CpuCritical) - .and(None) - .unwrap_or_else(|| { - health.status.push(Status::CpuCritical); - true - }) - } else { - false + let cpu_changed = match health.cpu_status { + Status::Normal if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD => { + health.cpu_status = Status::Critical; + true + } + Status::Critical if cpu_usage <= CPU_USAGE_CRITICAL_THRESHOLD => { + health.cpu_status = Status::Normal; + true + } + _ => false, }; - let mem_changed = if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD { - health - .status - .iter() - .find(|&it| *it == Status::MemoryCritical) - .and(None) - .unwrap_or_else(|| { - health.status.push(Status::MemoryCritical); - true - }) - } else { - false + let memory_changed = match health.memory_status { + Status::Normal if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD => { + health.memory_status = Status::Critical; + true + } + Status::Critical if memory_usage <= MEMORY_USAGE_CRITICAL_THRESHOLD => { + health.memory_status = Status::Normal; + true + } + _ => false, }; - dbg!(cpu_changed || mem_changed); - cpu_changed || mem_changed + cpu_changed || memory_changed }); } @@ -129,3 +119,10 @@ impl HealthMonitor { self.0.subscribe() } } + +impl Default for HealthMonitor { + #[inline] + fn default() -> Self { + Self::new() + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index d0d9f6c..f52f722 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,9 +1,9 @@ use std::time::Duration; -use async_nats::{Client, ConnectOptions}; use tracing::Level; mod health; +mod messaging; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -14,31 +14,31 @@ async fn main() -> anyhow::Result<()> { tracing::subscriber::set_global_default(subscriber) .expect("to set a tracing global subscriber"); - let client = ConnectOptions::new() - .name("Prymn Agent demo_agent") - .custom_inbox_prefix("_INBOX_demo_agent") - .user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned()) - .connect("localhost") - .await - .map_err(|err| err)?; - - tracing::info!("connected to nats server"); - - init_health_subsystem(client); - - // wait_for_commands(client).await; + run().await.map_err(|err| { + tracing::error!(cause = %err, "could not start agent"); + err + }) +} +async fn run() -> anyhow::Result<()> { + let agent = messaging::Agent::connect("demo_agent").await?; + tracing::info!("initialized messaging system"); + init_health_subsystem(agent.clone()); + tracing::info!("initialized health system"); + tracing::info!("agent is ready"); tokio::signal::ctrl_c().await?; Ok(()) } -fn init_health_subsystem(client: Client) { - let system = health::System::new(); +fn init_health_subsystem(agent: messaging::Agent) { + let mut system = health::System::new(); let health_monitor = health::HealthMonitor::new(); let health_monitor_clone = health_monitor.clone(); + // Forever refresh system resources and monitor changes std::thread::spawn(move || loop { const REFRESH_INTERVAL: Duration = Duration::from_secs(1); + system.refresh_resources(); health_monitor.check_system(&system); std::thread::sleep(REFRESH_INTERVAL); }); @@ -47,14 +47,10 @@ fn init_health_subsystem(client: Client) { let mut recv = health_monitor_clone.monitor(); while let Ok(()) = recv.changed().await { - let payload = serde_json::to_string(&*recv.borrow()).unwrap(); - - tracing::info!(%payload, "sending health event"); - - client - .publish("agents.v1.demo_agent.health", payload.into()) - .await - .unwrap(); + tracing::info!(health = ?&*recv.borrow(), "health watermark"); + let payload = serde_json::to_string(&*recv.borrow()).expect("serializable health"); + let message = messaging::OutgoingMessage::Health(payload); + agent.publish(message).await; } }); } diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs new file mode 100644 index 0000000..5a276a1 --- /dev/null +++ b/agent/src/messaging.rs @@ -0,0 +1,119 @@ +//! Prymn messaging framework + +use async_nats::SubscribeError; +use async_nats::{Client, ConnectError, ConnectOptions}; +use thiserror::Error; +use tokio_stream::StreamExt; + +pub use self::v1::OutgoingMessage; + +mod v1 { + use bytes::Bytes; + + pub(super) const PREFIX: &'static str = "agents.v1"; + + #[derive(Debug)] + pub enum OutgoingMessage { + Health(String), + } + + impl OutgoingMessage { + pub fn subject(&self, sender: &str) -> String { + match self { + OutgoingMessage::Health(_) => format!("{}.{}.health", PREFIX, sender), + } + } + + pub fn into_payload(self) -> Bytes { + match self { + OutgoingMessage::Health(data) => Bytes::from(data.into_bytes()), + } + } + } + + // #[derive(Debug)] + // pub enum Subject { + // Command, + // } + + // /// A system update message received when an OS update is requested + // #[derive(Deserialize)] + // struct SystemUpdate { + // /// `true` when a simulated update should occur + // simulate: bool, + // } + + // /// An agent update message received when an agent binary update is requested + // #[derive(Deserialize)] + // struct AgentUpdate { + // /// `true` when a simulated update should occur + // simulate: bool, + // } + + // /// An operating system program execution. + // #[derive(Deserialize)] + // struct Command<'a> { + // /// The user to which the child process will switch to + // user: &'a str, + // /// Program name or path + // program: &'a str, + // /// Program arguments + // args: Vec<&'a str>, + // } + + // struct OpenTerminal { + + // } +} + +#[derive(Debug, Error)] +pub enum MessagingError { + #[error("failure connecting to message broker")] + ConnectError(#[from] ConnectError), + #[error("failure listening for messages from broker")] + SubscribeError(#[from] SubscribeError), +} + +#[derive(Debug, Clone)] +pub struct Agent { + id: String, + client: Client, +} + +impl Agent { + #[tracing::instrument] + pub async fn connect(id: &str) -> Result { + let client = ConnectOptions::new() + .name(format!("Prymn Agent {id}")) + .custom_inbox_prefix(format!("_INBOX_{id}")) + .user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned()) + .connect("localhost") + .await?; + + tracing::debug!("connected to NATS"); + let prefix = format!("{}.{}.*", v1::PREFIX, id); + let mut routing_subscriber = client.subscribe(prefix.clone()).await?; + tracing::debug!("root message routing initialized on {}", &prefix); + + tokio::spawn(async move { + while let Some(data) = routing_subscriber.next().await { + let command = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]); + dbg!(command); + } + }); + + Ok(Self { + id: id.to_owned(), + client, + }) + } + + #[tracing::instrument(skip(self))] + pub async fn publish(&self, msg: OutgoingMessage) { + let subj = msg.subject(&self.id); + tracing::debug!(subject = &subj, "publish"); + if let Err(err) = self.client.publish(subj, msg.into_payload()).await { + tracing::error!(cause = %err, "publish error"); + } + } +}