Add agent

This commit is contained in:
Nikos Papadakis 2023-06-13 16:02:49 +03:00
parent ffbd1d6f75
commit 1056446778
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
9 changed files with 1299 additions and 0 deletions

1
agent/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target/

1059
agent/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

15
agent/Cargo.toml Normal file
View file

@ -0,0 +1,15 @@
[package]
name = "prymn_agent"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.71"
prost = "0.11.9"
sysinfo = { version = "0.29.2", default-features = false }
tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros", "io-util", "process"] }
tokio-stream = "0.1.14"
tonic = "0.9.2"
[build-dependencies]
tonic-build = "0.9.2"

5
agent/build.rs Normal file
View file

@ -0,0 +1,5 @@
fn main() {
tonic_build::configure()
.compile(&["proto/agent.proto"], &["proto"])
.unwrap();
}

53
agent/proto/agent.proto Normal file
View file

@ -0,0 +1,53 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package prymn;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
message SysInfoResponse {
message Cpu {
uint64 freq_mhz = 1;
float usage = 2;
}
message Disk {
string name = 1;
uint64 total_bytes = 2;
uint64 avail_bytes = 3;
string mount_point = 4;
}
uint64 uptime = 1;
string hostname = 2;
string os = 3;
uint64 mem_total_bytes = 4;
uint64 mem_avail_bytes = 5;
uint64 swap_total_bytes = 6;
uint64 swap_free_bytes = 7;
repeated Cpu cpus = 8;
repeated Disk disks = 9;
}
message ExecRequest {
string program = 1;
repeated string args = 2;
}
message ExecResponse {
string stdout = 1;
string stderr = 2;
}
service Agent {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc GetSysInfo(google.protobuf.Empty) returns (SysInfoResponse);
rpc Exec(ExecRequest) returns (stream ExecResponse);
}

View file

@ -0,0 +1,8 @@
use prymn_agent::new_server;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
new_server().serve("0.0.0.0:5043".parse()?).await?;
Ok(())
}

3
agent/src/lib.rs Normal file
View file

@ -0,0 +1,3 @@
mod server;
pub use server::new_server;

61
agent/src/server/exec.rs Normal file
View file

@ -0,0 +1,61 @@
use std::{ffi::OsStr, process::Stdio};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
sync::mpsc,
};
#[derive(Debug)]
pub(super) struct ExecOutput {
pub(super) stdout: Option<String>,
pub(super) stderr: Option<String>,
}
pub(super) fn exec<S>(program: &S, args: &[S]) -> anyhow::Result<mpsc::Receiver<ExecOutput>>
where
S: AsRef<OsStr>,
{
let (tx, rx) = mpsc::channel(4);
let command = Command::new(program)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
tokio::spawn(async move { run_command(command, tx).await });
Ok(rx)
}
async fn run_command(mut command: Child, tx: mpsc::Sender<ExecOutput>) {
let mut stdout = {
let stdout = command.stdout.take().expect("bug: no pipe for stdout");
BufReader::new(stdout).lines()
};
let mut stderr = {
let stderr = command.stderr.take().expect("bug: no pipe for stderr");
BufReader::new(stderr).lines()
};
loop {
match (stdout.next_line().await, stderr.next_line().await) {
// TODO: Handle errors
(Err(_err), _) | (_, Err(_err)) => break,
(stdout, stderr) => tx
.send(ExecOutput {
stdout: stdout.unwrap(),
stderr: stderr.unwrap(),
})
.await
.expect("bug: channel closed"),
}
}
match command.wait().await {
Ok(exit_status) => println!("exit: {}", exit_status),
Err(err) => panic!("errorrrrr {}", err),
};
}

94
agent/src/server/mod.rs Normal file
View file

@ -0,0 +1,94 @@
mod exec;
mod rpc {
tonic::include_proto!("prymn");
}
use std::{
pin::Pin,
sync::{Arc, Mutex},
};
use sysinfo::{CpuExt, DiskExt, System, SystemExt};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{transport::server::Router, Request, Response, Status};
pub struct Server {
sys: Arc<Mutex<System>>,
}
type Result<T> = std::result::Result<T, tonic::Status>;
#[tonic::async_trait]
impl rpc::agent_server::Agent for Server {
async fn echo(&self, req: Request<rpc::EchoRequest>) -> Result<Response<rpc::EchoResponse>> {
Ok(Response::new(rpc::EchoResponse {
message: req.into_inner().message,
}))
}
async fn get_sys_info(&self, _: Request<()>) -> Result<Response<rpc::SysInfoResponse>> {
let sys = self.sys.lock().unwrap();
let cpus = sys
.cpus()
.iter()
.map(|cpu| rpc::sys_info_response::Cpu {
freq_mhz: cpu.frequency(),
usage: cpu.cpu_usage(),
})
.collect();
let disks = sys
.disks()
.iter()
.map(|disk| rpc::sys_info_response::Disk {
name: disk.name().to_string_lossy().into_owned(),
total_bytes: disk.total_space(),
avail_bytes: disk.available_space(),
mount_point: disk.mount_point().to_string_lossy().into_owned(),
})
.collect();
let response = Response::new(rpc::SysInfoResponse {
uptime: sys.uptime(),
hostname: sys.host_name().unwrap_or_default(),
os: sys.long_os_version().unwrap_or_default(),
mem_total_bytes: sys.total_memory(),
mem_avail_bytes: sys.available_memory(),
swap_total_bytes: sys.total_swap(),
swap_free_bytes: sys.free_swap(),
cpus,
disks,
});
Ok(response)
}
type ExecStream = Pin<Box<dyn Stream<Item = Result<rpc::ExecResponse>> + Send + Sync>>;
async fn exec(&self, req: Request<rpc::ExecRequest>) -> Result<Response<Self::ExecStream>> {
let rpc::ExecRequest { program, args } = req.into_inner();
match exec::exec(&program, &args) {
Ok(receiver) => {
let stream = ReceiverStream::new(receiver).map(|inner| {
Ok(rpc::ExecResponse {
stdout: inner.stdout.unwrap_or_default(),
stderr: inner.stderr.unwrap_or_default(),
})
});
Ok(Response::new(Box::pin(stream)))
}
Err(err) => Err(Status::failed_precondition(err.to_string())),
}
}
}
pub fn new_server() -> Router {
let server = Server {
sys: Arc::new(Mutex::new(System::new_all())),
};
tonic::transport::Server::builder().add_service(rpc::agent_server::AgentServer::new(server))
}