Improve exec module

This commit is contained in:
Nikos Papadakis 2023-06-15 13:55:24 +03:00
parent 1e6ef4ef45
commit 01a0b038f7
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
5 changed files with 136 additions and 29 deletions

View file

@ -8,7 +8,7 @@ anyhow = "1.0.71"
prost = "0.11.9" prost = "0.11.9"
sysinfo = { version = "0.29.2", default-features = false } sysinfo = { version = "0.29.2", default-features = false }
tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros", "io-util", "process"] } tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros", "io-util", "process"] }
tokio-stream = "0.1.14" tokio-stream = { version = "0.1.14", features = ["net"] }
tonic = "0.9.2" tonic = "0.9.2"
[build-dependencies] [build-dependencies]

View file

@ -1,5 +1,6 @@
fn main() { fn main() {
tonic_build::configure() tonic_build::configure()
.build_client(true)
.compile(&["proto/agent.proto"], &["proto"]) .compile(&["proto/agent.proto"], &["proto"])
.unwrap(); .unwrap();
} }

View file

@ -42,8 +42,16 @@ message ExecRequest {
} }
message ExecResponse { message ExecResponse {
message Output {
string stdout = 1; string stdout = 1;
string stderr = 2; string stderr = 2;
}
oneof response {
Output output = 1;
int32 exit_code = 2;
string error = 3;
}
} }
service Agent { service Agent {

View file

@ -1,4 +1,7 @@
use std::{ffi::OsStr, process::Stdio}; use std::{
ffi::OsStr,
process::{ExitStatus, Stdio},
};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, BufReader}, io::{AsyncBufReadExt, BufReader},
@ -6,15 +9,22 @@ use tokio::{
sync::mpsc, sync::mpsc,
}; };
use super::rpc;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct ExecOutput { pub(super) enum ExecOutput {
pub(super) stdout: Option<String>, Output {
pub(super) stderr: Option<String>, stdout: Option<String>,
stderr: Option<String>,
},
Exit(ExitStatus),
Error(String),
} }
pub(super) fn exec<S>(program: &S, args: &[S]) -> anyhow::Result<mpsc::Receiver<ExecOutput>> pub(super) fn exec<P, A>(program: P, args: &[A]) -> anyhow::Result<mpsc::Receiver<ExecOutput>>
where where
S: AsRef<OsStr>, P: AsRef<OsStr>,
A: AsRef<OsStr>,
{ {
let (tx, rx) = mpsc::channel(4); let (tx, rx) = mpsc::channel(4);
@ -24,12 +34,12 @@ where
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.spawn()?; .spawn()?;
tokio::spawn(async move { run_command(command, tx).await }); let fut = run_process(command, tx);
tokio::spawn(fut);
Ok(rx) Ok(rx)
} }
async fn run_command(mut command: Child, tx: mpsc::Sender<ExecOutput>) { async fn run_process(mut command: Child, sender: mpsc::Sender<ExecOutput>) {
let mut stdout = { let mut stdout = {
let stdout = command.stdout.take().expect("bug: no pipe for stdout"); let stdout = command.stdout.take().expect("bug: no pipe for stdout");
BufReader::new(stdout).lines() BufReader::new(stdout).lines()
@ -42,20 +52,68 @@ async fn run_command(mut command: Child, tx: mpsc::Sender<ExecOutput>) {
loop { loop {
match (stdout.next_line().await, stderr.next_line().await) { match (stdout.next_line().await, stderr.next_line().await) {
// TODO: Handle errors (Ok(None), Ok(None)) => break,
(Err(_err), _) | (_, Err(_err)) => break, (Ok(stdout), Ok(stderr)) => sender
(stdout, stderr) => tx .send(ExecOutput::Output { stdout, stderr })
.send(ExecOutput {
stdout: stdout.unwrap(),
stderr: stderr.unwrap(),
})
.await .await
.expect("bug: channel closed"), .expect("stream closed"),
(Err(err), _) | (_, Err(err)) => sender
.send(ExecOutput::Error(err.to_string()))
.await
.expect("stream closed"),
} }
} }
match command.wait().await { match command.wait().await {
Ok(exit_status) => println!("exit: {}", exit_status), Ok(exit_status) => sender
Err(err) => panic!("errorrrrr {}", err), .send(ExecOutput::Exit(exit_status))
}; .await
.expect("stream closed"),
Err(err) => sender
.send(ExecOutput::Error(err.to_string()))
.await
.expect("stream closed"),
}
}
impl From<ExecOutput> for rpc::exec_response::Response {
fn from(value: ExecOutput) -> Self {
match value {
ExecOutput::Output { stdout, stderr } => Self::Output(rpc::exec_response::Output {
stdout: stdout.unwrap_or_default(),
stderr: stderr.unwrap_or_default(),
}),
ExecOutput::Exit(code) => Self::ExitCode(code.code().unwrap_or_default()),
ExecOutput::Error(err) => Self::Error(err),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn exec_works() {
let mut rx = exec(&"echo", &["1\n2\n3"]).expect("to spawn command");
let mut outputs = vec![];
while let Some(output) = rx.recv().await {
outputs.push(output);
}
assert_eq!(outputs.len(), 4);
let ExecOutput::Output { ref stdout, ref stderr } = outputs[0] else { panic!() };
assert_eq!(*stdout, Some("1".to_owned()));
assert_eq!(*stderr, None);
let ExecOutput::Output { ref stdout, ref stderr } = &outputs[1] else { panic!() };
assert_eq!(*stdout, Some("2".to_owned()));
assert_eq!(*stderr, None);
let ExecOutput::Output { ref stdout, ref stderr } = &outputs[2] else { panic!() };
assert_eq!(*stdout, Some("3".to_owned()));
assert_eq!(*stderr, None);
}
} }

View file

@ -1,7 +1,5 @@
mod exec; mod exec;
mod rpc { mod rpc;
tonic::include_proto!("prymn");
}
use std::{ use std::{
pin::Pin, pin::Pin,
@ -67,14 +65,15 @@ impl rpc::agent_server::Agent for Server {
type ExecStream = Pin<Box<dyn Stream<Item = Result<rpc::ExecResponse>> + Send + Sync>>; type ExecStream = Pin<Box<dyn Stream<Item = Result<rpc::ExecResponse>> + Send + Sync>>;
async fn exec(&self, req: Request<rpc::ExecRequest>) -> Result<Response<Self::ExecStream>> { async fn exec(&self, req: Request<rpc::ExecRequest>) -> Result<Response<Self::ExecStream>> {
use exec::*;
let rpc::ExecRequest { program, args } = req.into_inner(); let rpc::ExecRequest { program, args } = req.into_inner();
match exec::exec(&program, &args) { match exec(&program, &args) {
Ok(receiver) => { Ok(receiver) => {
let stream = ReceiverStream::new(receiver).map(|inner| { let stream = ReceiverStream::new(receiver).map(|inner| {
Ok(rpc::ExecResponse { Ok(rpc::ExecResponse {
stdout: inner.stdout.unwrap_or_default(), response: Some(inner.into()),
stderr: inner.stderr.unwrap_or_default(),
}) })
}); });
@ -92,3 +91,44 @@ pub fn new_server() -> Router {
tonic::transport::Server::builder().add_service(rpc::agent_server::AgentServer::new(server)) tonic::transport::Server::builder().add_service(rpc::agent_server::AgentServer::new(server))
} }
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use super::*;
async fn spawn_server() -> SocketAddr {
let listener = TcpListener::bind("[::]:0").await.unwrap();
let address = listener.local_addr().unwrap();
let listener = TcpListenerStream::new(listener);
tokio::spawn(async move { new_server().serve_with_incoming(listener).await.unwrap() });
address
}
#[tokio::test]
async fn echo_works() {
let addr = spawn_server().await;
let mut client =
rpc::agent_client::AgentClient::connect(format!("http://[::]:{}", addr.port()))
.await
.unwrap();
let message = "Hello!".to_owned();
let response = client
.echo(rpc::EchoRequest {
message: message.clone(),
})
.await
.expect("to respond");
assert_eq!(rpc::EchoResponse { message }, response.into_inner());
}
}