From 01a0b038f7bf84eeb4ffe82385f0edcf4eb349bb Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Thu, 15 Jun 2023 13:55:24 +0300 Subject: [PATCH] Improve exec module --- agent/Cargo.toml | 2 +- agent/build.rs | 1 + agent/proto/agent.proto | 12 ++++- agent/src/server/exec.rs | 98 ++++++++++++++++++++++++++++++++-------- agent/src/server/mod.rs | 52 ++++++++++++++++++--- 5 files changed, 136 insertions(+), 29 deletions(-) diff --git a/agent/Cargo.toml b/agent/Cargo.toml index c5f4a99..6220e24 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -8,7 +8,7 @@ anyhow = "1.0.71" prost = "0.11.9" sysinfo = { version = "0.29.2", default-features = false } tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros", "io-util", "process"] } -tokio-stream = "0.1.14" +tokio-stream = { version = "0.1.14", features = ["net"] } tonic = "0.9.2" [build-dependencies] diff --git a/agent/build.rs b/agent/build.rs index 0dac5bc..5edc05f 100644 --- a/agent/build.rs +++ b/agent/build.rs @@ -1,5 +1,6 @@ fn main() { tonic_build::configure() + .build_client(true) .compile(&["proto/agent.proto"], &["proto"]) .unwrap(); } diff --git a/agent/proto/agent.proto b/agent/proto/agent.proto index 5474ebe..c935b0e 100644 --- a/agent/proto/agent.proto +++ b/agent/proto/agent.proto @@ -42,8 +42,16 @@ message ExecRequest { } message ExecResponse { - string stdout = 1; - string stderr = 2; + message Output { + string stdout = 1; + string stderr = 2; + } + + oneof response { + Output output = 1; + int32 exit_code = 2; + string error = 3; + } } service Agent { diff --git a/agent/src/server/exec.rs b/agent/src/server/exec.rs index 0d6d313..1159b1c 100644 --- a/agent/src/server/exec.rs +++ b/agent/src/server/exec.rs @@ -1,4 +1,7 @@ -use std::{ffi::OsStr, process::Stdio}; +use std::{ + ffi::OsStr, + process::{ExitStatus, Stdio}, +}; use tokio::{ io::{AsyncBufReadExt, BufReader}, @@ -6,15 +9,22 @@ use tokio::{ sync::mpsc, }; +use super::rpc; + #[derive(Debug)] -pub(super) struct ExecOutput { - pub(super) stdout: Option, - pub(super) stderr: Option, +pub(super) enum ExecOutput { + Output { + stdout: Option, + stderr: Option, + }, + Exit(ExitStatus), + Error(String), } -pub(super) fn exec(program: &S, args: &[S]) -> anyhow::Result> +pub(super) fn exec(program: P, args: &[A]) -> anyhow::Result> where - S: AsRef, + P: AsRef, + A: AsRef, { let (tx, rx) = mpsc::channel(4); @@ -24,12 +34,12 @@ where .stderr(Stdio::piped()) .spawn()?; - tokio::spawn(async move { run_command(command, tx).await }); - + let fut = run_process(command, tx); + tokio::spawn(fut); Ok(rx) } -async fn run_command(mut command: Child, tx: mpsc::Sender) { +async fn run_process(mut command: Child, sender: mpsc::Sender) { let mut stdout = { let stdout = command.stdout.take().expect("bug: no pipe for stdout"); BufReader::new(stdout).lines() @@ -42,20 +52,68 @@ async fn run_command(mut command: Child, tx: mpsc::Sender) { loop { match (stdout.next_line().await, stderr.next_line().await) { - // TODO: Handle errors - (Err(_err), _) | (_, Err(_err)) => break, - (stdout, stderr) => tx - .send(ExecOutput { - stdout: stdout.unwrap(), - stderr: stderr.unwrap(), - }) + (Ok(None), Ok(None)) => break, + (Ok(stdout), Ok(stderr)) => sender + .send(ExecOutput::Output { stdout, stderr }) .await - .expect("bug: channel closed"), + .expect("stream closed"), + (Err(err), _) | (_, Err(err)) => sender + .send(ExecOutput::Error(err.to_string())) + .await + .expect("stream closed"), } } match command.wait().await { - Ok(exit_status) => println!("exit: {}", exit_status), - Err(err) => panic!("errorrrrr {}", err), - }; + Ok(exit_status) => sender + .send(ExecOutput::Exit(exit_status)) + .await + .expect("stream closed"), + Err(err) => sender + .send(ExecOutput::Error(err.to_string())) + .await + .expect("stream closed"), + } +} + +impl From for rpc::exec_response::Response { + fn from(value: ExecOutput) -> Self { + match value { + ExecOutput::Output { stdout, stderr } => Self::Output(rpc::exec_response::Output { + stdout: stdout.unwrap_or_default(), + stderr: stderr.unwrap_or_default(), + }), + ExecOutput::Exit(code) => Self::ExitCode(code.code().unwrap_or_default()), + ExecOutput::Error(err) => Self::Error(err), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn exec_works() { + let mut rx = exec(&"echo", &["1\n2\n3"]).expect("to spawn command"); + + let mut outputs = vec![]; + while let Some(output) = rx.recv().await { + outputs.push(output); + } + + assert_eq!(outputs.len(), 4); + + let ExecOutput::Output { ref stdout, ref stderr } = outputs[0] else { panic!() }; + assert_eq!(*stdout, Some("1".to_owned())); + assert_eq!(*stderr, None); + + let ExecOutput::Output { ref stdout, ref stderr } = &outputs[1] else { panic!() }; + assert_eq!(*stdout, Some("2".to_owned())); + assert_eq!(*stderr, None); + + let ExecOutput::Output { ref stdout, ref stderr } = &outputs[2] else { panic!() }; + assert_eq!(*stdout, Some("3".to_owned())); + assert_eq!(*stderr, None); + } } diff --git a/agent/src/server/mod.rs b/agent/src/server/mod.rs index 6ffd939..7dd54f7 100644 --- a/agent/src/server/mod.rs +++ b/agent/src/server/mod.rs @@ -1,7 +1,5 @@ mod exec; -mod rpc { - tonic::include_proto!("prymn"); -} +mod rpc; use std::{ pin::Pin, @@ -67,14 +65,15 @@ impl rpc::agent_server::Agent for Server { type ExecStream = Pin> + Send + Sync>>; async fn exec(&self, req: Request) -> Result> { + use exec::*; + let rpc::ExecRequest { program, args } = req.into_inner(); - match exec::exec(&program, &args) { + match exec(&program, &args) { Ok(receiver) => { let stream = ReceiverStream::new(receiver).map(|inner| { Ok(rpc::ExecResponse { - stdout: inner.stdout.unwrap_or_default(), - stderr: inner.stderr.unwrap_or_default(), + response: Some(inner.into()), }) }); @@ -92,3 +91,44 @@ pub fn new_server() -> Router { tonic::transport::Server::builder().add_service(rpc::agent_server::AgentServer::new(server)) } + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + use tokio::net::TcpListener; + use tokio_stream::wrappers::TcpListenerStream; + + use super::*; + + async fn spawn_server() -> SocketAddr { + let listener = TcpListener::bind("[::]:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + + let listener = TcpListenerStream::new(listener); + + tokio::spawn(async move { new_server().serve_with_incoming(listener).await.unwrap() }); + + address + } + + #[tokio::test] + async fn echo_works() { + let addr = spawn_server().await; + + let mut client = + rpc::agent_client::AgentClient::connect(format!("http://[::]:{}", addr.port())) + .await + .unwrap(); + + let message = "Hello!".to_owned(); + let response = client + .echo(rpc::EchoRequest { + message: message.clone(), + }) + .await + .expect("to respond"); + + assert_eq!(rpc::EchoResponse { message }, response.into_inner()); + } +}