From 46b87d2559c7cae420d956677ec0c94c0f4f1124 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Mon, 29 Jan 2024 20:18:29 +0200 Subject: [PATCH] more churn --- Cargo.lock | 45 ++++++++- agent/Cargo.toml | 1 + agent/src/health.rs | 70 +++++++++++--- agent/src/main.rs | 3 +- agent/src/messaging.rs | 46 +++++---- agent/src/pty.rs | 166 +++++++++++++++++++++++++++++++++ agent/src/services.rs | 89 ------------------ agent/src/services/exec.rs | 79 ++++++++++++++++ agent/src/services/mod.rs | 67 +++++++++++++ agent/src/services/terminal.rs | 45 +++++++++ 10 files changed, 486 insertions(+), 125 deletions(-) create mode 100644 agent/src/pty.rs delete mode 100644 agent/src/services.rs create mode 100644 agent/src/services/exec.rs create mode 100644 agent/src/services/mod.rs create mode 100644 agent/src/services/terminal.rs diff --git a/Cargo.lock b/Cargo.lock index 979fbae..8d3f5f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,6 +105,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" + [[package]] name = "block-buffer" version = "0.10.4" @@ -281,6 +287,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "fiat-crypto" version = "0.2.5" @@ -435,9 +451,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" + +[[package]] +name = "linux-raw-sys" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -687,6 +709,7 @@ dependencies = [ "bytes", "chrono", "futures", + "rustix", "serde", "serde_json", "sysinfo", @@ -743,7 +766,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -804,6 +827,20 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +dependencies = [ + "bitflags 2.4.2", + "errno", + "itoa", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.21.10" @@ -884,7 +921,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index c19ef3c..96a6809 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -9,6 +9,7 @@ async-nats = "0.33.0" bytes = "1.5.0" chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] } futures = { version = "0.3.30", default-features = false, features = ["std"] } +rustix = { version = "0.38.30", features = ["termios", "stdio", "pty", "process"] } serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" sysinfo = { version = "0.30.5", default-features = false } diff --git a/agent/src/health.rs b/agent/src/health.rs index d9cbc72..58296e9 100644 --- a/agent/src/health.rs +++ b/agent/src/health.rs @@ -1,6 +1,6 @@ //! System health information and checking -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use tokio::sync::watch; @@ -9,7 +9,7 @@ use crate::messaging::{Client, Message}; const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; -const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; +const DISK_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; pub struct System { sys: sysinfo::System, @@ -33,7 +33,7 @@ impl System { .with_cpu(CpuRefreshKind::everything()), ); - // self.disks.refresh_list(); + self.disks.refresh_list(); } pub fn system(&self) -> &sysinfo::System { @@ -45,18 +45,26 @@ impl System { } } -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub enum Status { #[default] Normal, Critical, } +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub enum DiskStatus { + #[default] + Normal, + // HighUsage, + VeryHighUsage, +} + #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct Health { cpu_status: Status, memory_status: Status, - disk_status: Status, + disk_status: HashMap, } #[derive(Clone)] @@ -80,13 +88,12 @@ impl HealthMonitor { let cpu_usage = sys.global_cpu_info().cpu_usage(); - // for d in system.disks().list() { - // let _avail = if d.total_space() > 0 { - // (d.available_space() * 100 / d.total_space()) as u8 - // } else { - // 0 as u8 - // }; - // } + let disks_usage = system.disks().list().iter().map(|dk| { + ( + dk.name().to_str().unwrap_or(""), + (dk.total_space() - dk.available_space()) as f64 / dk.total_space() as f64 * 100.0, + ) + }); self.0.send_if_modified(|health| { let cpu_changed = match health.cpu_status { @@ -113,7 +120,42 @@ impl HealthMonitor { _ => false, }; - cpu_changed || memory_changed + let mut disk_changed = false; + + disks_usage.for_each(|(name, usage)| match health.disk_status.get_mut(name) { + Some(DiskStatus::Normal) if usage > DISK_USAGE_CRITICAL_THRESHOLD => { + println!("{usage}"); + health + .disk_status + .insert(name.to_owned(), DiskStatus::VeryHighUsage); + + disk_changed = true; + } + Some(DiskStatus::VeryHighUsage) if usage <= DISK_USAGE_CRITICAL_THRESHOLD => { + health + .disk_status + .insert(name.to_owned(), DiskStatus::Normal); + + disk_changed = true; + } + None if usage > DISK_USAGE_CRITICAL_THRESHOLD => { + health + .disk_status + .insert(name.to_owned(), DiskStatus::VeryHighUsage); + + disk_changed = true; + } + None => { + health + .disk_status + .insert(name.to_owned(), DiskStatus::Normal); + + disk_changed = true; + } + _ => {} + }); + + cpu_changed || memory_changed || disk_changed }); } @@ -137,7 +179,7 @@ pub async fn init_health_subsystem(client: Client) -> HealthMonitor { // Forever refresh system resources and monitor changes std::thread::spawn(move || loop { - const REFRESH_INTERVAL: Duration = Duration::from_secs(1); + const REFRESH_INTERVAL: Duration = Duration::from_secs(5); system.refresh_resources(); health_monitor.check_system(&system); std::thread::sleep(REFRESH_INTERVAL); diff --git a/agent/src/main.rs b/agent/src/main.rs index 6c86a03..0ffc009 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -5,6 +5,7 @@ use tracing::Level; mod health; mod messaging; +mod pty; mod services; #[tokio::main] @@ -28,7 +29,7 @@ async fn run() -> anyhow::Result<()> { let _health_monitor = init_health_subsystem(client.clone()).await; tracing::info!("initialized health system"); - init_services(client).await; + init_services(client).await?; tracing::info!("initialized services"); tracing::info!("agent is ready"); diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs index 49b4f85..77b988d 100644 --- a/agent/src/messaging.rs +++ b/agent/src/messaging.rs @@ -1,9 +1,8 @@ -use std::fmt::Display; +use std::fmt::{Debug, Display}; use anyhow::{anyhow, Result}; use bytes::Bytes; use futures::Stream; -use serde::Deserialize; use tokio_stream::StreamExt; use crate::health::Health; @@ -18,6 +17,7 @@ pub struct Client { pub enum Subject { Health, Exec, + OpenTerminal, } impl Display for Subject { @@ -25,10 +25,24 @@ impl Display for Subject { match self { Subject::Health => write!(f, "health"), Subject::Exec => write!(f, "exec"), + Subject::OpenTerminal => write!(f, "open_terminal"), } } } +impl TryFrom<&str> for Subject { + type Error = anyhow::Error; + + fn try_from(value: &str) -> Result { + Ok(match value { + "health" => Subject::Health, + "exec" => Subject::Exec, + "open_terminal" => Subject::OpenTerminal, + _ => return Err(anyhow!("unknown subject '{}'", value)), + }) + } +} + #[derive(Debug)] pub struct Message { subject: Subject, @@ -40,27 +54,19 @@ impl Message { fn from_transport(msg: async_nats::Message) -> Result { let suffix = msg.subject.split_terminator('.').last().unwrap_or_default(); - match suffix { - "exec" => Ok(Message { - subject: Subject::Exec, - payload: msg.payload, - reply: msg.reply, - }), - "health" => Ok(Message { - subject: Subject::Health, - payload: msg.payload, - reply: msg.reply, - }), - _ => Err(anyhow!("unknown subject: {}", msg.subject)), - } + Ok(Message { + subject: suffix.try_into()?, + payload: msg.payload, + reply: msg.reply, + }) } pub fn subject(&self) -> &Subject { &self.subject } - pub fn parse_payload<'a, T: Deserialize<'a>>(&'a self) -> Result { - Ok(serde_json::from_slice(&self.payload[..])?) + pub fn body(&self) -> Bytes { + self.payload.clone() } pub fn health(health: Health) -> Result { @@ -125,3 +131,9 @@ impl Client { Ok(subscriber) } } + +impl Debug for Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Client {{ id: {} }}", self.id) + } +} diff --git a/agent/src/pty.rs b/agent/src/pty.rs new file mode 100644 index 0000000..814e0de --- /dev/null +++ b/agent/src/pty.rs @@ -0,0 +1,166 @@ +use std::{io, task::ready}; + +use rustix::{ + fd::OwnedFd, + fs::{fcntl_getfl, fcntl_setfl, OFlags}, + process::{ioctl_tiocsctty, setsid}, + pty::{grantpt, ioctl_tiocgptpeer, openpt, unlockpt, OpenptFlags}, + stdio::{dup2_stderr, dup2_stdin, dup2_stdout}, + termios::{tcsetwinsize, Winsize}, +}; +use tokio::{ + io::{unix::AsyncFd, AsyncRead, AsyncWrite}, + process::Child, +}; + +#[derive(Debug)] +pub struct Pty { + fd: AsyncFd, +} + +impl Pty { + pub fn open() -> io::Result { + let master = openpt(OpenptFlags::RDWR | OpenptFlags::NOCTTY | OpenptFlags::CLOEXEC)?; + grantpt(&master)?; + unlockpt(&master)?; + + // Set nonblocking + let flags = fcntl_getfl(&master)?; + fcntl_setfl(&master, flags | OFlags::NONBLOCK)?; + + let fd = AsyncFd::new(master)?; + + Ok(Self { fd }) + } + + pub fn child(&self) -> io::Result { + // NOTE: Linux v4.13 and above + let fd = ioctl_tiocgptpeer(&self.fd, OpenptFlags::RDWR | OpenptFlags::NOCTTY)?; + let child = PtyChild { fd }; + + Ok(child) + } + + pub fn resize_window(&self, rows: u16, cols: u16) -> io::Result<()> { + let winsize = Winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + + tcsetwinsize(&self.fd, winsize)?; + + Ok(()) + } + + pub fn try_clone(&self) -> io::Result { + let fd = self.fd.get_ref().try_clone()?; + + Ok(Pty { + fd: AsyncFd::new(fd)?, + }) + } +} + +impl AsyncRead for Pty { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + loop { + let mut guard = ready!(self.fd.poll_read_ready(cx)?); + + match guard.try_io(|inner| { + let fd = inner.get_ref(); + let n = rustix::io::read(fd, buf.initialize_unfilled())?; + buf.advance(n); + + Ok(()) + }) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } + } +} + +impl AsyncWrite for Pty { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + loop { + let mut guard = ready!(self.fd.poll_write_ready(cx))?; + + match guard.try_io(|inner| Ok(rustix::io::write(inner.get_ref(), buf)?)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +#[derive(Debug)] +pub struct PtyChild { + fd: OwnedFd, +} + +impl PtyChild { + pub fn login_tty(&self) -> io::Result<()> { + setsid()?; + ioctl_tiocsctty(&self.fd)?; + dup2_stdin(&self.fd)?; + dup2_stdout(&self.fd)?; + dup2_stderr(&self.fd)?; + + Ok(()) + } +} + +pub fn open_shell(pty_child: PtyChild, shell: &str) -> io::Result { + let mut cmd = tokio::process::Command::new(shell); + + unsafe { + cmd.pre_exec(move || { + pty_child.login_tty()?; + Ok(()) + }); + } + + cmd.spawn() +} + +#[cfg(test)] +mod test { + use rustix::fd::AsRawFd; + + use super::*; + + #[tokio::test] + async fn can_open_pty() { + let pty = Pty::open().unwrap(); + let child = pty.child().unwrap(); + + let master_fd = pty.fd.get_ref().as_raw_fd(); + let child_fd = child.fd.as_raw_fd(); + + assert!(master_fd != child_fd); + } +} diff --git a/agent/src/services.rs b/agent/src/services.rs deleted file mode 100644 index a5b1985..0000000 --- a/agent/src/services.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::process::Stdio; - -use bytes::Bytes; -use futures::{FutureExt, Stream}; -use serde::Deserialize; -use tokio::process::Command; -use tokio_stream::StreamExt; -use tokio_util::codec::{BytesCodec, FramedRead}; - -use crate::{ - health::Health, - messaging::{Client, Message, Subject}, -}; - -pub async fn init_services(client: Client) { - let mut message_stream = client.subscribe().await.unwrap(); - - tokio::spawn(async move { - while let Some(message) = message_stream.next().await { - let client = client.clone(); - tokio::spawn(async move { - if let Err(err) = handle_message(client, message).await { - tracing::warn!("{err}"); - } - }); - } - }); -} - -async fn handle_message(client: Client, message: Message) -> anyhow::Result<()> { - match message.subject() { - Subject::Exec => { - let stream = exec_handler(message.parse_payload()?).await?; - client.reply(message, stream).await; - } - Subject::Health => { - let health: Health = message.parse_payload()?; - tracing::info!(?health, "received a health"); - } - } - - Ok(()) -} - -/// An operating system program execution. -#[derive(Debug, Deserialize)] -struct ExecMessage { - user: String, - program: String, - #[serde(default)] - args: Vec, -} - -async fn exec_handler(req: ExecMessage) -> anyhow::Result + Unpin> { - // TODO: Tasks should be idempontent - // TODO: Root user should be able to run only specific programs - let mut cmd = if req.user != "root" { - let mut cmd = Command::new("sudo"); - cmd.arg("-iu").arg(&req.user).arg("--").arg(&req.program); - cmd - } else { - Command::new(&req.program) - }; - - let mut io = cmd - .args(&req.args) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - let stdout = FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()) - .map(|stdout| Bytes::from(stdout.unwrap())); - - let stderr = FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()) - .map(|stderr| Bytes::from(stderr.unwrap())); - - let exit = async move { - io.wait() - .await - .map(|exit| { - let exit = exit.to_string(); - Bytes::from(exit) - }) - .unwrap() - } - .into_stream(); - - Ok(Box::pin(stdout.merge(stderr).chain(exit))) -} diff --git a/agent/src/services/exec.rs b/agent/src/services/exec.rs new file mode 100644 index 0000000..0609244 --- /dev/null +++ b/agent/src/services/exec.rs @@ -0,0 +1,79 @@ +use std::process::Stdio; + +use bytes::Bytes; +use futures::FutureExt; +use serde::Deserialize; +use tokio::process::Command; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, FramedRead}; + +use super::Ctx; + +#[derive(Debug, Deserialize)] +pub struct ExecReq { + user: String, + program: String, + #[serde(default)] + args: Vec, +} + +impl TryFrom for ExecReq { + type Error = serde_json::Error; + + fn try_from(value: Bytes) -> Result { + serde_json::from_slice(&value[..]) + } +} + +/// An operating system program execution. +pub async fn exec(ctx: Ctx) -> anyhow::Result<()> { + // TODO: Tasks should be idempontent + // TODO: Root user should be able to run only specific programs + + let mut cmd = if &ctx.body.user != "root" { + let mut cmd = Command::new("sudo"); + cmd.arg("-iu") + .arg(&ctx.body.user) + .arg("--") + .arg(&ctx.body.program); + cmd + } else { + Command::new(&ctx.body.program) + }; + + let mut io = cmd + .args(&ctx.body.args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let stdout = + FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| { + stdout + .map(|bytes| bytes.freeze()) + .map_err(|err| tracing::error!(%err, "read error on stdout")) + .ok() + }); + + let stderr = + FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| { + stderr + .map(|bytes| bytes.freeze()) + .map_err(|err| tracing::error!(%err, "read error on stderr")) + .ok() + }); + + let exit = async move { + io.wait() + .await + .map(|exit| { + let exit = exit.to_string(); + Bytes::from(exit) + }) + .unwrap() + } + .into_stream(); + + // Ok(Box::pin(stdout.merge(stderr).chain(exit))) + Ok(()) +} diff --git a/agent/src/services/mod.rs b/agent/src/services/mod.rs new file mode 100644 index 0000000..876657c --- /dev/null +++ b/agent/src/services/mod.rs @@ -0,0 +1,67 @@ +use anyhow::Context; +use bytes::Bytes; +use thiserror::Error; +use tokio_stream::StreamExt; + +mod exec; +mod terminal; + +#[derive(Debug, Error)] +enum ServiceError { + #[error("received an invalid body format for a valid message")] + BodyFormatError, +} + +struct Ctx { + body: T, +} + +impl Ctx +where + T: TryFrom, +{ + fn with_body(client: crate::messaging::Client, body: Bytes) -> Result { + Ok(Self { + body: body + .try_into() + .map_err(|_err| ServiceError::BodyFormatError)?, + }) + } +} + +async fn route_message( + client: crate::messaging::Client, + message: crate::messaging::Message, +) -> Result<(), ServiceError> { + match message.subject() { + crate::messaging::Subject::Health => {} + crate::messaging::Subject::Exec => { + let ctx = Ctx::with_body(client, message.body())?; + let _ = self::exec::exec(ctx).await; + } + crate::messaging::Subject::OpenTerminal => { + let ctx = Ctx::with_body(client, message.body())?; + let _ = self::terminal::open_terminal(ctx).await; + } + } + + Ok(()) +} + +pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> { + let mut message_stream = client + .subscribe() + .await + .context("could not initialize services system")?; + + tokio::spawn(async move { + while let Some(message) = message_stream.next().await { + // TODO: How do i handle this error? + if let Err(err) = route_message(client.clone(), message).await { + tracing::warn!("{}", err); + }; + } + }); + + Ok(()) +} diff --git a/agent/src/services/terminal.rs b/agent/src/services/terminal.rs new file mode 100644 index 0000000..2553b4e --- /dev/null +++ b/agent/src/services/terminal.rs @@ -0,0 +1,45 @@ +use bytes::Bytes; +use futures::Stream; +use serde::Deserialize; +use tokio::io::AsyncWriteExt; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, FramedRead}; + +use super::Ctx; + +#[derive(Debug, Deserialize)] +pub struct OpenTerminalMessage { + id: String, +} + +impl TryFrom for OpenTerminalMessage { + type Error = serde_json::Error; + + fn try_from(value: Bytes) -> Result { + serde_json::from_slice(&value[..]) + } +} + +pub async fn open_terminal(ctx: Ctx) -> anyhow::Result<()> { + let pty = crate::pty::Pty::open()?; + let mut pty_clone = pty.try_clone()?; + + tokio::spawn(async move { + while let Some(data) = tokio_stream::once(b"foo").next().await { + if let Err(err) = pty_clone.write_all(&data[..]).await { + tracing::warn!(%err, "pseudoterminal write error"); + } + } + }); + + let _out_stream = FramedRead::new(pty, BytesCodec::new()).filter_map(|inner| { + inner + .map(|bytes| bytes.freeze()) + .map_err(|err| { + tracing::warn!(%err, "pseudoterminal read error"); + }) + .ok() + }); + + Ok(()) +}