dotfiles/agent/src/system/exec.rs
Nikos Papadakis 5d948f4c19 Add health checking system to agent
Adds a health checking endpoint on the GRPC server. This is a stream that changes whenever a health status update occurs.

Reviewed-on: https://git.nikos.gg/prymn/prymn/pulls/5
Co-authored-by: Nikos Papadakis <nikos@papadakis.xyz>
Co-committed-by: Nikos Papadakis <nikos@papadakis.xyz>
2023-08-12 09:37:01 +00:00

122 lines
2.9 KiB
Rust

use std::{
ffi::OsStr,
process::{ExitStatus, Stdio},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
sync::mpsc,
};
#[derive(Debug)]
pub enum ExecOutput {
Output {
stdout: Option<String>,
stderr: Option<String>,
},
Exit(ExitStatus),
Error(String),
}
pub fn exec<P, A>(program: P, args: &[A]) -> anyhow::Result<mpsc::Receiver<ExecOutput>>
where
P: AsRef<OsStr>,
A: AsRef<OsStr>,
{
let (tx, rx) = mpsc::channel(4);
let command = Command::new(program)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let fut = run_process(command, tx);
tokio::spawn(fut);
Ok(rx)
}
async fn run_process(mut command: Child, sender: mpsc::Sender<ExecOutput>) {
let mut stdout = {
let stdout = command.stdout.take().expect("bug: no pipe for stdout");
BufReader::new(stdout).lines()
};
let mut stderr = {
let stderr = command.stderr.take().expect("bug: no pipe for stderr");
BufReader::new(stderr).lines()
};
loop {
match (stdout.next_line().await, stderr.next_line().await) {
(Ok(None), Ok(None)) => break,
(Ok(stdout), Ok(stderr)) => sender
.send(ExecOutput::Output { stdout, stderr })
.await
.expect("stream closed"),
(Err(err), _) | (_, Err(err)) => sender
.send(ExecOutput::Error(err.to_string()))
.await
.expect("stream closed"),
}
}
match command.wait().await {
Ok(exit_status) => sender
.send(ExecOutput::Exit(exit_status))
.await
.expect("stream closed"),
Err(err) => sender
.send(ExecOutput::Error(err.to_string()))
.await
.expect("stream closed"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn exec_works() {
let mut rx = exec(&"echo", &["1\n2\n3"]).expect("to spawn command");
let mut outputs = vec![];
while let Some(output) = rx.recv().await {
outputs.push(output);
}
assert_eq!(outputs.len(), 4);
let ExecOutput::Output {
ref stdout,
ref stderr,
} = outputs[0]
else {
panic!()
};
assert_eq!(*stdout, Some("1".to_owned()));
assert_eq!(*stderr, None);
let ExecOutput::Output {
ref stdout,
ref stderr,
} = outputs[1]
else {
panic!()
};
assert_eq!(*stdout, Some("2".to_owned()));
assert_eq!(*stderr, None);
let ExecOutput::Output {
ref stdout,
ref stderr,
} = outputs[2]
else {
panic!()
};
assert_eq!(*stdout, Some("3".to_owned()));
assert_eq!(*stderr, None);
}
}