From 67d147c7e3ce555746f7ec1976f3a76de2d3999c Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Mon, 29 Jan 2024 12:44:00 +0200 Subject: [PATCH] uggggh --- Cargo.lock | 49 +++++---- agent/Cargo.toml | 4 +- agent/src/health.rs | 37 ++++++- agent/src/main.rs | 41 ++----- agent/src/messaging.rs | 235 ++++++++++++++++++----------------------- agent/src/services.rs | 91 ++++++++++++++-- 6 files changed, 261 insertions(+), 196 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 841d610..979fbae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,6 +144,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +dependencies = [ + "num-traits", + "serde", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -515,6 +525,15 @@ dependencies = [ "rand", ] +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -666,13 +685,15 @@ dependencies = [ "anyhow", "async-nats", "bytes", + "chrono", + "futures", "serde", "serde_json", "sysinfo", "thiserror", "tokio", "tokio-stream", - "tower", + "tokio-util", "tracing", "tracing-subscriber", ] @@ -1197,39 +1218,25 @@ dependencies = [ ] [[package]] -name = "tower" -version = "0.4.13" +name = "tokio-util" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ + "bytes", "futures-core", - "futures-util", - "pin-project", + "futures-sink", "pin-project-lite", - "tower-layer", - "tower-service", + "tokio", "tracing", ] -[[package]] -name = "tower-layer" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" - -[[package]] -name = "tower-service" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" - [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 4c24298..c19ef3c 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -7,12 +7,14 @@ edition = "2021" anyhow = "1.0.79" async-nats = "0.33.0" bytes = "1.5.0" +chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] } +futures = { version = "0.3.30", default-features = false, features = ["std"] } serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" sysinfo = { version = "0.30.5", default-features = false } thiserror = "1.0.56" tokio = { version = "1.35.1", features = ["full"] } tokio-stream = { version = "0.1.14", default-features = false } -tower = { version = "0.4.13", features = ["steer", "util"] } +tokio-util = { version = "0.7.10", features = ["codec"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["fmt"] } diff --git a/agent/src/health.rs b/agent/src/health.rs index 1183cf0..d9cbc72 100644 --- a/agent/src/health.rs +++ b/agent/src/health.rs @@ -1,10 +1,12 @@ //! System health information and checking -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use tokio::sync::watch; +use crate::messaging::{Client, Message}; + const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; @@ -43,14 +45,14 @@ impl System { } } -#[derive(Debug, Default, PartialEq, Serialize)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub enum Status { #[default] Normal, Critical, } -#[derive(Debug, Default, Serialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct Health { cpu_status: Status, memory_status: Status, @@ -126,3 +128,30 @@ impl Default for HealthMonitor { Self::new() } } + +pub async fn init_health_subsystem(client: Client) -> HealthMonitor { + let health_monitor = HealthMonitor::new(); + let health_monitor_clone = health_monitor.clone(); + let health_monitor_ret = health_monitor.clone(); + let mut system = System::new(); + + // Forever refresh system resources and monitor changes + std::thread::spawn(move || loop { + const REFRESH_INTERVAL: Duration = Duration::from_secs(1); + system.refresh_resources(); + health_monitor.check_system(&system); + std::thread::sleep(REFRESH_INTERVAL); + }); + + tokio::spawn(async move { + let mut recv = health_monitor_clone.monitor(); + + while let Ok(()) = recv.changed().await { + tracing::info!(health = ?&*recv.borrow(), "health watermark"); + let health = recv.borrow().clone(); + client.publish(Message::health(health).unwrap()).await; + } + }); + + health_monitor_ret +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 0aa7177..6c86a03 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,5 +1,6 @@ -use std::time::Duration; - +use health::init_health_subsystem; +use messaging::Client; +use services::init_services; use tracing::Level; mod health; @@ -22,37 +23,15 @@ async fn main() -> anyhow::Result<()> { } async fn run() -> anyhow::Result<()> { - let agent = messaging::Client::connect("demo_agent").await?; - tracing::info!("initialized messaging system"); - init_health_subsystem(agent.clone()); + let client = Client::connect("demo_agent").await?; + + let _health_monitor = init_health_subsystem(client.clone()).await; tracing::info!("initialized health system"); - // services::init_services().await; + + init_services(client).await; + tracing::info!("initialized services"); + tracing::info!("agent is ready"); tokio::signal::ctrl_c().await?; Ok(()) } - -fn init_health_subsystem(agent: messaging::Client) { - let mut system = health::System::new(); - let health_monitor = health::HealthMonitor::new(); - let health_monitor_clone = health_monitor.clone(); - - // Forever refresh system resources and monitor changes - std::thread::spawn(move || loop { - const REFRESH_INTERVAL: Duration = Duration::from_secs(1); - system.refresh_resources(); - health_monitor.check_system(&system); - std::thread::sleep(REFRESH_INTERVAL); - }); - - tokio::spawn(async move { - let mut recv = health_monitor_clone.monitor(); - - while let Ok(()) = recv.changed().await { - tracing::info!(health = ?&*recv.borrow(), "health watermark"); - let payload = serde_json::to_string(&*recv.borrow()).expect("serializable health"); - let message = messaging::OutgoingMessage::Health(payload); - agent.publish(message).await; - } - }); -} diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs index 7a549dd..49b4f85 100644 --- a/agent/src/messaging.rs +++ b/agent/src/messaging.rs @@ -1,102 +1,80 @@ -//! Prymn messaging framework +use std::fmt::Display; -use async_nats::{ConnectError, ConnectOptions, SubscribeError}; -use thiserror::Error; +use anyhow::{anyhow, Result}; +use bytes::Bytes; +use futures::Stream; +use serde::Deserialize; use tokio_stream::StreamExt; -pub use self::v1::Command; -pub use self::v1::OutgoingMessage; +use crate::health::Health; -#[derive(Debug, Error)] -pub enum MessagingError<'a> { - #[error("failed to connect to message broker")] - ConnectError(#[from] ConnectError), - #[error("failed to listen for messages from broker")] - SubscribeError(#[from] SubscribeError), - #[error("unknown subject '{0}'")] - UnknownSubject(&'a str), - #[error("parse error on message format")] - MessageFormatError(#[from] serde_json::Error), -} - -mod v1 { - use bytes::Bytes; - use serde::Deserialize; - - use super::MessagingError; - - pub(super) const PREFIX: &'static str = "agents.v1"; - - #[derive(Debug)] - pub enum OutgoingMessage { - Health(String), - } - - impl OutgoingMessage { - pub fn subject(&self, sender: &str) -> String { - match self { - OutgoingMessage::Health(_) => format!("{}.{}.health", PREFIX, sender), - } - } - - pub fn into_payload(self) -> Bytes { - match self { - OutgoingMessage::Health(data) => Bytes::from(data.into_bytes()), - } - } - } - - #[derive(Debug)] - pub enum Command { - Exec(ExecArgs), - } - - impl Command { - pub fn from_message<'a>( - subject: &'a str, - payload: &'a [u8], - ) -> Result> { - match subject { - "exec" => Ok(Command::Exec(serde_json::from_slice(payload)?)), - _ => Err(MessagingError::UnknownSubject(subject)), - } - } - } - - /// An operating system program execution. - #[derive(Debug, Deserialize)] - pub(super) struct ExecArgs { - user: String, - program: String, - #[serde(default)] - args: Vec, - } - - // /// A system update message received when an OS update is requested - // #[derive(Deserialize)] - // struct SystemUpdate { - // /// `true` when a simulated update should occur - // simulate: bool, - // } - - // /// An agent update message received when an agent binary update is requested - // #[derive(Deserialize)] - // struct AgentUpdate { - // /// `true` when a simulated update should occur - // simulate: bool, - // } -} - -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Client { id: String, - client: async_nats::Client, + nats: async_nats::Client, +} + +#[derive(Debug)] +pub enum Subject { + Health, + Exec, +} + +impl Display for Subject { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Subject::Health => write!(f, "health"), + Subject::Exec => write!(f, "exec"), + } + } +} + +#[derive(Debug)] +pub struct Message { + subject: Subject, + payload: Bytes, + reply: Option, +} + +impl Message { + fn from_transport(msg: async_nats::Message) -> Result { + let suffix = msg.subject.split_terminator('.').last().unwrap_or_default(); + + match suffix { + "exec" => Ok(Message { + subject: Subject::Exec, + payload: msg.payload, + reply: msg.reply, + }), + "health" => Ok(Message { + subject: Subject::Health, + payload: msg.payload, + reply: msg.reply, + }), + _ => Err(anyhow!("unknown subject: {}", msg.subject)), + } + } + + pub fn subject(&self) -> &Subject { + &self.subject + } + + pub fn parse_payload<'a, T: Deserialize<'a>>(&'a self) -> Result { + Ok(serde_json::from_slice(&self.payload[..])?) + } + + pub fn health(health: Health) -> Result { + Ok(Message { + subject: Subject::Health, + payload: Bytes::from_iter(serde_json::to_vec(&health)?), + reply: None, + }) + } } impl Client { - #[tracing::instrument] - pub async fn connect(id: &str) -> Result { - let client = ConnectOptions::new() + pub async fn connect(id: &str) -> Result { + let nats = async_nats::ConnectOptions::new() .name(format!("Prymn Agent {id}")) .custom_inbox_prefix(format!("_INBOX_{id}")) .user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned()) @@ -105,54 +83,45 @@ impl Client { tracing::debug!("connected to NATS"); - let prefix = format!("{}.{}.*", v1::PREFIX, id); - let mut routing_subscriber = client.subscribe(prefix.clone()).await?; - - tokio::spawn(async move { - tracing::debug!("v1 command routing subscribed on {}", &prefix); - - while let Some(data) = routing_subscriber.next().await { - let subject = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]); - - match Command::from_message(subject, &data.payload) { - Ok(command) => { - tracing::debug!(?command); - // TODO: Think if we might want to queue commands - // handle_command(command); - } - Err(err) => { - tracing::warn!(cause = %err, "failed to parse received message"); - tracing::debug!(payload = %String::from_utf8_lossy(&data.payload)); - } - } - } - }); - Ok(Self { id: id.to_owned(), - client, + nats, }) } - #[tracing::instrument(skip(self))] - pub async fn publish(&self, msg: OutgoingMessage) { - let subj = msg.subject(&self.id); - tracing::debug!(subject = &subj, "publish"); - if let Err(err) = self.client.publish(subj, msg.into_payload()).await { - tracing::error!(cause = %err, "publish error"); + pub async fn publish(&self, msg: Message) { + let subject = format!("agents.v1.{}.{}", self.id, msg.subject); + self.nats.publish(subject, msg.payload).await.unwrap(); + } + + pub async fn reply(&self, msg: Message, mut stream: impl Stream + Unpin) { + match msg.reply { + Some(reply) => { + while let Some(data) = stream.next().await { + self.nats.publish(reply.clone(), data.into()).await.unwrap(); + } + } + None => tracing::warn!(?msg, "tried to reply to message without a reply subject"), } } -} -// pub fn receive_requests(agent: Client) { -// let prefix = format!("{}.{}.*", v1::PREFIX, id); -// let mut routing_subscriber = client.subscribe(prefix.clone()).await?; -// let subscriber = agent.subscribe("").await; -// tracing::debug!("v1 command routing subscribed on {}", &prefix); -// } -// -// fn handle_command(command: Command) { -// match command { -// Command::Exec(_) => todo!(), -// } -// } + pub async fn subscribe(&self) -> Result> { + let subject = format!("agents.v1.{}.*", self.id); + + let subscriber = self + .nats + .subscribe(subject.clone()) + .await? + .filter_map(move |data| match Message::from_transport(data) { + Ok(msg) => Some(msg), + Err(err) => { + tracing::warn!("{}", err); + None + } + }); + + tracing::debug!("subscribed on {}", &subject); + + Ok(subscriber) + } +} diff --git a/agent/src/services.rs b/agent/src/services.rs index a4c84bb..a5b1985 100644 --- a/agent/src/services.rs +++ b/agent/src/services.rs @@ -1,10 +1,89 @@ -use tower::steer::Steer; +use std::process::Stdio; -fn router() { - let services = vec![]; +use bytes::Bytes; +use futures::{FutureExt, Stream}; +use serde::Deserialize; +use tokio::process::Command; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, FramedRead}; - let svc = Steer::new(services, |req, services: &[_]| { - // do something with req - 0 +use crate::{ + health::Health, + messaging::{Client, Message, Subject}, +}; + +pub async fn init_services(client: Client) { + let mut message_stream = client.subscribe().await.unwrap(); + + tokio::spawn(async move { + while let Some(message) = message_stream.next().await { + let client = client.clone(); + tokio::spawn(async move { + if let Err(err) = handle_message(client, message).await { + tracing::warn!("{err}"); + } + }); + } }); } + +async fn handle_message(client: Client, message: Message) -> anyhow::Result<()> { + match message.subject() { + Subject::Exec => { + let stream = exec_handler(message.parse_payload()?).await?; + client.reply(message, stream).await; + } + Subject::Health => { + let health: Health = message.parse_payload()?; + tracing::info!(?health, "received a health"); + } + } + + Ok(()) +} + +/// An operating system program execution. +#[derive(Debug, Deserialize)] +struct ExecMessage { + user: String, + program: String, + #[serde(default)] + args: Vec, +} + +async fn exec_handler(req: ExecMessage) -> anyhow::Result + Unpin> { + // TODO: Tasks should be idempontent + // TODO: Root user should be able to run only specific programs + let mut cmd = if req.user != "root" { + let mut cmd = Command::new("sudo"); + cmd.arg("-iu").arg(&req.user).arg("--").arg(&req.program); + cmd + } else { + Command::new(&req.program) + }; + + let mut io = cmd + .args(&req.args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let stdout = FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()) + .map(|stdout| Bytes::from(stdout.unwrap())); + + let stderr = FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()) + .map(|stderr| Bytes::from(stderr.unwrap())); + + let exit = async move { + io.wait() + .await + .map(|exit| { + let exit = exit.to_string(); + Bytes::from(exit) + }) + .unwrap() + } + .into_stream(); + + Ok(Box::pin(stdout.merge(stderr).chain(exit))) +}