make it bad, so to make it good later

This commit is contained in:
Nikos Papadakis 2024-01-31 22:37:05 +02:00
parent b7a29405f0
commit f59cfee792
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
10 changed files with 402 additions and 464 deletions

View file

@ -1,12 +1,10 @@
//! System health information and checking
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::messaging::Message;
const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
const DISK_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
@ -77,7 +75,8 @@ impl HealthMonitor {
Self(Arc::new(sender))
}
pub fn check_system(&self, system: &System) {
pub fn check_system(&self, system: &mut System) {
system.refresh_resources();
let sys = system.system();
let memory_usage = if sys.total_memory() > 0 {
@ -124,7 +123,6 @@ impl HealthMonitor {
disks_usage.for_each(|(name, usage)| match health.disk_status.get_mut(name) {
Some(DiskStatus::Normal) if usage > DISK_USAGE_CRITICAL_THRESHOLD => {
println!("{usage}");
health
.disk_status
.insert(name.to_owned(), DiskStatus::VeryHighUsage);
@ -170,30 +168,3 @@ impl Default for HealthMonitor {
Self::new()
}
}
/// Starts the health subsystem and returns the monitor that drives it.
///
/// Two detached workers are started:
/// * an OS thread that refreshes system resources and runs the health check every
///   five seconds, and
/// * a Tokio task that publishes the health value through `client` every time the
///   monitor reports a change.
///
/// # Panics
/// The publishing task panics if the health value cannot be turned into a message.
pub async fn init_health_subsystem(client: crate::messaging::Client) -> HealthMonitor {
    const REFRESH_INTERVAL: Duration = Duration::from_secs(5);

    let monitor = HealthMonitor::new();
    let sampler = monitor.clone();
    let publisher = monitor.clone();

    let mut system = System::new();

    // Forever refresh system resources and monitor changes. Blocking sleeps are fine
    // here because this runs on its own OS thread, not on the async runtime.
    std::thread::spawn(move || loop {
        system.refresh_resources();
        sampler.check_system(&system);
        std::thread::sleep(REFRESH_INTERVAL);
    });

    tokio::spawn(async move {
        let mut updates = publisher.monitor();
        // `changed` fails once the sending side is gone, which ends the task.
        while updates.changed().await.is_ok() {
            tracing::info!(health = ?&*updates.borrow(), "health watermark");
            let snapshot = updates.borrow().clone();
            client.publish(Message::health(snapshot).unwrap()).await;
        }
    });

    monitor
}

View file

@ -1,38 +1,63 @@
use health::init_health_subsystem;
use messaging::Client;
use services::init_services;
use tracing::Level;
use tokio::signal::ctrl_c;
mod health;
mod messaging;
mod pty;
mod services;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("to set a tracing global subscriber");
run().await.map_err(|err| {
tracing::error!(cause = %err, "could not start agent");
err
tracing::error!(%err, "failed to start the agent!");
err.context("failed to start the Prymn agent")
})
}
async fn run() -> anyhow::Result<()> {
let client = Client::connect("demo_agent").await?;
let client = messaging::Client::connect("demo_agent").await?;
let _health_monitor = init_health_subsystem(client.clone()).await;
tracing::info!("initialized health system");
init_health_subsystem(client.clone());
tracing::info!("initialized health subsystem");
init_services(client).await?;
crate::messaging::init_services(client).await?;
tracing::info!("initialized services");
tracing::info!("agent is ready");
tokio::signal::ctrl_c().await?;
ctrl_c().await?;
Ok(())
}
/// Starts the health subsystem.
///
/// A Tokio task serializes the health value as JSON and publishes it on the `health`
/// subject every time the monitor reports a change. A separate OS thread re-checks the
/// system every five seconds. Both workers are detached and run until the process exits.
fn init_health_subsystem(client: messaging::Client) {
    const INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);

    let monitor = health::HealthMonitor::new();
    let mut updates = monitor.monitor();

    tokio::spawn(async move {
        while let Ok(()) = updates.changed().await {
            // Serialize inside its own scope so the watch borrow is released before
            // the publish call is awaited.
            let payload = {
                let health = &*updates.borrow();
                tracing::info!(?health, "health watermark");
                serde_json::to_vec(health).unwrap()
            };

            let outcome = client.publish("health", payload).await;
            if let Err(err) = outcome {
                // TODO: Error recovery?
                tracing::warn!(cause = %err, "health subsystem publishing failed");
            }
        }
    });

    // The sampling loop blocks, so it gets a dedicated OS thread.
    std::thread::spawn(move || {
        let mut system = health::System::new();
        loop {
            monitor.check_system(&mut system);
            std::thread::sleep(INTERVAL);
        }
    });
}

View file

@ -1,194 +0,0 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
const PREFIX: &'static str = "agents.v1";
/// Subjects the agent knows how to handle.
#[derive(Debug)]
pub enum Subject {
    /// Rendered as `health`.
    Health,
    /// Rendered as `exec`.
    Exec,
    /// Rendered as `open_terminal`.
    OpenTerminal,
}

impl Display for Subject {
    /// Writes the wire token of the subject (the last segment of the transport subject).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = match self {
            Subject::Health => "health",
            Subject::Exec => "exec",
            Subject::OpenTerminal => "open_terminal",
        };
        f.write_str(token)
    }
}
impl TryFrom<&str> for Subject {
    type Error = anyhow::Error;

    /// Parses a wire token (`health`, `exec`, `open_terminal`) into a [`Subject`].
    ///
    /// # Errors
    /// Returns an error naming the token when it is not one of the known subjects.
    fn try_from(value: &str) -> Result<Self> {
        match value {
            "health" => Ok(Subject::Health),
            "exec" => Ok(Subject::Exec),
            "open_terminal" => Ok(Subject::OpenTerminal),
            unknown => Err(anyhow!("unknown subject '{}'", unknown)),
        }
    }
}
/// A message exchanged over the transport: a parsed [`Subject`] plus its raw payload.
#[derive(Debug)]
pub struct Message {
    subject: Subject,
    payload: Bytes,
    // TODO: a reply subject is not carried yet.
}

impl Message {
    /// Builds a message from a transport-level subject token and payload.
    ///
    /// # Errors
    /// Fails when `subject` is not a known [`Subject`] token.
    fn from_transport(subject: &str, payload: Bytes) -> Result<Self> {
        let subject = Subject::try_from(subject)?;
        Ok(Message { subject, payload })
    }

    /// The subject this message was sent on.
    pub fn subject(&self) -> &Subject {
        &self.subject
    }

    /// Returns a clone of the payload handle.
    pub fn body(&self) -> Bytes {
        self.payload.clone()
    }

    /// Builds a `Health` message whose payload is the JSON encoding of `health`.
    ///
    /// # Errors
    /// Fails when `health` cannot be serialized to JSON.
    pub fn health(health: crate::health::Health) -> Result<Message> {
        // `Bytes::from(Vec<u8>)` takes over the serialized buffer; the previous
        // `Bytes::from_iter` rebuilt it one byte at a time.
        let payload = Bytes::from(serde_json::to_vec(&health)?);
        Ok(Message {
            subject: Subject::Health,
            payload,
        })
    }
}
/// Cloneable handle to the agent's NATS connection.
#[derive(Clone)]
pub struct Client {
/// Agent identifier, as passed to `connect`.
id: Arc<String>,
/// Underlying `async_nats` client.
nats: async_nats::Client,
/// Subject prefix of the form `<PREFIX>.<id>`, built once in `connect`.
subj_prefix: Arc<String>,
}
impl Client {
pub async fn connect(id: &str) -> Result<Self> {
let nats = async_nats::ConnectOptions::new()
.name(format!("Prymn Agent {id}"))
.custom_inbox_prefix(format!("_INBOX_{id}"))
.user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned())
.connect("localhost")
.await?;
tracing::debug!("connected to NATS");
Ok(Self {
nats,
id: Arc::new(id.to_owned()),
subj_prefix: Arc::new(format!("{}.{}", PREFIX, id)),
})
}
pub async fn publish(&self, msg: Message) {
let subject = format!("agents.v1.{}.{}", self.id, msg.subject);
self.nats.publish(subject, msg.payload).await.unwrap();
}
// pub async fn reply(&self, msg: Message, mut stream: impl Stream<Item = Bytes> + Unpin) {
// match msg.reply {
// Some(reply) => {
// while let Some(data) = stream.next().await {
// self.nats.publish(reply.clone(), data.into()).await.unwrap();
// }
// }
// None => tracing::warn!(?msg, "tried to reply to message without a reply subject"),
// }
// }
pub async fn messages_channel(&self) -> Result<mpsc::Receiver<Message>> {
let (sender, receiver) = mpsc::channel(100);
let mut stream = self.subscribe("*").await?;
let self_clone = self.clone();
tokio::spawn(async move {
while let Some(msg) = stream.next().await {
self_clone
.clone()
.open_subchannels(&msg, sender.clone())
.await;
sender.send(msg).await.unwrap();
}
});
Ok(receiver)
}
async fn subscribe(&self, subject: &str) -> Result<impl Stream<Item = Message>> {
let prefix = Arc::clone(&self.subj_prefix);
let stream = self
.nats
.subscribe(format!("{}.{}", self.subj_prefix, subject))
.await?
.filter_map(move |data| {
let subject = data.subject[..].trim_start_matches(&*prefix);
match Message::from_transport(subject, data.payload) {
Ok(msg) => Some(msg),
Err(err) => {
tracing::warn!("{}", err);
None
}
}
});
tracing::debug!("subscribed on {}", subject);
Ok(stream)
}
async fn open_subchannels(&self, message: &Message, sender: mpsc::Sender<Message>) {
fn send_messages_from_stream(
mut stream: impl Stream<Item = Message> + Send + Unpin + 'static,
sender: mpsc::Sender<Message>,
) {
tokio::spawn(async move {
while let Some(message) = stream.next().await {
sender.send(message).await.unwrap();
}
});
}
match message.subject {
Subject::OpenTerminal => {
let input_stream = self.subscribe("terminal.key.input").await.unwrap();
let resize_stream = self.subscribe("terminal.key.resize").await.unwrap();
send_messages_from_stream(input_stream, sender.clone());
send_messages_from_stream(resize_stream, sender);
}
_ => {}
}
}
}
impl Debug for Client {
    /// Shows only the agent id; the connection handle is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id = &self.id;
        write!(f, "Client {{ id: {id} }}")
    }
}

View file

@ -0,0 +1,35 @@
use std::convert::Infallible;
use bytes::Bytes;
use thiserror::Error;
mod v1;
pub use v1::Client;
/// Errors reported by the messaging layer.
///
/// The variants are plain markers and do not carry the underlying cause.
#[derive(Debug, Error)]
pub enum MessagingError {
/// Connecting to the transport failed.
#[error("failed to connect to messaging transport")]
ConnectError,
/// Publishing a message failed.
#[error("failed to publish message to messaging transport")]
PublishError,
/// Creating a subscription failed.
#[error("failed to subscribe to messaging transport")]
SubscribeError,
}
/// Message payload passed through untouched, with no decoding applied.
pub struct RawData(Bytes);

impl TryFrom<async_nats::Message> for RawData {
    type Error = Infallible;

    /// Keeps only the payload of the transport message; this conversion cannot fail.
    fn try_from(value: async_nats::Message) -> Result<Self, Self::Error> {
        let payload = value.payload;
        Ok(RawData(payload))
    }
}
pub async fn init_services(client: Client) -> anyhow::Result<()> {
self::v1::terminal::init_terminal_service(client.clone()).await?;
self::v1::exec::init_exec_service(client).await?;
Ok(())
}

View file

@ -0,0 +1,97 @@
use std::process::Stdio;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
/// Request to execute a program, decoded from a JSON payload.
#[derive(Deserialize)]
struct ExecReq {
// User to run the program as; any user other than "root" is run through `sudo -iu`.
user: String,
// Program to execute.
program: String,
// Arguments passed to the program; an absent field deserializes to an empty list.
#[serde(default)]
args: Vec<String>,
}
/// One item of output produced by an executed program, serialized for publishing.
#[derive(Serialize)]
enum ExecOut {
// A chunk read from the program's standard output.
Stdout(String),
// A chunk read from the program's standard error.
Stderr(String),
// The exit status of the program, rendered as text.
Exit(String),
}
impl TryFrom<async_nats::Message> for ExecReq {
    type Error = serde_json::Error;

    /// Decodes the message payload as a JSON-encoded [`ExecReq`].
    fn try_from(value: async_nats::Message) -> Result<Self, Self::Error> {
        let payload: &[u8] = &value.payload;
        serde_json::from_slice(payload)
    }
}
impl ExecReq {
async fn handle(&self, client: crate::messaging::Client) -> anyhow::Result<()> {
let mut cmd = if self.user != "root" {
let mut cmd = Command::new("sudo");
cmd.arg("-iu").arg(&self.user).arg("--").arg(&self.program);
cmd
} else {
Command::new(&self.program)
};
let mut io = cmd
.args(&self.args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout =
FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| {
stdout
.map(|bytes| ExecOut::Stdout(String::from_utf8_lossy(&bytes[..]).to_string()))
.map_err(|err| tracing::error!(%err, "read error on stdout"))
.ok()
});
let stderr =
FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| {
stderr
.map(|bytes| ExecOut::Stderr(String::from_utf8_lossy(&bytes[..]).to_string()))
.map_err(|err| tracing::error!(%err, "read error on stderr"))
.ok()
});
let stream = stdout
.merge(stderr)
.chain(futures::FutureExt::into_stream(async move {
io.wait()
.await
.map(|exit_status| ExecOut::Exit(exit_status.to_string()))
.unwrap()
}));
tokio::spawn(async move {
tokio::pin!(stream);
while let Some(resp) = stream.next().await {
client
.publish("exec.reply", serde_json::to_vec(&resp).unwrap())
.await
.unwrap();
}
});
Ok(())
}
}
pub async fn init_exec_service(client: crate::messaging::Client) -> anyhow::Result<()> {
let mut stream = client.subscribe::<ExecReq>("exec").await?;
tokio::spawn(async move {
while let Some(msg) = stream.next().await {
msg.handle(client.clone()).await.unwrap();
}
});
Ok(())
}

View file

@ -0,0 +1,99 @@
pub mod exec;
pub mod terminal;
use std::sync::Arc;
use bytes::Bytes;
use futures::Stream;
use tokio_stream::StreamExt;
/// Cloneable handle to the agent's NATS connection.
#[derive(Clone)]
pub struct Client {
/// Agent identifier, as passed to `connect`.
id: Arc<String>,
/// Subject prefix `agents.v1.<id>`, prepended to every published or subscribed subject.
prefix: Arc<String>,
/// Underlying `async_nats` client.
nats: async_nats::Client,
}
impl Client {
    /// Connects to the NATS server at `localhost` as agent `id`.
    ///
    /// NOTE(review): the user name and password are hard-coded demo credentials.
    ///
    /// # Errors
    /// Returns [`MessagingError::ConnectError`](super::MessagingError::ConnectError) when
    /// the connection fails; the underlying cause is only logged at debug level.
    pub async fn connect(id: &str) -> Result<Self, super::MessagingError> {
        let user = String::from("demo_agent");
        let password = String::from("demo_agent_password");

        let options = async_nats::ConnectOptions::with_user_and_password(user, password)
            .name(format!("Prymn Agent {id}"))
            .custom_inbox_prefix(format!("_INBOX_{id}"));

        let nats = match options.connect("localhost").await {
            Ok(nats) => nats,
            Err(err) => {
                tracing::debug!("nats error: {}", err);
                return Err(super::MessagingError::ConnectError);
            }
        };

        Ok(Self {
            id: Arc::new(id.to_owned()),
            prefix: Arc::new(format!("agents.v1.{id}")),
            nats,
        })
    }

    /// Publishes `payload` on `<prefix>.<subject>`.
    ///
    /// # Errors
    /// Returns [`MessagingError::PublishError`](super::MessagingError::PublishError) when
    /// the transport rejects the message; the cause is only logged at debug level.
    pub async fn publish(
        &self,
        subject: &str,
        payload: Vec<u8>,
    ) -> Result<(), super::MessagingError> {
        tracing::trace!("publish {} {} bytes", subject, payload.len());

        let full_subject = format!("{}.{}", self.prefix, subject);
        match self.nats.publish(full_subject, Bytes::from(payload)).await {
            Ok(_) => Ok(()),
            Err(err) => {
                tracing::debug!("nats error: {}", err);
                Err(super::MessagingError::PublishError)
            }
        }
    }

    /// Subscribes to `<prefix>.<subject>` and yields each message converted to `T`.
    ///
    /// Messages that fail the conversion are logged with their subject and skipped.
    ///
    /// # Errors
    /// Returns [`MessagingError::SubscribeError`](super::MessagingError::SubscribeError)
    /// when the subscription cannot be created.
    pub async fn subscribe<T>(
        &self,
        subject: &str,
    ) -> Result<impl Stream<Item = T>, super::MessagingError>
    where
        T: TryFrom<async_nats::Message>,
        <T as TryFrom<async_nats::Message>>::Error: std::fmt::Display,
    {
        tracing::trace!("subscribe {}", subject);

        let full_subject = format!("{}.{}", self.prefix, subject);
        let subscriber = match self.nats.subscribe(full_subject).await {
            Ok(subscriber) => subscriber,
            Err(err) => {
                tracing::debug!("nats error: {}", err);
                return Err(super::MessagingError::SubscribeError);
            }
        };

        let stream = subscriber.filter_map(|data| {
            // Copy the subject first: the conversion below consumes the message.
            let subject = data.subject.clone();
            T::try_from(data)
                .map_err(|err| {
                    tracing::warn!(
                        %subject,
                        "failed to convert payload to concrete type: {}",
                        err
                    );
                })
                .ok()
        });

        Ok(stream)
    }
}
impl std::fmt::Debug for Client {
    /// Shows only the agent id; the connection handle is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id = &self.id;
        write!(f, "Client {{ id: {id} }}")
    }
}

View file

@ -0,0 +1,129 @@
use serde::Deserialize;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use crate::messaging::MessagingError;
/// Request to resize a terminal window, decoded from a JSON payload.
#[derive(Deserialize)]
struct ResizeMessage {
    /// New number of rows.
    rows: u16,
    /// New number of columns.
    cols: u16,
}

impl TryFrom<async_nats::Message> for ResizeMessage {
    type Error = serde_json::Error;

    /// Decodes the message payload as a JSON-encoded [`ResizeMessage`].
    fn try_from(value: async_nats::Message) -> Result<Self, Self::Error> {
        let payload: &[u8] = &value.payload;
        serde_json::from_slice(payload)
    }
}
/// Request to open a terminal session, decoded from a JSON payload.
#[derive(Deserialize)]
pub struct OpenTerminalMessage {
    /// Session identifier; it becomes part of the `terminal.<id>.*` subjects.
    id: String,
    /// Shell to start; `/bin/bash` is used when absent.
    shell: Option<String>,
}

impl TryFrom<async_nats::Message> for OpenTerminalMessage {
    type Error = serde_json::Error;

    /// Decodes the message payload as a JSON-encoded [`OpenTerminalMessage`].
    fn try_from(value: async_nats::Message) -> Result<Self, Self::Error> {
        let payload: &[u8] = &value.payload;
        serde_json::from_slice(payload)
    }
}
impl OpenTerminalMessage {
    /// Opens a pseudoterminal running the requested shell and wires it to the transport.
    ///
    /// Subscribes to `terminal.<id>.input`, `terminal.<id>.resize` and
    /// `terminal.<id>.close`, then starts two detached tasks:
    /// * one that writes input to the pty, applies resizes, kills the shell on a close
    ///   message, and stops once the shell exits or an input/resize stream ends;
    /// * one that publishes everything read from the pty on `terminal.<id>.output`.
    ///
    /// Failures inside those tasks are logged instead of panicking.
    ///
    /// # Errors
    /// Returns a [`MessagingError`] when one of the subscriptions cannot be created.
    ///
    /// # Panics
    /// NOTE(review): opening the pty or spawning the shell still panics on failure,
    /// because `MessagingError` has no variant that could carry those errors.
    pub async fn handle(&self, client: crate::messaging::Client) -> Result<(), MessagingError> {
        let mut input_stream = client
            .subscribe::<crate::messaging::RawData>(&format!("terminal.{}.input", self.id))
            .await?;

        let mut resize_stream = client
            .subscribe::<ResizeMessage>(&format!("terminal.{}.resize", self.id))
            .await?;

        let mut close_stream = client
            .subscribe::<crate::messaging::RawData>(&format!("terminal.{}.close", self.id))
            .await?;

        let mut pty = crate::pty::Pty::open().unwrap();
        let pty_clone = pty.try_clone().unwrap();
        let pty_child = pty.child().unwrap();

        let shell = self.shell.as_deref().unwrap_or("/bin/bash");
        let mut child = crate::pty::open_shell(pty_child, shell).unwrap();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    input = input_stream.next() => {
                        match input {
                            Some(data) => {
                                if let Err(err) = pty.write_all(&data.0[..]).await {
                                    tracing::error!(?err, "terminal: failed to write input to the pty");
                                    break;
                                }
                                tracing::trace!(data = ?data.0, "terminal: wrote {} bytes", data.0.len());
                            }
                            None => {
                                tracing::debug!("terminal input ended stream");
                                break;
                            }
                        }
                    }
                    resize = resize_stream.next() => {
                        match resize {
                            Some(resize) => {
                                // A failed resize is not fatal for the session.
                                match pty.resize_window(resize.rows, resize.cols) {
                                    Ok(_) => tracing::trace!(
                                        "terminal: resize ({}, {})",
                                        resize.rows,
                                        resize.cols
                                    ),
                                    Err(err) => {
                                        tracing::warn!(?err, "terminal: failed to resize the pty")
                                    }
                                }
                            }
                            None => {
                                tracing::debug!("terminal resize ended stream");
                                break;
                            }
                        }
                    }
                    _close = close_stream.next() => {
                        tracing::info!("closing terminal");
                        if let Err(err) = child.kill().await {
                            tracing::warn!(?err, "terminal: failed to kill the child process");
                        }
                        break;
                    }
                    io_result = child.wait() => {
                        tracing::info!("child process exited: {:?}", io_result);
                        break;
                    }
                }
            }
        });

        let publish_subject = format!("terminal.{}.output", self.id);
        let mut out_stream = FramedRead::new(pty_clone, BytesCodec::new())
            .filter_map(|inner| inner.map(|bytes| bytes.freeze()).ok());

        tokio::spawn(async move {
            while let Some(data) = out_stream.next().await {
                match client.publish(&publish_subject, data.to_vec()).await {
                    Ok(()) => tracing::trace!("terminal: output {} bytes", data.len()),
                    // Keep reading so the shell is not blocked on a full pty buffer.
                    Err(err) => tracing::warn!(%err, "terminal: failed to publish output"),
                }
            }
        });

        Ok(())
    }
}
pub async fn init_terminal_service(
client: crate::messaging::Client,
) -> anyhow::Result<(), MessagingError> {
let mut subscriber = client
.subscribe::<OpenTerminalMessage>("open_terminal")
.await?;
tokio::spawn(async move {
while let Some(msg) = subscriber.next().await {
msg.handle(client.clone()).await.unwrap();
}
});
Ok(())
}

View file

@ -1,79 +0,0 @@
use std::process::Stdio;
use bytes::Bytes;
use futures::FutureExt;
use serde::Deserialize;
use tokio::process::Command;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use super::Ctx;
/// Request to execute a program, decoded from a JSON payload.
#[derive(Debug, Deserialize)]
pub struct ExecReq {
    /// User to run the program as.
    user: String,
    /// Program to execute.
    program: String,
    /// Arguments passed to the program; an absent field deserializes to an empty list.
    #[serde(default)]
    args: Vec<String>,
}

impl TryFrom<Bytes> for ExecReq {
    type Error = serde_json::Error;

    /// Decodes `value` as a JSON-encoded [`ExecReq`].
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
        let raw: &[u8] = &value;
        serde_json::from_slice(raw)
    }
}
/// An operating system program execution.
///
/// Spawns `ctx.body.program` with `ctx.body.args`, with stdout and stderr piped. When the
/// requested user is not `root`, the program is launched through `sudo -iu <user> --`.
///
/// NOTE(review): the stdout, stderr and exit streams built below are never consumed or
/// returned (the line that returned them is commented out), so the function currently
/// returns `Ok(())` without forwarding any output.
///
/// # Errors
/// Returns an error when the process cannot be spawned.
pub async fn exec(ctx: Ctx<ExecReq>) -> anyhow::Result<()> {
// TODO: Tasks should be idempotent
// TODO: Root user should be able to run only specific programs
let mut cmd = if &ctx.body.user != "root" {
let mut cmd = Command::new("sudo");
cmd.arg("-iu")
.arg(&ctx.body.user)
.arg("--")
.arg(&ctx.body.program);
cmd
} else {
Command::new(&ctx.body.program)
};
let mut io = cmd
.args(&ctx.body.args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
// Stream of stdout chunks; read errors are logged and skipped.
let stdout =
FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| {
stdout
.map(|bytes| bytes.freeze())
.map_err(|err| tracing::error!(%err, "read error on stdout"))
.ok()
});
// Stream of stderr chunks; read errors are logged and skipped.
let stderr =
FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| {
stderr
.map(|bytes| bytes.freeze())
.map_err(|err| tracing::error!(%err, "read error on stderr"))
.ok()
});
// Single-item stream yielding the exit status as text; panics if waiting fails.
let exit = async move {
io.wait()
.await
.map(|exit| {
let exit = exit.to_string();
Bytes::from(exit)
})
.unwrap()
}
.into_stream();
// Ok(Box::pin(stdout.merge(stderr).chain(exit)))
Ok(())
}

View file

@ -1,79 +0,0 @@
use std::collections::HashMap;
use bytes::Bytes;
use thiserror::Error;
mod exec;
mod terminal;
/// Errors produced while routing a message to a service.
#[derive(Debug, Error)]
enum ServiceError {
/// The message subject was recognized but its body could not be converted to the
/// type the service expects.
#[error("received an invalid body format for a valid message")]
BodyFormatError,
}
struct Service {
terminals: HashMap<String, ()>,
}
impl Service {
fn serve(&mut self, message: crate::messaging::Message) {
match message.subject() {
crate::messaging::Subject::OpenTerminal => {}
crate::messaging::Subject::Exec => todo!(),
crate::messaging::Subject::Health => todo!(),
}
}
}
async fn route_message(message: crate::messaging::Message) -> Result<(), ServiceError> {
match message.subject() {
crate::messaging::Subject::Health => {}
crate::messaging::Subject::Exec => {
let ctx = Ctx::with_body(message.body())?;
let _ = self::exec::exec(ctx).await;
}
crate::messaging::Subject::OpenTerminal => {
let ctx = Ctx::with_body(message.body())?;
let _ = self::terminal::open_terminal(ctx).await;
}
}
Ok(())
}
/// Per-request context: the decoded body plus the open terminals, keyed by name.
struct Ctx<T> {
    body: T,
    terminals: HashMap<String, (crate::pty::Pty, tokio::process::Child)>,
}

impl<T: TryFrom<Bytes>> Ctx<T> {
    /// Builds a context by converting the raw `body` into `T`; the terminal map starts
    /// out empty.
    ///
    /// # Errors
    /// Returns [`ServiceError::BodyFormatError`] when the conversion fails; the
    /// conversion error itself is dropped.
    fn with_body(body: Bytes) -> Result<Self, ServiceError> {
        match T::try_from(body) {
            Ok(body) => Ok(Self {
                body,
                terminals: HashMap::new(),
            }),
            Err(_) => Err(ServiceError::BodyFormatError),
        }
    }
}
/// Opens the client's message channel and routes every received message in a
/// background task. Routing errors are logged as warnings.
///
/// # Errors
/// Returns an error when the message channel cannot be opened.
pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> {
    let mut inbox = client.messages_channel().await?;

    tokio::spawn(async move {
        while let Some(message) = inbox.recv().await {
            // TODO: How do i handle this error?
            route_message(message)
                .await
                .unwrap_or_else(|err| tracing::warn!("{}", err));
        }
    });

    Ok(())
}

View file

@ -1,66 +0,0 @@
use std::convert::Infallible;
use bytes::Bytes;
use serde::Deserialize;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use crate::pty::open_shell;
use super::Ctx;
/// Body of an open-terminal request, kept as raw bytes.
#[derive(Debug, Deserialize)]
pub struct OpenTerminalMessage(Bytes);

impl TryFrom<Bytes> for OpenTerminalMessage {
    type Error = Infallible;

    /// Wraps the bytes unchanged; this conversion cannot fail.
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
        Ok(OpenTerminalMessage(value))
    }
}
/// Raw bytes to be written to a terminal.
#[derive(Debug, Deserialize)]
pub struct TerminalInput(Bytes);

impl TryFrom<Bytes> for TerminalInput {
    type Error = Infallible;

    /// Wraps the bytes unchanged; this conversion cannot fail.
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
        Ok(TerminalInput(value))
    }
}
/// Opens a pseudoterminal running `/bin/bash` and stores it in the context.
///
/// NOTE(review): the terminal is always registered under the fixed key `"test"`, and
/// the output stream built here is dropped when the function returns.
///
/// # Errors
/// Returns an error when the pty cannot be opened or cloned, or the shell cannot be
/// started.
pub async fn open_terminal(
    mut ctx: Ctx<OpenTerminalMessage>,
) -> anyhow::Result<Ctx<OpenTerminalMessage>> {
    let pty = crate::pty::Pty::open()?;
    let pty_child = pty.child()?;
    let shell = open_shell(pty_child, "/bin/bash")?;

    let reader = pty.try_clone()?;
    let _out_stream = FramedRead::new(reader, BytesCodec::new()).filter_map(|frame| match frame {
        Ok(bytes) => Some(bytes.freeze()),
        Err(err) => {
            tracing::warn!(%err, "pseudoterminal read error");
            None
        }
    });

    ctx.terminals.insert(String::from("test"), (pty, shell));
    Ok(ctx)
}
/// Writes the request body to the pseudoterminal registered under `terminal_id`.
///
/// A failed write is logged as a warning and does not fail the call.
///
/// # Errors
/// Returns an error when no terminal is registered under `terminal_id`.
pub async fn terminal_input(
    terminal_id: &str,
    mut ctx: Ctx<TerminalInput>,
) -> anyhow::Result<Ctx<TerminalInput>> {
    // The id comes from the caller, so an unknown id is an error, not a panic.
    let (pty, _) = ctx
        .terminals
        .get_mut(terminal_id)
        .ok_or_else(|| anyhow::anyhow!("unknown terminal '{}'", terminal_id))?;

    if let Err(err) = pty.write_all(&ctx.body.0[..]).await {
        tracing::warn!(%err, "pseudoterminal write error");
    }

    Ok(ctx)
}