diff --git a/Cargo.lock b/Cargo.lock index 0c8d302..841d610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,6 +672,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tower", "tracing", "tracing-subscriber", ] @@ -1195,12 +1196,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 04104ad..4c24298 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -13,5 +13,6 @@ sysinfo = { version = "0.30.5", default-features = false } thiserror = "1.0.56" tokio = { version = "1.35.1", features = ["full"] } tokio-stream = { version = "0.1.14", default-features = false } +tower = { version = "0.4.13", features = ["steer", "util"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["fmt"] } diff --git a/agent/src/health.rs b/agent/src/health.rs index 3f5f7e0..1183cf0 100644 --- a/agent/src/health.rs +++ b/agent/src/health.rs @@ -27,7 +27,7 @@ impl System { self.sys.refresh_specifics( RefreshKind::new() - .with_memory(MemoryRefreshKind::new().with_ram().with_swap()) + .with_memory(MemoryRefreshKind::everything()) .with_cpu(CpuRefreshKind::everything()), ); diff --git a/agent/src/main.rs b/agent/src/main.rs index f52f722..0aa7177 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -4,6 +4,7 @@ use tracing::Level; mod health; mod messaging; +mod services; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -21,16 +22,17 @@ async fn main() -> anyhow::Result<()> { } async fn run() -> anyhow::Result<()> { - let agent = messaging::Agent::connect("demo_agent").await?; + let agent = messaging::Client::connect("demo_agent").await?; tracing::info!("initialized messaging system"); init_health_subsystem(agent.clone()); tracing::info!("initialized health system"); + // services::init_services().await; tracing::info!("agent is ready"); tokio::signal::ctrl_c().await?; Ok(()) } -fn init_health_subsystem(agent: messaging::Agent) { +fn init_health_subsystem(agent: messaging::Client) { let mut system = health::System::new(); let health_monitor = health::HealthMonitor::new(); let health_monitor_clone = health_monitor.clone(); diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs index 5a276a1..7a549dd 100644 --- a/agent/src/messaging.rs +++ b/agent/src/messaging.rs @@ -1,14 +1,29 @@ //! Prymn messaging framework -use async_nats::SubscribeError; -use async_nats::{Client, ConnectError, ConnectOptions}; +use async_nats::{ConnectError, ConnectOptions, SubscribeError}; use thiserror::Error; use tokio_stream::StreamExt; +pub use self::v1::Command; pub use self::v1::OutgoingMessage; +#[derive(Debug, Error)] +pub enum MessagingError<'a> { + #[error("failed to connect to message broker")] + ConnectError(#[from] ConnectError), + #[error("failed to listen for messages from broker")] + SubscribeError(#[from] SubscribeError), + #[error("unknown subject '{0}'")] + UnknownSubject(&'a str), + #[error("parse error on message format")] + MessageFormatError(#[from] serde_json::Error), +} + mod v1 { use bytes::Bytes; + use serde::Deserialize; + + use super::MessagingError; pub(super) const PREFIX: &'static str = "agents.v1"; @@ -31,10 +46,31 @@ mod v1 { } } - // #[derive(Debug)] - // pub enum Subject { - // Command, - // } + #[derive(Debug)] + pub enum Command { + Exec(ExecArgs), + } + + impl Command { + pub fn from_message<'a>( + subject: &'a str, + payload: &'a [u8], + ) -> Result> { + match subject { + "exec" => Ok(Command::Exec(serde_json::from_slice(payload)?)), + _ => Err(MessagingError::UnknownSubject(subject)), + } + } + } + + /// An operating system program execution. + #[derive(Debug, Deserialize)] + pub(super) struct ExecArgs { + user: String, + program: String, + #[serde(default)] + args: Vec, + } // /// A system update message received when an OS update is requested // #[derive(Deserialize)] @@ -49,38 +85,15 @@ mod v1 { // /// `true` when a simulated update should occur // simulate: bool, // } - - // /// An operating system program execution. - // #[derive(Deserialize)] - // struct Command<'a> { - // /// The user to which the child process will switch to - // user: &'a str, - // /// Program name or path - // program: &'a str, - // /// Program arguments - // args: Vec<&'a str>, - // } - - // struct OpenTerminal { - - // } -} - -#[derive(Debug, Error)] -pub enum MessagingError { - #[error("failure connecting to message broker")] - ConnectError(#[from] ConnectError), - #[error("failure listening for messages from broker")] - SubscribeError(#[from] SubscribeError), } #[derive(Debug, Clone)] -pub struct Agent { +pub struct Client { id: String, - client: Client, + client: async_nats::Client, } -impl Agent { +impl Client { #[tracing::instrument] pub async fn connect(id: &str) -> Result { let client = ConnectOptions::new() @@ -91,14 +104,27 @@ impl Agent { .await?; tracing::debug!("connected to NATS"); + let prefix = format!("{}.{}.*", v1::PREFIX, id); let mut routing_subscriber = client.subscribe(prefix.clone()).await?; - tracing::debug!("root message routing initialized on {}", &prefix); tokio::spawn(async move { + tracing::debug!("v1 command routing subscribed on {}", &prefix); + while let Some(data) = routing_subscriber.next().await { - let command = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]); - dbg!(command); + let subject = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]); + + match Command::from_message(subject, &data.payload) { + Ok(command) => { + tracing::debug!(?command); + // TODO: Think if we might want to queue commands + // handle_command(command); + } + Err(err) => { + tracing::warn!(cause = %err, "failed to parse received message"); + tracing::debug!(payload = %String::from_utf8_lossy(&data.payload)); + } + } } }); @@ -117,3 +143,16 @@ impl Agent { } } } + +// pub fn receive_requests(agent: Client) { +// let prefix = format!("{}.{}.*", v1::PREFIX, id); +// let mut routing_subscriber = client.subscribe(prefix.clone()).await?; +// let subscriber = agent.subscribe("").await; +// tracing::debug!("v1 command routing subscribed on {}", &prefix); +// } +// +// fn handle_command(command: Command) { +// match command { +// Command::Exec(_) => todo!(), +// } +// } diff --git a/agent/src/services.rs b/agent/src/services.rs new file mode 100644 index 0000000..a4c84bb --- /dev/null +++ b/agent/src/services.rs @@ -0,0 +1,10 @@ +use tower::steer::Steer; + +fn router() { + let services = vec![]; + + let svc = Steer::new(services, |req, services: &[_]| { + // do something with req + 0 + }); +}