From f59cfee7922dcc45d62c73f7d2797ba1fb863877 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Wed, 31 Jan 2024 22:37:05 +0200 Subject: [PATCH] make it bad, so to make it good later --- agent/src/health.rs | 35 +----- agent/src/main.rs | 53 +++++--- agent/src/messaging.rs | 194 ----------------------------- agent/src/messaging/mod.rs | 35 ++++++ agent/src/messaging/v1/exec.rs | 97 +++++++++++++++ agent/src/messaging/v1/mod.rs | 99 +++++++++++++++ agent/src/messaging/v1/terminal.rs | 129 +++++++++++++++++++ agent/src/services/exec.rs | 79 ------------ agent/src/services/mod.rs | 79 ------------ agent/src/services/terminal.rs | 66 ---------- 10 files changed, 402 insertions(+), 464 deletions(-) delete mode 100644 agent/src/messaging.rs create mode 100644 agent/src/messaging/mod.rs create mode 100644 agent/src/messaging/v1/exec.rs create mode 100644 agent/src/messaging/v1/mod.rs create mode 100644 agent/src/messaging/v1/terminal.rs delete mode 100644 agent/src/services/exec.rs delete mode 100644 agent/src/services/mod.rs delete mode 100644 agent/src/services/terminal.rs diff --git a/agent/src/health.rs b/agent/src/health.rs index 321af35..8c1f303 100644 --- a/agent/src/health.rs +++ b/agent/src/health.rs @@ -1,12 +1,10 @@ //! System health information and checking -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; use serde::{Deserialize, Serialize}; use tokio::sync::watch; -use crate::messaging::Message; - const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; const DISK_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; @@ -77,7 +75,8 @@ impl HealthMonitor { Self(Arc::new(sender)) } - pub fn check_system(&self, system: &System) { + pub fn check_system(&self, system: &mut System) { + system.refresh_resources(); let sys = system.system(); let memory_usage = if sys.total_memory() > 0 { @@ -124,7 +123,6 @@ impl HealthMonitor { disks_usage.for_each(|(name, usage)| match health.disk_status.get_mut(name) { Some(DiskStatus::Normal) if usage > DISK_USAGE_CRITICAL_THRESHOLD => { - println!("{usage}"); health .disk_status .insert(name.to_owned(), DiskStatus::VeryHighUsage); @@ -170,30 +168,3 @@ impl Default for HealthMonitor { Self::new() } } - -pub async fn init_health_subsystem(client: crate::messaging::Client) -> HealthMonitor { - let health_monitor = HealthMonitor::new(); - let health_monitor_clone = health_monitor.clone(); - let health_monitor_ret = health_monitor.clone(); - let mut system = System::new(); - - // Forever refresh system resources and monitor changes - std::thread::spawn(move || loop { - const REFRESH_INTERVAL: Duration = Duration::from_secs(5); - system.refresh_resources(); - health_monitor.check_system(&system); - std::thread::sleep(REFRESH_INTERVAL); - }); - - tokio::spawn(async move { - let mut recv = health_monitor_clone.monitor(); - - while let Ok(()) = recv.changed().await { - tracing::info!(health = ?&*recv.borrow(), "health watermark"); - let health = recv.borrow().clone(); - client.publish(Message::health(health).unwrap()).await; - } - }); - - health_monitor_ret -} diff --git a/agent/src/main.rs b/agent/src/main.rs index 0ffc009..bd43804 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,38 +1,63 @@ -use health::init_health_subsystem; -use messaging::Client; -use services::init_services; -use tracing::Level; +use tokio::signal::ctrl_c; mod health; mod messaging; mod pty; -mod services; #[tokio::main] async fn main() -> anyhow::Result<()> { let subscriber = tracing_subscriber::fmt() - .with_max_level(Level::TRACE) + .with_max_level(tracing::Level::TRACE) .finish(); tracing::subscriber::set_global_default(subscriber) .expect("to set a tracing global subscriber"); run().await.map_err(|err| { - tracing::error!(cause = %err, "could not start agent"); - err + tracing::error!(%err, "failed to start the agent!"); + err.context("failed to start the Prymn agent") }) } async fn run() -> anyhow::Result<()> { - let client = Client::connect("demo_agent").await?; + let client = messaging::Client::connect("demo_agent").await?; - let _health_monitor = init_health_subsystem(client.clone()).await; - tracing::info!("initialized health system"); + init_health_subsystem(client.clone()); + tracing::info!("initialized health subsystem"); - init_services(client).await?; + crate::messaging::init_services(client).await?; tracing::info!("initialized services"); - tracing::info!("agent is ready"); - tokio::signal::ctrl_c().await?; + ctrl_c().await?; Ok(()) } + +fn init_health_subsystem(client: messaging::Client) { + let health = health::HealthMonitor::new(); + let mut health_monitor = health.monitor(); + + tokio::spawn(async move { + while health_monitor.changed().await.is_ok() { + let data = { + let health = &*health_monitor.borrow(); + tracing::info!(?health, "health watermark"); + serde_json::to_vec(health).unwrap() + }; + + if let Err(err) = client.publish("health", data).await { + // TODO: Error recovery? + tracing::warn!(cause = %err, "health subsystem publishing failed"); + } + } + }); + + std::thread::spawn(move || { + const INTERVAL: std::time::Duration = std::time::Duration::from_secs(5); + let mut system = health::System::new(); + + loop { + health.check_system(&mut system); + std::thread::sleep(INTERVAL); + } + }); +} diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs deleted file mode 100644 index 65f502e..0000000 --- a/agent/src/messaging.rs +++ /dev/null @@ -1,194 +0,0 @@ -use std::{ - fmt::{Debug, Display}, - sync::Arc, -}; - -use anyhow::{anyhow, Result}; -use bytes::Bytes; -use futures::Stream; -use tokio::sync::mpsc; -use tokio_stream::StreamExt; - -const PREFIX: &'static str = "agents.v1"; - -#[derive(Debug)] -pub enum Subject { - Health, - Exec, - OpenTerminal, -} - -impl Display for Subject { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Subject::Health => write!(f, "health"), - Subject::Exec => write!(f, "exec"), - Subject::OpenTerminal => write!(f, "open_terminal"), - } - } -} - -impl TryFrom<&str> for Subject { - type Error = anyhow::Error; - - fn try_from(value: &str) -> Result { - Ok(match value { - "health" => Subject::Health, - "exec" => Subject::Exec, - "open_terminal" => Subject::OpenTerminal, - _ => return Err(anyhow!("unknown subject '{}'", value)), - }) - } -} - -#[derive(Debug)] -pub struct Message { - subject: Subject, - payload: Bytes, - // reply: Option, -} - -impl Message { - fn from_transport(subject: &str, payload: Bytes) -> Result { - let subject = subject.try_into()?; - - // msg.headers.unwrap().get("x-idempotent-key"); - - Ok(Message { - subject, - payload, - // reply: None, - }) - } - - pub fn subject(&self) -> &Subject { - &self.subject - } - - pub fn body(&self) -> Bytes { - self.payload.clone() - } - - pub fn health(health: crate::health::Health) -> Result { - Ok(Message { - subject: Subject::Health, - payload: Bytes::from_iter(serde_json::to_vec(&health)?), - // reply: None, - }) - } -} - -#[derive(Clone)] -pub struct Client { - id: Arc, - nats: async_nats::Client, - subj_prefix: Arc, -} - -impl Client { - pub async fn connect(id: &str) -> Result { - let nats = async_nats::ConnectOptions::new() - .name(format!("Prymn Agent {id}")) - .custom_inbox_prefix(format!("_INBOX_{id}")) - .user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned()) - .connect("localhost") - .await?; - - tracing::debug!("connected to NATS"); - - Ok(Self { - nats, - id: Arc::new(id.to_owned()), - subj_prefix: Arc::new(format!("{}.{}", PREFIX, id)), - }) - } - - pub async fn publish(&self, msg: Message) { - let subject = format!("agents.v1.{}.{}", self.id, msg.subject); - self.nats.publish(subject, msg.payload).await.unwrap(); - } - - // pub async fn reply(&self, msg: Message, mut stream: impl Stream + Unpin) { - // match msg.reply { - // Some(reply) => { - // while let Some(data) = stream.next().await { - // self.nats.publish(reply.clone(), data.into()).await.unwrap(); - // } - // } - // None => tracing::warn!(?msg, "tried to reply to message without a reply subject"), - // } - // } - - pub async fn messages_channel(&self) -> Result> { - let (sender, receiver) = mpsc::channel(100); - let mut stream = self.subscribe("*").await?; - - let self_clone = self.clone(); - tokio::spawn(async move { - while let Some(msg) = stream.next().await { - self_clone - .clone() - .open_subchannels(&msg, sender.clone()) - .await; - - sender.send(msg).await.unwrap(); - } - }); - - Ok(receiver) - } - - async fn subscribe(&self, subject: &str) -> Result> { - let prefix = Arc::clone(&self.subj_prefix); - - let stream = self - .nats - .subscribe(format!("{}.{}", self.subj_prefix, subject)) - .await? - .filter_map(move |data| { - let subject = data.subject[..].trim_start_matches(&*prefix); - - match Message::from_transport(subject, data.payload) { - Ok(msg) => Some(msg), - Err(err) => { - tracing::warn!("{}", err); - None - } - } - }); - - tracing::debug!("subscribed on {}", subject); - - Ok(stream) - } - - async fn open_subchannels(&self, message: &Message, sender: mpsc::Sender) { - fn send_messages_from_stream( - mut stream: impl Stream + Send + Unpin + 'static, - sender: mpsc::Sender, - ) { - tokio::spawn(async move { - while let Some(message) = stream.next().await { - sender.send(message).await.unwrap(); - } - }); - } - - match message.subject { - Subject::OpenTerminal => { - let input_stream = self.subscribe("terminal.key.input").await.unwrap(); - let resize_stream = self.subscribe("terminal.key.resize").await.unwrap(); - - send_messages_from_stream(input_stream, sender.clone()); - send_messages_from_stream(resize_stream, sender); - } - _ => {} - } - } -} - -impl Debug for Client { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Client {{ id: {} }}", self.id) - } -} diff --git a/agent/src/messaging/mod.rs b/agent/src/messaging/mod.rs new file mode 100644 index 0000000..a5399e0 --- /dev/null +++ b/agent/src/messaging/mod.rs @@ -0,0 +1,35 @@ +use std::convert::Infallible; + +use bytes::Bytes; +use thiserror::Error; + +mod v1; + +pub use v1::Client; + +#[derive(Debug, Error)] +pub enum MessagingError { + #[error("failed to connect to messaging transport")] + ConnectError, + #[error("failed to publish message to messaging transport")] + PublishError, + #[error("failed to subscribe to messaging transport")] + SubscribeError, +} + +pub struct RawData(Bytes); + +impl TryFrom for RawData { + type Error = Infallible; + + fn try_from(value: async_nats::Message) -> Result { + Ok(Self(value.payload)) + } +} + +pub async fn init_services(client: Client) -> anyhow::Result<()> { + self::v1::terminal::init_terminal_service(client.clone()).await?; + self::v1::exec::init_exec_service(client).await?; + + Ok(()) +} diff --git a/agent/src/messaging/v1/exec.rs b/agent/src/messaging/v1/exec.rs new file mode 100644 index 0000000..3223c1e --- /dev/null +++ b/agent/src/messaging/v1/exec.rs @@ -0,0 +1,97 @@ +use std::process::Stdio; + +use serde::{Deserialize, Serialize}; +use tokio::process::Command; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, FramedRead}; + +#[derive(Deserialize)] +struct ExecReq { + user: String, + program: String, + #[serde(default)] + args: Vec, +} + +#[derive(Serialize)] +enum ExecOut { + Stdout(String), + Stderr(String), + Exit(String), +} + +impl TryFrom for ExecReq { + type Error = serde_json::Error; + + fn try_from(value: async_nats::Message) -> Result { + serde_json::from_slice(&value.payload[..]) + } +} + +impl ExecReq { + async fn handle(&self, client: crate::messaging::Client) -> anyhow::Result<()> { + let mut cmd = if self.user != "root" { + let mut cmd = Command::new("sudo"); + cmd.arg("-iu").arg(&self.user).arg("--").arg(&self.program); + cmd + } else { + Command::new(&self.program) + }; + + let mut io = cmd + .args(&self.args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let stdout = + FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| { + stdout + .map(|bytes| ExecOut::Stdout(String::from_utf8_lossy(&bytes[..]).to_string())) + .map_err(|err| tracing::error!(%err, "read error on stdout")) + .ok() + }); + + let stderr = + FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| { + stderr + .map(|bytes| ExecOut::Stderr(String::from_utf8_lossy(&bytes[..]).to_string())) + .map_err(|err| tracing::error!(%err, "read error on stderr")) + .ok() + }); + + let stream = stdout + .merge(stderr) + .chain(futures::FutureExt::into_stream(async move { + io.wait() + .await + .map(|exit_status| ExecOut::Exit(exit_status.to_string())) + .unwrap() + })); + + tokio::spawn(async move { + tokio::pin!(stream); + + while let Some(resp) = stream.next().await { + client + .publish("exec.reply", serde_json::to_vec(&resp).unwrap()) + .await + .unwrap(); + } + }); + + Ok(()) + } +} + +pub async fn init_exec_service(client: crate::messaging::Client) -> anyhow::Result<()> { + let mut stream = client.subscribe::("exec").await?; + + tokio::spawn(async move { + while let Some(msg) = stream.next().await { + msg.handle(client.clone()).await.unwrap(); + } + }); + + Ok(()) +} diff --git a/agent/src/messaging/v1/mod.rs b/agent/src/messaging/v1/mod.rs new file mode 100644 index 0000000..a80f3df --- /dev/null +++ b/agent/src/messaging/v1/mod.rs @@ -0,0 +1,99 @@ +pub mod exec; +pub mod terminal; + +use std::sync::Arc; + +use bytes::Bytes; +use futures::Stream; +use tokio_stream::StreamExt; + +#[derive(Clone)] +pub struct Client { + id: Arc, + prefix: Arc, + nats: async_nats::Client, +} + +impl Client { + pub async fn connect(id: &str) -> Result { + let nats = async_nats::ConnectOptions::with_user_and_password( + String::from("demo_agent"), + String::from("demo_agent_password"), + ) + .name(format!("Prymn Agent {id}")) + .custom_inbox_prefix(format!("_INBOX_{id}")) + .connect("localhost") + .await + .map_err(|err| { + tracing::debug!("nats error: {}", err); + super::MessagingError::ConnectError + })?; + + Ok(Self { + id: Arc::new(String::from(id)), + prefix: Arc::new(format!("agents.v1.{}", id)), + nats, + }) + } + + pub async fn publish( + &self, + subject: &str, + payload: Vec, + ) -> Result<(), super::MessagingError> { + tracing::trace!("publish {} {} bytes", subject, payload.len()); + + self.nats + .publish(format!("{}.{}", self.prefix, subject), Bytes::from(payload)) + .await + .map_err(|err| { + tracing::debug!("nats error: {}", err); + super::MessagingError::PublishError + })?; + + Ok(()) + } + + pub async fn subscribe( + &self, + subject: &str, + ) -> Result, super::MessagingError> + where + T: TryFrom, + >::Error: std::fmt::Display, + { + tracing::trace!("subscribe {}", subject); + + let stream = self + .nats + .subscribe(format!("{}.{}", self.prefix, subject)) + .await + .map_err(|err| { + tracing::debug!("nats error: {}", err); + super::MessagingError::SubscribeError + })? + .filter_map(|data| { + let subject = data.subject.clone(); + + match T::try_from(data) { + Ok(value) => Some(value), + Err(err) => { + tracing::warn!( + %subject, + "failed to convert payload to concrete type: {}", + err + ); + None + } + } + }); + + Ok(stream) + } +} + +impl std::fmt::Debug for Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Client {{ id: {} }}", self.id) + } +} diff --git a/agent/src/messaging/v1/terminal.rs b/agent/src/messaging/v1/terminal.rs new file mode 100644 index 0000000..2f35487 --- /dev/null +++ b/agent/src/messaging/v1/terminal.rs @@ -0,0 +1,129 @@ +use serde::Deserialize; +use tokio::io::AsyncWriteExt; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, FramedRead}; + +use crate::messaging::MessagingError; + +#[derive(Deserialize)] +struct ResizeMessage { + rows: u16, + cols: u16, +} + +impl TryFrom for ResizeMessage { + type Error = serde_json::Error; + + fn try_from(value: async_nats::Message) -> Result { + serde_json::from_slice(&value.payload[..]) + } +} + +#[derive(Deserialize)] +pub struct OpenTerminalMessage { + id: String, + shell: Option, +} + +impl TryFrom for OpenTerminalMessage { + type Error = serde_json::Error; + + fn try_from(value: async_nats::Message) -> Result { + serde_json::from_slice(&value.payload[..]) + } +} + +impl OpenTerminalMessage { + pub async fn handle(&self, client: crate::messaging::Client) -> Result<(), MessagingError> { + let mut input_stream = client + .subscribe::(&format!("terminal.{}.input", self.id)) + .await?; + + let mut resize_stream = client + .subscribe::(&format!("terminal.{}.resize", self.id)) + .await?; + + let mut close_stream = client + .subscribe::(&format!("terminal.{}.close", self.id)) + .await?; + + let mut pty = crate::pty::Pty::open().unwrap(); + let pty_clone = pty.try_clone().unwrap(); + let pty_child = pty.child().unwrap(); + + let shell = if let Some(shell) = &self.shell { + shell + } else { + "/bin/bash" + }; + + let mut child = crate::pty::open_shell(pty_child, shell).unwrap(); + + tokio::spawn(async move { + loop { + tokio::select! { + input = input_stream.next() => { + if let Some(data) = input { + pty.write_all(&data.0[..]).await.unwrap(); + tracing::trace!(data = ?data.0, "terminal: wrote {} bytes", data.0.len()); + } else { + tracing::debug!("terminal input ended stream"); + break; + } + } + resize = resize_stream.next() => { + if let Some(resize) = resize { + pty.resize_window(resize.rows, resize.cols).unwrap(); + tracing::trace!("terminal: resize ({}, {})", resize.rows, resize.cols); + } else { + tracing::debug!("terminal resize ended stream"); + break; + } + } + _close = close_stream.next() => { + tracing::info!("closing terminal"); + child.kill().await.expect("the child to exit normally"); + break; + } + io_result = child.wait() => { + tracing::info!("child process exited: {:?}", io_result); + break; + } + } + } + }); + + let publish_subject = format!("terminal.{}.output", self.id.clone()); + let mut out_stream = FramedRead::new(pty_clone, BytesCodec::new()) + .filter_map(|inner| inner.map(|bytes| bytes.freeze()).ok()); + + tokio::spawn(async move { + while let Some(data) = out_stream.next().await { + client + .publish(&publish_subject, data.to_vec()) + .await + .unwrap(); + + tracing::trace!("terminal: input {} bytes", data.len()); + } + }); + + Ok(()) + } +} + +pub async fn init_terminal_service( + client: crate::messaging::Client, +) -> anyhow::Result<(), MessagingError> { + let mut subscriber = client + .subscribe::("open_terminal") + .await?; + + tokio::spawn(async move { + while let Some(msg) = subscriber.next().await { + msg.handle(client.clone()).await.unwrap(); + } + }); + + Ok(()) +} diff --git a/agent/src/services/exec.rs b/agent/src/services/exec.rs deleted file mode 100644 index 0609244..0000000 --- a/agent/src/services/exec.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::process::Stdio; - -use bytes::Bytes; -use futures::FutureExt; -use serde::Deserialize; -use tokio::process::Command; -use tokio_stream::StreamExt; -use tokio_util::codec::{BytesCodec, FramedRead}; - -use super::Ctx; - -#[derive(Debug, Deserialize)] -pub struct ExecReq { - user: String, - program: String, - #[serde(default)] - args: Vec, -} - -impl TryFrom for ExecReq { - type Error = serde_json::Error; - - fn try_from(value: Bytes) -> Result { - serde_json::from_slice(&value[..]) - } -} - -/// An operating system program execution. -pub async fn exec(ctx: Ctx) -> anyhow::Result<()> { - // TODO: Tasks should be idempontent - // TODO: Root user should be able to run only specific programs - - let mut cmd = if &ctx.body.user != "root" { - let mut cmd = Command::new("sudo"); - cmd.arg("-iu") - .arg(&ctx.body.user) - .arg("--") - .arg(&ctx.body.program); - cmd - } else { - Command::new(&ctx.body.program) - }; - - let mut io = cmd - .args(&ctx.body.args) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - let stdout = - FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| { - stdout - .map(|bytes| bytes.freeze()) - .map_err(|err| tracing::error!(%err, "read error on stdout")) - .ok() - }); - - let stderr = - FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| { - stderr - .map(|bytes| bytes.freeze()) - .map_err(|err| tracing::error!(%err, "read error on stderr")) - .ok() - }); - - let exit = async move { - io.wait() - .await - .map(|exit| { - let exit = exit.to_string(); - Bytes::from(exit) - }) - .unwrap() - } - .into_stream(); - - // Ok(Box::pin(stdout.merge(stderr).chain(exit))) - Ok(()) -} diff --git a/agent/src/services/mod.rs b/agent/src/services/mod.rs deleted file mode 100644 index d6df248..0000000 --- a/agent/src/services/mod.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::collections::HashMap; - -use bytes::Bytes; -use thiserror::Error; - -mod exec; -mod terminal; - -#[derive(Debug, Error)] -enum ServiceError { - #[error("received an invalid body format for a valid message")] - BodyFormatError, -} - -struct Service { - terminals: HashMap, -} - -impl Service { - fn serve(&mut self, message: crate::messaging::Message) { - match message.subject() { - crate::messaging::Subject::OpenTerminal => {} - crate::messaging::Subject::Exec => todo!(), - crate::messaging::Subject::Health => todo!(), - } - } -} - -async fn route_message(message: crate::messaging::Message) -> Result<(), ServiceError> { - match message.subject() { - crate::messaging::Subject::Health => {} - crate::messaging::Subject::Exec => { - let ctx = Ctx::with_body(message.body())?; - let _ = self::exec::exec(ctx).await; - } - crate::messaging::Subject::OpenTerminal => { - let ctx = Ctx::with_body(message.body())?; - let _ = self::terminal::open_terminal(ctx).await; - } - } - - Ok(()) -} - -struct Ctx { - body: T, - terminals: HashMap, -} - -impl Ctx -where - T: TryFrom, -{ - fn with_body(body: Bytes) -> Result { - let body = body - .try_into() - .map_err(|_err| ServiceError::BodyFormatError)?; - - Ok(Self { - body, - terminals: HashMap::default(), - }) - } -} - -pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> { - let mut recv = client.messages_channel().await?; - - tokio::spawn(async move { - while let Some(msg) = recv.recv().await { - // TODO: How do i handle this error? - if let Err(err) = route_message(msg).await { - tracing::warn!("{}", err); - }; - } - }); - - Ok(()) -} diff --git a/agent/src/services/terminal.rs b/agent/src/services/terminal.rs deleted file mode 100644 index 6511b11..0000000 --- a/agent/src/services/terminal.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::convert::Infallible; - -use bytes::Bytes; -use serde::Deserialize; -use tokio::io::AsyncWriteExt; -use tokio_stream::StreamExt; -use tokio_util::codec::{BytesCodec, FramedRead}; - -use crate::pty::open_shell; - -use super::Ctx; - -#[derive(Debug, Deserialize)] -pub struct OpenTerminalMessage(Bytes); - -impl TryFrom for OpenTerminalMessage { - type Error = Infallible; - - fn try_from(value: Bytes) -> Result { - Ok(Self(value)) - } -} - -#[derive(Debug, Deserialize)] -pub struct TerminalInput(Bytes); - -impl TryFrom for TerminalInput { - type Error = Infallible; - - fn try_from(value: Bytes) -> Result { - Ok(Self(value)) - } -} - -pub async fn open_terminal( - mut ctx: Ctx, -) -> anyhow::Result> { - let pty = crate::pty::Pty::open()?; - let shell = open_shell(pty.child()?, "/bin/bash")?; - - let _out_stream = FramedRead::new(pty.try_clone()?, BytesCodec::new()).filter_map(|inner| { - inner - .map(|bytes| bytes.freeze()) - .map_err(|err| { - tracing::warn!(%err, "pseudoterminal read error"); - }) - .ok() - }); - - ctx.terminals.insert(String::from("test"), (pty, shell)); - - Ok(ctx) -} - -pub async fn terminal_input( - terminal_id: &str, - mut ctx: Ctx, -) -> anyhow::Result> { - let (pty, _) = ctx.terminals.get_mut(terminal_id).unwrap(); - - if let Err(err) = pty.write_all(&ctx.body.0[..]).await { - tracing::warn!(%err, "pseudoterminal write error"); - } - - Ok(ctx) -}