Feature: Agent Tasks (#8)

Reviewed-on: https://git.nikos.gg/prymn/prymn/pulls/8
Co-authored-by: Nikos Papadakis <nikos@papadakis.xyz>
Co-committed-by: Nikos Papadakis <nikos@papadakis.xyz>
This commit is contained in:
Nikos Papadakis 2023-11-14 15:23:50 +00:00 committed by nikos
parent 59c8c6ee23
commit 5c64f02579
22 changed files with 1234 additions and 748 deletions

View file

@ -2,5 +2,5 @@
import_deps: [:ecto, :ecto_sql, :phoenix], import_deps: [:ecto, :ecto_sql, :phoenix],
subdirectories: ["priv/*/migrations"], subdirectories: ["priv/*/migrations"],
plugins: [Phoenix.LiveView.HTMLFormatter, TailwindFormatter], plugins: [Phoenix.LiveView.HTMLFormatter, TailwindFormatter],
inputs: ["*.{heex,ex,exs}", "{config,lib,test}/**/*.{heex,ex,exs}", "priv/*/seeds.exs"] inputs: ["*.{heex,ex,exs}", "app/{config,lib,test}/**/*.{heex,ex,exs}", "priv/*/seeds.exs"]
] ]

433
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,22 +6,24 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.71" anyhow = "1.0.71"
chrono = "0.4.26" chrono = "0.4.26"
clap = { version = "4.3.9" } clap = "4.3.9"
envy = "0.4.2" envy = "0.4.2"
itertools = "0.11.0" itertools = "0.11.0"
nix = "0.27.1" nix = "0.27.1"
once_cell = "1.18.0" once_cell = "1.18.0"
prost = "0.11.9" prost = "0.12.1"
regex = "1.10.2"
reqwest = { version = "0.11.18", features = ["blocking", "json"], default-features = false } reqwest = { version = "0.11.18", features = ["blocking", "json"], default-features = false }
serde = { version = "1.0.173", features = ["derive"] } serde = { version = "1.0.173", features = ["derive"] }
serde_json = "1.0.103" serde_json = "1.0.103"
sysinfo = { version = "0.29.2", default-features = false } sysinfo = { version = "0.29.2", default-features = false }
tokio = { version = "1.28.2", features = ["rt-multi-thread", "io-util", "process", "macros", "signal"] } tokio = { version = "1.28.2", features = ["rt-multi-thread", "io-util", "process", "macros", "signal"] }
tokio-stream = { version = "0.1.14", features = ["net", "sync"] } tokio-stream = { version = "0.1.14", features = ["net", "sync"] }
tonic = { version = "0.9.2" } tokio-util = { version = "0.7.10", features = ["codec"] }
tonic = { version = "0.10.2" }
tower-http = { version = "0.4.3", features = ["trace"] } tower-http = { version = "0.4.3", features = ["trace"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"
[build-dependencies] [build-dependencies]
tonic-build = "0.9.2" tonic-build = "0.10.2"

186
agent/src/debian.rs Normal file
View file

@ -0,0 +1,186 @@
use std::process::{Command, Output};
use regex::Regex;
pub fn update_package_index() -> std::io::Result<Output> {
Command::new("apt-get").arg("-y").arg("update").output()
}
pub fn run_updates(dry_run: bool) -> std::io::Result<Output> {
let mut command = Command::new("apt-get");
if dry_run {
command.arg("-s");
}
command.arg("-y").arg("upgrade").output()
}
pub fn install_packages(packages: &[&str]) -> std::io::Result<Output> {
Command::new("apt-get")
.arg("install")
.arg("-y")
.args(packages)
.output()
}
pub fn get_available_updates() -> std::io::Result<Vec<String>> {
let output = Command::new("apt-get").arg("-sV").arg("upgrade").output()?;
let upgradables = parse_upgrade_output(&String::from_utf8_lossy(&output.stdout));
Ok(upgradables)
}
fn parse_upgrade_output(output: &str) -> Vec<String> {
output
.split_once("The following packages will be upgraded:\n")
.and_then(|(_, rest)| {
// Find the first line with non-whitespace characters (indicating the end of the list)
let re = Regex::new(r"(?m)^\S").unwrap();
re.find(rest).map(|m| rest.split_at(m.start()).0)
})
.map_or_else(Vec::new, |text| {
let lines = text.lines();
lines.map(|line| line.trim().to_owned()).collect()
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_upgrade_output_correctly() {
// `apt-get -sV upgrade`
let test_output = r"
NOTE: This is only a simulation!
apt-get needs root privileges for real execution.
Keep also in mind that locking is deactivated,
so don't depend on the relevance to the real current situation!
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
Calculating upgrade... Done
The following packages have been kept back:
linux-image-amd64 (5.10.191-1 => 5.10.197-1)
The following packages will be upgraded:
adduser (3.118 => 3.118+deb11u1)
base-files (11.1+deb11u7 => 11.1+deb11u8)
cpio (2.13+dfsg-4 => 2.13+dfsg-7.1~deb11u1)
dbus (1.12.24-0+deb11u1 => 1.12.28-0+deb11u1)
distro-info-data (0.51+deb11u3 => 0.51+deb11u4)
dpkg (1.20.12 => 1.20.13)
grub-common (2.06-3~deb11u5 => 2.06-3~deb11u6)
grub-pc (2.06-3~deb11u5 => 2.06-3~deb11u6)
grub-pc-bin (2.06-3~deb11u5 => 2.06-3~deb11u6)
grub2-common (2.06-3~deb11u5 => 2.06-3~deb11u6)
krb5-locales (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4)
libbsd0 (0.11.3-1 => 0.11.3-1+deb11u1)
libcurl3-gnutls (7.74.0-1.3+deb11u7 => 7.74.0-1.3+deb11u10)
libdbus-1-3 (1.12.24-0+deb11u1 => 1.12.28-0+deb11u1)
libgssapi-krb5-2 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4)
libk5crypto3 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4)
libkrb5-3 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4)
libkrb5support0 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4)
libncurses6 (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2)
libncursesw6 (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2)
libnss-systemd (247.3-7+deb11u2 => 247.3-7+deb11u4)
libpam-systemd (247.3-7+deb11u2 => 247.3-7+deb11u4)
libssl1.1 (1.1.1n-0+deb11u5 => 1.1.1w-0+deb11u1)
libsystemd0 (247.3-7+deb11u2 => 247.3-7+deb11u4)
libtinfo6 (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2)
libudev1 (247.3-7+deb11u2 => 247.3-7+deb11u4)
logrotate (3.18.0-2+deb11u1 => 3.18.0-2+deb11u2)
ncurses-base (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2)
ncurses-bin (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2)
ncurses-term (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2)
openssh-client (1:8.4p1-5+deb11u1 => 1:8.4p1-5+deb11u2)
openssh-server (1:8.4p1-5+deb11u1 => 1:8.4p1-5+deb11u2)
openssh-sftp-server (1:8.4p1-5+deb11u1 => 1:8.4p1-5+deb11u2)
openssl (1.1.1n-0+deb11u5 => 1.1.1w-0+deb11u1)
qemu-utils (1:5.2+dfsg-11+deb11u2 => 1:5.2+dfsg-11+deb11u3)
systemd (247.3-7+deb11u2 => 247.3-7+deb11u4)
systemd-sysv (247.3-7+deb11u2 => 247.3-7+deb11u4)
udev (247.3-7+deb11u2 => 247.3-7+deb11u4)
38 upgraded, 0 newly installed, 0 to remove and 1 not upgraded.
Inst base-files [11.1+deb11u7] (11.1+deb11u8 Debian:11.8/oldstable [amd64])
Conf base-files (11.1+deb11u8 Debian:11.8/oldstable [amd64])
Inst dpkg [1.20.12] (1.20.13 Debian:11.8/oldstable [amd64])
Conf dpkg (1.20.13 Debian:11.8/oldstable [amd64])
Inst ncurses-bin [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64])
Conf ncurses-bin (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64])
Inst ncurses-base [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all])
Conf ncurses-base (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all])
Inst libnss-systemd [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) []
Inst libsystemd0 [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [systemd:amd64 ]
Conf libsystemd0 (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [systemd:amd64 ]
Inst libpam-systemd [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [systemd:amd64 ]
Inst systemd [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Inst udev [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) []
Inst libudev1 [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Conf libudev1 (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Inst adduser [3.118] (3.118+deb11u1 Debian:11.8/oldstable [all])
Conf adduser (3.118+deb11u1 Debian:11.8/oldstable [all])
Conf systemd (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Inst systemd-sysv [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Inst dbus [1.12.24-0+deb11u1] (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64]) []
Inst libdbus-1-3 [1.12.24-0+deb11u1] (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64])
Inst libk5crypto3 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64])
Conf libk5crypto3 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64])
Inst libkrb5support0 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libkrb5-3:amd64 ]
Conf libkrb5support0 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libkrb5-3:amd64 ]
Inst libkrb5-3 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libgssapi-krb5-2:amd64 ]
Conf libkrb5-3 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libgssapi-krb5-2:amd64 ]
Inst libgssapi-krb5-2 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64])
Conf libgssapi-krb5-2 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64])
Inst libssl1.1 [1.1.1n-0+deb11u5] (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64])
Conf libssl1.1 (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64])
Inst libncurses6 [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) []
Inst libncursesw6 [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) []
Inst libtinfo6 [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64])
Conf libtinfo6 (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64])
Inst cpio [2.13+dfsg-4] (2.13+dfsg-7.1~deb11u1 Debian:11.8/oldstable [amd64])
Inst logrotate [3.18.0-2+deb11u1] (3.18.0-2+deb11u2 Debian:11.8/oldstable [amd64])
Inst krb5-locales [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [all])
Inst ncurses-term [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all])
Inst openssh-sftp-server [1:8.4p1-5+deb11u1] (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) []
Inst openssh-server [1:8.4p1-5+deb11u1] (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) []
Inst openssh-client [1:8.4p1-5+deb11u1] (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64])
Inst distro-info-data [0.51+deb11u3] (0.51+deb11u4 Debian:11.8/oldstable [all])
Inst grub2-common [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) [grub-pc:amd64 ]
Inst grub-pc [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) []
Inst grub-pc-bin [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) []
Inst grub-common [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64])
Inst libbsd0 [0.11.3-1] (0.11.3-1+deb11u1 Debian:11.8/oldstable [amd64])
Inst libcurl3-gnutls [7.74.0-1.3+deb11u7] (7.74.0-1.3+deb11u10 Debian-Security:11/oldstable-security [amd64])
Inst openssl [1.1.1n-0+deb11u5] (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64])
Inst qemu-utils [1:5.2+dfsg-11+deb11u2] (1:5.2+dfsg-11+deb11u3 Debian:11.8/oldstable [amd64])
Conf libnss-systemd (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Conf libpam-systemd (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Conf udev (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Conf systemd-sysv (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64])
Conf dbus (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64])
Conf libdbus-1-3 (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64])
Conf libncurses6 (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64])
Conf libncursesw6 (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64])
Conf cpio (2.13+dfsg-7.1~deb11u1 Debian:11.8/oldstable [amd64])
Conf logrotate (3.18.0-2+deb11u2 Debian:11.8/oldstable [amd64])
Conf krb5-locales (1.18.3-6+deb11u4 Debian:11.8/oldstable [all])
Conf ncurses-term (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all])
Conf openssh-sftp-server (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64])
Conf openssh-server (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64])
Conf openssh-client (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64])
Conf distro-info-data (0.51+deb11u4 Debian:11.8/oldstable [all])
Conf grub2-common (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64])
Conf grub-pc (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64])
Conf grub-pc-bin (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64])
Conf grub-common (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64])
Conf libbsd0 (0.11.3-1+deb11u1 Debian:11.8/oldstable [amd64])
Conf libcurl3-gnutls (7.74.0-1.3+deb11u10 Debian-Security:11/oldstable-security [amd64])
Conf openssl (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64])
Conf qemu-utils (1:5.2+dfsg-11+deb11u3 Debian:11.8/oldstable [amd64])
";
let upgradables = parse_upgrade_output(test_output);
assert_eq!(upgradables.len(), 38);
}
}

View file

@ -1,4 +1,5 @@
pub mod config; pub mod config;
pub mod debian;
pub mod self_update; pub mod self_update;
pub mod server; pub mod server;
pub mod system; pub mod system;

View file

@ -1,23 +1,28 @@
use std::pin::Pin; use std::{pin::Pin, process::Stdio, sync::Mutex};
use tokio_stream::{ use tokio_stream::{
wrappers::{ReceiverStream, WatchStream}, wrappers::{ReceiverStream, WatchStream},
Stream, StreamExt, Stream, StreamExt,
}; };
use tokio_util::codec::{BytesCodec, FramedRead};
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
use crate::system::health::HealthMonitor; use crate::{
debian,
system::{health::HealthMonitor, info::Info, task::TaskBuilder},
};
use super::proto::*; use super::proto::*;
type AgentResult<T> = std::result::Result<Response<T>, Status>; type AgentResult<T> = std::result::Result<Response<T>, Status>;
pub struct AgentService { pub struct AgentService<'a> {
health: HealthMonitor, pub health: HealthMonitor,
pub info: &'a Mutex<Info>, // TODO: Find a way to remove the Mutex dependency here
} }
#[tonic::async_trait] #[tonic::async_trait]
impl agent_server::Agent for AgentService { impl agent_server::Agent for AgentService<'static> {
type HealthStream = Pin<Box<dyn Stream<Item = Result<HealthResponse, Status>> + Send>>; type HealthStream = Pin<Box<dyn Stream<Item = Result<HealthResponse, Status>> + Send>>;
async fn health(&self, _: Request<()>) -> AgentResult<Self::HealthStream> { async fn health(&self, _: Request<()>) -> AgentResult<Self::HealthStream> {
@ -30,8 +35,8 @@ impl agent_server::Agent for AgentService {
system: Some(health.system().into()), system: Some(health.system().into()),
tasks: health tasks: health
.tasks() .tasks()
.into_iter() .iter()
.map(|(key, val)| (key, TaskHealth::from(val))) .map(|(k, v)| (k.clone(), v.into()))
.collect(), .collect(),
}) })
}); });
@ -40,79 +45,111 @@ impl agent_server::Agent for AgentService {
} }
async fn get_sys_info(&self, _: Request<()>) -> AgentResult<SysInfoResponse> { async fn get_sys_info(&self, _: Request<()>) -> AgentResult<SysInfoResponse> {
use sysinfo::{CpuExt, DiskExt, SystemExt}; Ok(Response::new(SysInfoResponse::from(
&*self.info.lock().unwrap(),
)))
}
let mut sys = crate::system::SYSTEM.lock().unwrap(); type SysUpdateStream = Pin<Box<dyn Stream<Item = Result<SysUpdateResponse, Status>> + Send>>;
sys.refresh_specifics( async fn sys_update(
sysinfo::RefreshKind::new() &self,
// .with_disks() // what is this? req: Request<SysUpdateRequest>,
.with_disks_list() ) -> AgentResult<Self::SysUpdateStream> {
.with_memory() let dry_run = req.get_ref().dry_run;
.with_processes(sysinfo::ProcessRefreshKind::everything())
.with_cpu(sysinfo::CpuRefreshKind::everything()),
);
let cpus = sys let mut receiver =
.cpus() TaskBuilder::new("system update".to_owned()).health_monitor(self.health.clone());
.iter()
.map(|cpu| sys_info_response::Cpu { if dry_run {
freq_mhz: cpu.frequency(), receiver = receiver
usage: cpu.cpu_usage(), .add_step(async { Ok("simulating a system update...".to_owned()) })
.add_step(async {
const DUR: std::time::Duration = std::time::Duration::from_secs(5);
tokio::time::sleep(DUR).await;
Ok("completed running an artifical delay...".to_owned())
});
}
let receiver = receiver
.add_step(async move {
tokio::task::spawn_blocking(move || {
let output = debian::run_updates(dry_run).map_err(|err| {
tracing::error!(%err, "failed to run updates");
err
})?;
let out = if !output.status.success() {
tracing::error!(?output, "child process exited unsuccessfuly");
match output.status.code() {
Some(exit_code) => Err(Status::internal(format!(
"operation exited with error (code {exit_code})"
))),
None => Err(Status::cancelled("operation was cancelled by signal")),
}
} else {
Ok(String::from_utf8_lossy(output.stdout.as_slice()).to_string())
};
// TODO: We could split the output by lines and emit those as "steps" so the
// upgrade process is more interactive
out
}) })
.collect(); .await
.unwrap()
let disks = sys
.disks()
.iter()
.map(|disk| sys_info_response::Disk {
name: disk.name().to_string_lossy().into_owned(),
total_bytes: disk.total_space(),
avail_bytes: disk.available_space(),
mount_point: disk.mount_point().to_string_lossy().into_owned(),
}) })
.collect(); .build()
.into_background();
let response = Response::new(SysInfoResponse { let stream = ReceiverStream::new(receiver).map(|output| {
uptime: sys.uptime(), output
hostname: sys.host_name().unwrap_or_default(), .map(|output| SysUpdateResponse {
os: sys.long_os_version().unwrap_or_default(), output,
mem_total_bytes: sys.total_memory(), progress: 1,
mem_avail_bytes: sys.available_memory(), })
swap_total_bytes: sys.total_swap(), .map_err(|err| Status::internal(err.to_string()))
swap_free_bytes: sys.free_swap(),
cpus,
disks,
}); });
Ok(response) Ok(Response::new(Box::pin(stream)))
} }
type ExecStream = Pin<Box<dyn Stream<Item = Result<ExecResponse, Status>> + Send>>; type ExecStream = Pin<Box<dyn Stream<Item = Result<ExecResponse, Status>> + Send>>;
async fn exec(&self, req: Request<ExecRequest>) -> AgentResult<Self::ExecStream> { async fn exec(&self, req: Request<ExecRequest>) -> AgentResult<Self::ExecStream> {
use crate::system::exec::*; use exec_response::Out;
let ExecRequest { program, args } = req.get_ref(); let ExecRequest { program, args } = req.get_ref();
match exec(program, args) { let mut command = tokio::process::Command::new(program)
Ok(receiver) => { .args(args)
let stream = ReceiverStream::new(receiver).map(|_inner| { .stdout(Stdio::piped())
Ok(ExecResponse { .stderr(Stdio::piped())
// TODO .spawn()?;
response: None,
}) let stdout =
FramedRead::new(command.stdout.take().unwrap(), BytesCodec::new()).map(|stdout| {
let stdout = stdout.unwrap();
Out::Stdout(String::from_utf8_lossy(&stdout[..]).to_string())
}); });
let stderr =
FramedRead::new(command.stderr.take().unwrap(), BytesCodec::new()).map(|stderr| {
let stderr = stderr.unwrap();
Out::Stderr(String::from_utf8_lossy(&stderr[..]).to_string())
});
let exit = TaskBuilder::new(format!("exec {program}"))
.health_monitor(self.health.clone())
.add_step(async move { command.wait().await.unwrap() })
.build()
.into_stream();
let stream = stdout
.merge(stderr)
.chain(exit.map(|code| Out::ExitCode(code.code().unwrap_or_default())))
.map(|out| Ok(ExecResponse { out: Some(out) }));
Ok(Response::new(Box::pin(stream))) Ok(Response::new(Box::pin(stream)))
} }
Err(err) => Err(Status::failed_precondition(err.to_string())),
}
}
}
pub fn server(health_monitor: HealthMonitor) -> agent_server::AgentServer<AgentService> {
agent_server::AgentServer::new(AgentService {
health: health_monitor,
})
} }

View file

@ -1,16 +1,19 @@
use std::time::Duration; use std::time::Duration;
use tokio::{signal, sync::oneshot, time::sleep}; use tokio::{signal, sync::oneshot};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use crate::system::health::HealthMonitor; use crate::{
server::{agent::AgentService, proto::agent_server},
system::{self, health::HealthMonitor},
};
mod agent; mod agent;
mod proto { mod proto {
tonic::include_proto!("prymn"); tonic::include_proto!("prymn");
impl From<crate::system::health::SystemHealth> for SystemHealth { impl From<&crate::system::health::SystemHealth> for SystemHealth {
fn from(val: crate::system::health::SystemHealth) -> Self { fn from(val: &crate::system::health::SystemHealth) -> Self {
if let crate::system::health::SystemStatus::Critical(ref reasons) = val.status { if let crate::system::health::SystemStatus::Critical(ref reasons) = val.status {
SystemHealth { SystemHealth {
status: itertools::join(reasons.iter().map(ToString::to_string), ","), status: itertools::join(reasons.iter().map(ToString::to_string), ","),
@ -23,13 +26,52 @@ mod proto {
} }
} }
impl From<crate::system::health::TaskHealth> for TaskHealth { impl From<&crate::system::task::TaskStatus> for TaskHealth {
fn from(val: crate::system::health::TaskHealth) -> Self { fn from(value: &crate::system::task::TaskStatus) -> Self {
TaskHealth { Self {
status: val.status().to_string(), started_on: value.started_on().to_string(),
message: val.message().to_owned(), progress: value.progress(),
started_on: val.started_on().to_string(), }
progress: val.progress() as i32, }
}
impl From<&crate::system::info::Info> for SysInfoResponse {
fn from(info: &crate::system::info::Info) -> Self {
use sysinfo::{CpuExt, DiskExt, SystemExt};
let system = info.system();
let cpus = system
.cpus()
.iter()
.map(|cpu| sys_info_response::Cpu {
freq_mhz: cpu.frequency(),
usage: cpu.cpu_usage(),
})
.collect();
let disks = system
.disks()
.iter()
.map(|disk| sys_info_response::Disk {
name: disk.name().to_string_lossy().into_owned(),
total_bytes: disk.total_space(),
avail_bytes: disk.available_space(),
mount_point: disk.mount_point().to_string_lossy().into_owned(),
})
.collect();
Self {
uptime: system.uptime(),
hostname: system.host_name().unwrap_or_default(),
os: system.long_os_version().unwrap_or_default(),
mem_total_bytes: system.total_memory(),
mem_avail_bytes: system.available_memory(),
swap_total_bytes: system.total_swap(),
swap_free_bytes: system.free_swap(),
updates_available: info.updates().len() as u32,
cpus,
disks,
} }
} }
} }
@ -49,15 +91,27 @@ pub async fn run() -> anyhow::Result<()> {
let _ = shutdown_tx.send(()); let _ = shutdown_tx.send(());
}); });
let info = system::info::spawn_info_subsystem();
let health_monitor = HealthMonitor::new(); let health_monitor = HealthMonitor::new();
let agent_service = agent::server(health_monitor.clone());
// Monitor system info forever
// TODO: Maybe we can move it inside the server response function?
// We could spawn a new loop whenever we need it, but the problem is when does it get
// destroyed?
{
let health_monitor = health_monitor.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
health_monitor.check_system().await; health_monitor.check_system_info(&info.lock().unwrap());
sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
} }
}); });
}
let agent_service = agent_server::AgentServer::new(AgentService {
health: health_monitor.clone(),
info,
});
let addr = "[::]:50012".parse()?; let addr = "[::]:50012".parse()?;
tracing::info!("listening on {}", addr); tracing::info!("listening on {}", addr);

View file

@ -1,122 +0,0 @@
use std::{
ffi::OsStr,
process::{ExitStatus, Stdio},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
sync::mpsc,
};
#[derive(Debug)]
pub enum ExecOutput {
Output {
stdout: Option<String>,
stderr: Option<String>,
},
Exit(ExitStatus),
Error(String),
}
pub fn exec<P, A>(program: P, args: &[A]) -> anyhow::Result<mpsc::Receiver<ExecOutput>>
where
P: AsRef<OsStr>,
A: AsRef<OsStr>,
{
let (tx, rx) = mpsc::channel(4);
let command = Command::new(program)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let fut = run_process(command, tx);
tokio::spawn(fut);
Ok(rx)
}
async fn run_process(mut command: Child, sender: mpsc::Sender<ExecOutput>) {
let mut stdout = {
let stdout = command.stdout.take().expect("bug: no pipe for stdout");
BufReader::new(stdout).lines()
};
let mut stderr = {
let stderr = command.stderr.take().expect("bug: no pipe for stderr");
BufReader::new(stderr).lines()
};
loop {
match (stdout.next_line().await, stderr.next_line().await) {
(Ok(None), Ok(None)) => break,
(Ok(stdout), Ok(stderr)) => sender
.send(ExecOutput::Output { stdout, stderr })
.await
.expect("stream closed"),
(Err(err), _) | (_, Err(err)) => sender
.send(ExecOutput::Error(err.to_string()))
.await
.expect("stream closed"),
}
}
match command.wait().await {
Ok(exit_status) => sender
.send(ExecOutput::Exit(exit_status))
.await
.expect("stream closed"),
Err(err) => sender
.send(ExecOutput::Error(err.to_string()))
.await
.expect("stream closed"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn exec_works() {
let mut rx = exec("echo", &["1\n2\n3"]).expect("to spawn command");
let mut outputs = vec![];
while let Some(output) = rx.recv().await {
outputs.push(output);
}
assert_eq!(outputs.len(), 4);
let ExecOutput::Output {
ref stdout,
ref stderr,
} = outputs[0]
else {
panic!()
};
assert_eq!(*stdout, Some("1".to_owned()));
assert_eq!(*stderr, None);
let ExecOutput::Output {
ref stdout,
ref stderr,
} = outputs[1]
else {
panic!()
};
assert_eq!(*stdout, Some("2".to_owned()));
assert_eq!(*stderr, None);
let ExecOutput::Output {
ref stdout,
ref stderr,
} = outputs[2]
else {
panic!()
};
assert_eq!(*stdout, Some("3".to_owned()));
assert_eq!(*stderr, None);
}
}

View file

@ -1,10 +1,9 @@
//! System health module //! System health module
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Utc};
use tokio::sync::watch; use tokio::sync::watch;
use super::SYSTEM; use super::{info::Info, task::TaskStatus};
const MEMORY_USAGE_CRITICAL_THRESHOLD: u64 = 90; const MEMORY_USAGE_CRITICAL_THRESHOLD: u64 = 90;
const CPU_USAGE_CRITICAL_THRESHOLD: u64 = 90; const CPU_USAGE_CRITICAL_THRESHOLD: u64 = 90;
@ -31,167 +30,89 @@ pub struct SystemHealth {
pub status: SystemStatus, pub status: SystemStatus,
} }
#[derive(Clone, PartialEq, Debug)]
pub enum TaskStatus {
Normal,
Warning,
Error,
Completed,
}
#[derive(Clone)]
pub struct TaskHealth {
status: TaskStatus,
started_on: DateTime<Utc>,
message: String,
progress: u8,
}
impl TaskHealth {
pub fn new(message: String) -> Self {
let started_on = chrono::Utc::now();
Self {
status: TaskStatus::Normal,
started_on,
message,
progress: 0,
}
}
pub fn set_normal(&mut self, message: String) {
self.status = TaskStatus::Normal;
self.message = message;
}
pub fn set_warning(&mut self, message: String) {
self.status = TaskStatus::Warning;
self.message = message;
}
pub fn set_error(&mut self, message: String) {
self.status = TaskStatus::Error;
self.message = message;
}
pub fn set_completed(mut self, message: String) {
self.status = TaskStatus::Completed;
self.progress = 100;
self.message = message;
}
pub fn set_progress(&mut self, message: String, progress: u8) {
self.progress = progress;
self.message = message;
}
pub fn status(&self) -> &TaskStatus {
&self.status
}
pub fn started_on(&self) -> &DateTime<Utc> {
&self.started_on
}
pub fn message(&self) -> &str {
&self.message
}
pub fn progress(&self) -> u8 {
self.progress
}
}
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct Health { pub struct Health {
system: SystemHealth, system: SystemHealth,
tasks: HashMap<String, TaskHealth>, tasks: HashMap<String, TaskStatus>,
} }
impl Health { impl Health {
pub fn system(&self) -> SystemHealth { pub fn system(&self) -> &SystemHealth {
self.system.clone() &self.system
} }
pub fn tasks(self) -> HashMap<String, TaskHealth> { pub fn tasks(&self) -> &HashMap<String, TaskStatus> {
self.tasks &self.tasks
} }
} }
/// `HealthMonitor` gives access to shared system health state, allowing to watch health and update /// [HealthMonitor] gives access to shared system health state, allowing to watch health and update
/// task health status. /// task health status.
/// ///
/// # Usage /// # Usage
/// Internally `HealthMonitor` uses [Arc] so it can be cheaply cloned and shared. /// Internally it uses [Arc] so it can be cheaply cloned and shared.
/// /// ```
/// ```no_run /// use prymn_agent::system::health::HealthMonitor;
/// use prymn_agent::system::health::{HealthMonitor, TaskHealth}; /// use prymn_agent::system::info::Info;
/// ///
/// let mut info = Info::new();
/// let health_monitor = HealthMonitor::new(); /// let health_monitor = HealthMonitor::new();
/// let health_monitor_clone = health_monitor.clone(); ///
/// tokio::spawn(async move { /// // Monitor health changes
/// loop { /// let _receiver = health_monitor.monitor();
/// health_monitor_clone.check_system().await; ///
/// } /// // Refresh system resources
/// }); /// info.refresh_resources();
/// tokio::spawn(async move { ///
/// health_monitor.set_task_health( /// // Update the health monitor with the refreshed info
/// "some_task".to_string(), /// health_monitor.check_system_info(&info);
/// TaskHealth::new("example".to_string())
/// );
/// });
/// ``` /// ```
#[derive(Clone)] #[derive(Clone)]
pub struct HealthMonitor { pub struct HealthMonitor {
sender: Arc<watch::Sender<Health>>, sender: Arc<watch::Sender<Health>>,
receiver: watch::Receiver<Health>,
} }
impl HealthMonitor { impl HealthMonitor {
pub fn new() -> Self { pub fn new() -> Self {
let (sender, receiver) = watch::channel(Health::default()); let (sender, _) = watch::channel(Health::default());
Self { Self {
sender: Arc::new(sender), sender: Arc::new(sender),
receiver,
} }
} }
// TODO: Remove async from here (so it can be consistent) pub fn check_system_info(&self, info: &Info) {
// Move system checking task into it's own thing
pub async fn check_system(&self) {
use sysinfo::{CpuExt, DiskExt, SystemExt}; use sysinfo::{CpuExt, DiskExt, SystemExt};
let status = tokio::task::spawn_blocking(|| { let sys = info.system();
let mut status = SystemStatus::Normal; let mut status = SystemStatus::Normal;
// TODO: For testability, dependency inject this System struct in this function.
let mut sys = SYSTEM.lock().unwrap();
// Refresh system resources usage
sys.refresh_specifics(
sysinfo::RefreshKind::new()
.with_memory()
.with_disks()
.with_cpu(sysinfo::CpuRefreshKind::new().with_cpu_usage()),
);
let mut statuses = vec![]; let mut statuses = vec![];
// Check for critical memory usage // Check for critical memory usage
let memory_usage = sys.used_memory() * 100 / sys.total_memory(); let memory_usage = if sys.total_memory() > 0 {
sys.used_memory() * 100 / sys.total_memory()
} else {
0
};
if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD { if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD {
statuses.push(CriticalReason::HighMemoryUsage); statuses.push(CriticalReason::HighMemoryUsage);
} }
// Check for critical CPU usage // Check for critical CPU usage
let cpu_usage = sys.global_cpu_info().cpu_usage(); let cpu_usage = sys.global_cpu_info().cpu_usage();
if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD as f32 { if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD as f32 {
statuses.push(CriticalReason::HighCpuUsage); statuses.push(CriticalReason::HighCpuUsage);
} }
// Check for any disk usage that is critical // Check for any disk usage that is critical
for disk in sys.disks() { for disk in sys.disks() {
let available_disk = disk.available_space() * 100 / disk.total_space(); let available_disk = if disk.total_space() > 0 {
disk.available_space() * 100 / disk.total_space()
} else {
0
};
if available_disk < 100 - DISK_USAGE_CRITICAL_THRESHOLD { if available_disk < 100 - DISK_USAGE_CRITICAL_THRESHOLD {
statuses.push(CriticalReason::HighDiskUsage); statuses.push(CriticalReason::HighDiskUsage);
} }
@ -201,11 +122,6 @@ impl HealthMonitor {
status = SystemStatus::Critical(statuses); status = SystemStatus::Critical(statuses);
} }
status
})
.await
.expect("system checking task panicked - possibly due to panicked mutex lock");
self.sender.send_if_modified(|Health { system, .. }| { self.sender.send_if_modified(|Health { system, .. }| {
if system.status == status { if system.status == status {
return false; return false;
@ -216,10 +132,23 @@ impl HealthMonitor {
}); });
} }
pub fn set_task_health(&self, task_name: String, health: TaskHealth) { /// Spawns a new tokio task that tracks from the [watch::Receiver] the status of a Prymn task
// Always send a notification in this case since it is an explicit action. /// via [TaskStatus]
self.sender.send_modify(|Health { tasks, .. }| { pub fn track_task(&self, name: String, mut task_recv: watch::Receiver<TaskStatus>) {
tasks.insert(task_name, health); let sender = self.sender.clone();
tokio::task::spawn(async move {
while task_recv.changed().await.is_ok() {
sender.send_modify(|health| {
health
.tasks
.insert(String::from(&name), task_recv.borrow().clone());
});
}
// At this point the Sender part of the watch dropped, meaning we can clear the task
// because it is complete.
sender.send_if_modified(|health| health.tasks.remove(&name).is_some());
}); });
} }
@ -229,13 +158,13 @@ impl HealthMonitor {
} }
pub fn monitor(&self) -> watch::Receiver<Health> { pub fn monitor(&self) -> watch::Receiver<Health> {
self.receiver.clone() self.sender.subscribe()
} }
} }
impl Default for HealthMonitor { impl Default for HealthMonitor {
fn default() -> Self { fn default() -> Self {
HealthMonitor::new() Self::new()
} }
} }
@ -259,44 +188,3 @@ impl std::fmt::Display for CriticalReason {
} }
} }
} }
impl std::fmt::Display for TaskStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskStatus::Normal => write!(f, "normal"),
TaskStatus::Warning => write!(f, "warning"),
TaskStatus::Error => write!(f, "error"),
TaskStatus::Completed => write!(f, "completed"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_task_monitor() {
let health_monitor = HealthMonitor::new();
let receiver = health_monitor.monitor();
assert!(receiver.has_changed().is_ok_and(|changed| !changed));
let health = TaskHealth::new("this is normal".to_owned());
health_monitor.set_task_health("some_task".to_string(), health);
assert!(receiver.has_changed().is_ok_and(|changed| changed));
{
let health = receiver.borrow();
let task_health = health.tasks.get("some_task").expect("a task should exist");
assert_eq!(task_health.status, TaskStatus::Normal);
assert_eq!(task_health.progress, 0);
assert_eq!(task_health.message, "this is normal");
}
health_monitor.clear_task("some_task");
assert!(!receiver.borrow().tasks.contains_key("some_task"));
}
}

87
agent/src/system/info.rs Normal file
View file

@ -0,0 +1,87 @@
//! System info
use std::{sync::Mutex, time::Duration};
use anyhow::Context;
use sysinfo::{CpuRefreshKind, SystemExt};
use crate::debian;
pub struct Info {
system: sysinfo::System,
updates: Vec<String>,
}
impl Info {
pub fn new() -> Self {
Self {
system: sysinfo::System::new(),
updates: Vec::new(),
}
}
pub fn refresh_resources(&mut self) {
self.system.refresh_specifics(
sysinfo::RefreshKind::new()
.with_disks_list()
.with_memory()
.with_cpu(CpuRefreshKind::new().with_cpu_usage()),
);
}
pub fn refresh_updates(&mut self) -> anyhow::Result<()> {
debian::update_package_index().context("while fetching the package index")?;
let updates =
debian::get_available_updates().context("while fetching available updates")?;
self.updates = updates;
Ok(())
}
pub fn system(&self) -> &sysinfo::System {
&self.system
}
pub fn updates(&self) -> &Vec<String> {
&self.updates
}
}
impl Default for Info {
fn default() -> Self {
Self::new()
}
}
/// Spawns a new thread that forever gathers system information.
pub fn spawn_info_subsystem() -> &'static Mutex<Info> {
const REFRESH_RESOURCES_INTERVAL: Duration = Duration::from_secs(5);
const REFRESH_UPDATES_INTERVAL: Duration = Duration::from_secs(3600);
let info = Box::new(Mutex::new(Info::new()));
let info = Box::leak(info);
std::thread::spawn(|| loop {
tracing::debug!("refreshing system resources");
#[allow(clippy::mut_mutex_lock)]
info.lock().unwrap().refresh_resources();
std::thread::sleep(REFRESH_RESOURCES_INTERVAL);
});
std::thread::spawn(|| loop {
tracing::debug!("refreshing available system updates");
#[allow(clippy::mut_mutex_lock)]
if let Err(err) = info.lock().unwrap().refresh_updates() {
tracing::warn!(?err, "failed to refresh updates");
}
std::thread::sleep(REFRESH_UPDATES_INTERVAL);
});
info
}

View file

@ -1,12 +1,5 @@
//! System boundary and modules that interact with the operating system and programs. //! System boundary and modules that interact with the operating system and programs.
use std::sync::Mutex;
use once_cell::sync::Lazy;
use sysinfo::SystemExt;
pub mod exec;
pub mod health; pub mod health;
pub mod info;
// TODO: Make this mock-able so we can test code that interacts with it pub mod task;
pub static SYSTEM: Lazy<Mutex<sysinfo::System>> = Lazy::new(|| Mutex::new(sysinfo::System::new()));

154
agent/src/system/task.rs Normal file
View file

@ -0,0 +1,154 @@
//! A task is an atomic executing routine that the agent is running, potentially in the background.
//! The task is tracked by the system monitor.
// TODO: Take a look at futures::stream::FuturesOrdered
// It is used to store futures in an ordered fashion, and it also implements Stream
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, watch};
use tokio_stream::{Stream, StreamExt};
use super::health::HealthMonitor;
#[derive(Clone, Default)]
pub struct TaskStatus {
started_on: DateTime<Utc>,
curr_step: usize,
max_steps: usize,
}
impl TaskStatus {
/// Returns the task progress as a percentage value
pub fn progress(&self) -> f32 {
100.0 * (self.curr_step as f32 / self.max_steps as f32)
}
/// Returns the datetime when this task began executing
pub fn started_on(&self) -> &DateTime<Utc> {
&self.started_on
}
fn next_step(&mut self) {
if self.curr_step < self.max_steps {
self.curr_step += 1;
}
}
}
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub struct TaskBuilder<Step> {
task: Task<Step>,
}
impl<T> TaskBuilder<T> {
pub fn new(name: String) -> Self {
let (sender, _) = watch::channel(TaskStatus::default());
Self {
task: Task {
name,
health_monitor: None,
status_channel: sender,
steps: Vec::new(),
},
}
}
/// Attaches a health monitor to notify the health system on progress made.
pub fn health_monitor(mut self, health_monitor: HealthMonitor) -> Self {
self.task.health_monitor = Some(health_monitor);
self
}
pub fn build(self) -> Task<T> {
self.task
}
}
impl<T: Send + 'static> TaskBuilder<BoxFuture<T>> {
pub fn add_step(mut self, step: impl Future<Output = T> + Send + 'static) -> Self {
self.task.add_step(step);
self
}
}
pub struct Task<T> {
name: String,
health_monitor: Option<HealthMonitor>,
status_channel: watch::Sender<TaskStatus>,
steps: Vec<T>,
}
impl<T: Send + 'static> Task<BoxFuture<T>> {
fn add_step(&mut self, step: impl Future<Output = T> + Send + 'static) {
self.steps.push(Box::pin(step))
}
/// Turn this Task into an object that implements [Stream].
///
/// The new stream will output each step's future output.
pub fn into_stream(self) -> TaskStream<T> {
if let Some(health) = &self.health_monitor {
health.track_task(self.name.clone(), self.status_channel.subscribe());
}
// Immediately notify the initial status (step 0)
self.status_channel.send_replace(TaskStatus {
started_on: Utc::now(),
curr_step: 0,
max_steps: self.steps.len(),
});
TaskStream { inner: self }
}
/// Run this task concurrently in the background.
///
/// Returns a [mpsc::Receiver<T>] which receives the returned values of each step's future
/// output.
pub fn into_background(self) -> mpsc::Receiver<T> {
let (sender, receiver) = mpsc::channel(10);
tokio::spawn(async move {
let mut stream = self.into_stream();
while let Some(value) = stream.next().await {
let _ = sender.send(value).await;
}
});
receiver
}
}
pub struct TaskStream<T> {
inner: Task<BoxFuture<T>>,
}
impl<T> Stream for TaskStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.inner.steps.get_mut(0) {
Some(fut) => match fut.as_mut().poll(cx) {
Poll::Ready(value) => {
self.inner.steps.remove(0);
self.inner
.status_channel
.send_modify(|task| task.next_step());
Poll::Ready(Some(value))
}
Poll::Pending => Poll::Pending,
},
None => Poll::Ready(None),
}
}
}

View file

@ -0,0 +1,18 @@
use prymn_agent::system::{health::HealthMonitor, task::TaskBuilder};
#[tokio::test]
async fn task_is_gone_from_health_monitor_when_complete() {
let health_monitor = HealthMonitor::new();
let health_recv = health_monitor.monitor();
let mut task_recv = TaskBuilder::new("test task".to_owned())
.health_monitor(health_monitor)
.add_step(async { "foo" })
.add_step(async { "bar" })
.build()
.into_background();
assert_eq!(task_recv.recv().await.unwrap(), "foo");
assert_eq!(task_recv.recv().await.unwrap(), "bar");
assert!(health_recv.borrow().tasks().is_empty());
}

View file

@ -38,20 +38,31 @@ defmodule Prymn.Agents do
Health.lookup(host_address) Health.lookup(host_address)
end end
# TODO: We should not expose this api, instead wrap every GRPC call in this @doc """
# module GRPC is an "internal implementation detail" (although it probably Get the system's information (CPU, Memory usage, etc.).
# wont ever change) """
# def get_sys_info(host_address) do
# E.g. lookup_connection(host_address)
# def get_sys_info(agent) do |> Connection.get_sys_info()
# PrymnProto.Prymn.Agent.Stub.get_sys_info(agent.channel, %Google.Protobuf.Empty{}) end
# end
def get_channel(host_address) do @doc """
with [{pid, _}] <- Registry.lookup(Prymn.Agents.Registry, host_address), Perform a system update.
channel when channel != nil <- Connection.get_channel(pid) do
{:ok, channel} ## Asynchronous call
else Messages are sent to the caller in the form of the struct:
_ -> {:error, :not_found}
%PrymnProto.Prymn.SysUpdateResponse{}
"""
def sys_update(host_address, dry_run) when is_boolean(dry_run) do
lookup_connection(host_address)
|> Connection.sys_update(dry_run)
end
defp lookup_connection(host_address) when is_binary(host_address) do
case Registry.lookup(Prymn.Agents.Registry, host_address) do
[{pid, _}] -> pid
[] -> nil
end end
end end
end end

View file

@ -2,22 +2,29 @@ defmodule Prymn.Agents.Connection do
@moduledoc false @moduledoc false
alias Prymn.Agents.Health alias Prymn.Agents.Health
alias PrymnProto.Prymn.Agent.Stub
require Logger require Logger
use GenServer, restart: :transient use GenServer, restart: :transient
@timeout :timer.minutes(2) @timeout :timer.minutes(2)
@spec start_link(String.t()) :: GenServer.on_start()
def start_link(host_address) do def start_link(host_address) do
GenServer.start_link(__MODULE__, host_address, name: via(host_address)) GenServer.start_link(__MODULE__, host_address, name: via(host_address))
end end
@spec get_channel(pid) :: GRPC.Channel.t() | nil
def get_channel(server) do def get_channel(server) do
GenServer.call(server, :get_channel) GenServer.call(server, :get_channel)
end end
def get_sys_info(server) when is_pid(server) do
GenServer.call(server, :get_sys_info)
end
def sys_update(server, dry_run) when is_pid(server) and is_boolean(dry_run) do
GenServer.call(server, {:sys_update, dry_run})
end
## ##
## Server callbacks ## Server callbacks
## ##
@ -46,40 +53,53 @@ defmodule Prymn.Agents.Connection do
pid = self() pid = self()
Task.start_link(fn -> Task.start_link(fn ->
{:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{}) case Stub.health(channel, %Google.Protobuf.Empty{}) do
{:ok, stream} ->
# Read from the stream forever and send data back to parent # Read from the stream forever and send data back to parent
stream stream
|> Stream.each(fn {_, data} -> send(pid, data) end) |> Stream.each(fn {_, data} -> send(pid, data) end)
|> Enum.take_while(fn _ -> true end) |> Enum.take_while(fn _ -> true end)
{:error, _rpcerror} ->
send(pid, {:connect_error, :rpc_error})
end
end) end)
{:noreply, state} {:noreply, state}
end end
@impl true
def handle_cast(_, state) do
{:noreply, state}
end
@impl true @impl true
def handle_call(:get_channel, _from, {_, channel} = state) do def handle_call(:get_channel, _from, {_, channel} = state) do
{:reply, channel, state, @timeout} {:reply, channel, state, @timeout}
end end
def handle_call(:get_sys_info, _from, {_, channel} = state) do
reply = Stub.get_sys_info(channel, %Google.Protobuf.Empty{})
{:reply, reply, state, @timeout}
end
def handle_call({:sys_update, dry_run}, {from, _}, {_, channel} = state) do
request = %PrymnProto.Prymn.SysUpdateRequest{dry_run: dry_run}
streaming_call(fn -> Stub.sys_update(channel, request) end, from)
{:reply, :ok, state, @timeout}
end
@impl true @impl true
def handle_info(%GRPC.Channel{} = channel, {host, _}) do def handle_info(%GRPC.Channel{} = channel, {host, _}) do
{:noreply, {host, channel}, {:continue, :health}} {:noreply, {host, channel}, {:continue, :health}}
end end
def handle_info({:connect_error, reason}, {host, _} = state) do def handle_info({:connect_error, reason}, {host, _} = state) do
if reason == :timeout do health = Health.lookup(host, default: true)
Health.lookup(host, default: true)
|> Health.make_timed_out()
|> Health.update_and_broadcast()
end
{:stop, reason, state} case reason do
:timeout -> Health.make_timed_out(health)
:rpc_error -> Health.make_disconnected(health)
end
|> Health.update_and_broadcast()
# NOTE: Here we terminate normally, which means we won't be retrying. Maybe we want to?
{:stop, :normal, state}
end end
def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
@ -102,7 +122,7 @@ defmodule Prymn.Agents.Connection do
end end
def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
Health.lookup(host) Health.lookup(host, default: true)
|> Health.make_disconnected() |> Health.make_disconnected()
|> Health.update_and_broadcast() |> Health.update_and_broadcast()
@ -136,4 +156,18 @@ defmodule Prymn.Agents.Connection do
receive_loop(pid) receive_loop(pid)
end end
defp streaming_call(fun, from) do
Task.start_link(fn ->
case fun.() do
{:ok, stream} ->
stream
|> Stream.each(fn {:ok, data} -> send(from, data) end)
|> Enum.to_list()
{:error, _error} ->
:todo
end
end)
end
end end

View file

@ -5,13 +5,14 @@ defmodule Prymn.Agents.Health do
getting depleted, or if it's unable be reached. getting depleted, or if it's unable be reached.
""" """
defstruct [:host, :version, message: "Unknown"] defstruct [:host, :version, :status, tasks: [], message: "Unknown"]
alias PrymnProto.Prymn.HealthResponse alias PrymnProto.Prymn.HealthResponse
@type t :: %{ @type t :: %{
host: String.t(), host: String.t(),
version: String.t(), version: String.t(),
status: atom(),
message: String.t() message: String.t()
} }
@ -51,15 +52,15 @@ defmodule Prymn.Agents.Health do
end end
def make_timed_out(%__MODULE__{} = health) do def make_timed_out(%__MODULE__{} = health) do
%__MODULE__{health | message: "Connect timed out"} %__MODULE__{health | status: :unreachable, message: "Connect timed out"}
end end
def make_disconnected(%__MODULE__{} = health) do def make_disconnected(%__MODULE__{} = health) do
%__MODULE__{health | message: "Disconnected"} %__MODULE__{health | status: :disconnected, message: "Disconnected"}
end end
def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do
%__MODULE__{host: host} %__MODULE__{host: host, status: :connected}
|> do_version(version) |> do_version(version)
|> do_system(system) |> do_system(system)
|> do_tasks(tasks) |> do_tasks(tasks)
@ -76,7 +77,13 @@ defmodule Prymn.Agents.Health do
end end
end end
defp do_tasks(health, _tasks) do defp do_tasks(health, tasks) do
health tasks =
Enum.map(tasks, fn {task_key, task_value} ->
progress = Float.round(task_value.progress, 2)
{task_key, %{task_value | progress: "#{progress}%"}}
end)
%__MODULE__{health | tasks: tasks}
end end
end end

View file

@ -212,8 +212,9 @@ defmodule PrymnWeb.CoreComponents do
def button(assigns) do def button(assigns) do
assigns = assigns =
assign(assigns, :style, [ assign(assigns, :style, [
"phx-submit-loading:opacity-75 rounded-lg bg-zinc-900 hover:bg-zinc-700 py-2 px-3", "inline-flex items-center rounded-lg bg-zinc-900 hover:bg-zinc-700 py-2 px-3",
"text-sm font-semibold leading-6 text-white active:text-white/80", "font-semibold leading-6 text-sm text-white active:text-white/80",
"phx-submit-loading:opacity-75 phx-click-loading:opacity-75 disabled:cursor-not-allowed",
assigns.class assigns.class
]) ])
@ -228,7 +229,8 @@ defmodule PrymnWeb.CoreComponents do
_ -> _ ->
~H""" ~H"""
<button type={@type} class={@style} {@rest}> <button type={@type} class={@style} {@rest}>
<%= render_slot(@inner_block) %> <span><%= render_slot(@inner_block) %></span>
<.spinner class="ml-2 hidden w-6 text-white phx-click-loading:inline-block phx-submit-loading:inline-block" />
</button> </button>
""" """
end end
@ -594,6 +596,30 @@ defmodule PrymnWeb.CoreComponents do
""" """
end end
@doc """
Renders a spinner.
"""
attr :class, :string, default: nil
attr :rest, :global
def spinner(assigns) do
~H"""
<div class={["animate-spin", assigns.class]} {@rest}>
<svg viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg">
<path
fill="currentColor"
d="M12,1A11,11,0,1,0,23,12,11,11,0,0,0,12,1Zm0,19a8,8,0,1,1,8-8A8,8,0,0,1,12,20Z"
opacity=".25"
/>
<path
fill="currentColor"
d="M12,4a8,8,0,0,1,7.89,6.7A1.53,1.53,0,0,0,21.38,12h0a1.5,1.5,0,0,0,1.48-1.75,11,11,0,0,0-21.72,0A1.5,1.5,0,0,0,2.62,12h0a1.53,1.53,0,0,0,1.49-1.3A8,8,0,0,1,12,4Z"
/>
</svg>
</div>
"""
end
## JS Commands ## JS Commands
def show(js \\ %JS{}, selector) do def show(js \\ %JS{}, selector) do

View file

@ -58,29 +58,29 @@ defmodule PrymnWeb.ServerLive.Index do
case {assigns.status, assigns.health} do case {assigns.status, assigns.health} do
{:unregistered, _} -> {:unregistered, _} ->
~H""" ~H"""
<span class="text-gray-500">Needs registration</span> <span class="self-center text-sm text-gray-500">Needs registration</span>
""" """
{:registered, nil} -> {:registered, nil} ->
~H""" ~H"""
<span class="text-yellow-600">Connecting...</span> <.spinner class="w-5" />
""" """
{:registered, %Agents.Health{message: "Connected"}} -> {:registered, %Agents.Health{status: :connected}} ->
~H""" ~H"""
<span class="text-green-600">Connected</span> <span class="self-center text-sm text-green-600">Connected</span>
""" """
{:registered, %Agents.Health{message: "Disconnected"}} -> {:registered, %Agents.Health{status: :disconnected}} ->
~H""" ~H"""
<span class="text-red-600">Disconnected</span> <span class="self-center text-sm text-red-600">Disconnected</span>
""" """
{:registered, %Agents.Health{message: message}} -> {:registered, %Agents.Health{message: message}} ->
assigns = assign(assigns, :message, message) assigns = assign(assigns, :message, message)
~H""" ~H"""
<span class="text-yellow-900"><%= @message %></span> <span class="self-center text-sm text-yellow-900"><%= @message %></span>
""" """
end end
end end

View file

@ -16,12 +16,19 @@
> >
<div class="flex flex-row flex-wrap justify-between"> <div class="flex flex-row flex-wrap justify-between">
<h2 class="text-xl"><%= server.name %></h2> <h2 class="text-xl"><%= server.name %></h2>
<span class="self-center text-sm">
<.server_status status={server.status} health={@healths[server.public_ip]} /> <.server_status status={server.status} health={@healths[server.public_ip]} />
</span>
</div> </div>
<div class="lg:text-sm"> <div class="flex flex-row flex-wrap justify-between lg:text-sm">
<span>IP: <%= server.public_ip || "N/A" %></span> <span>IP: <%= server.public_ip || "N/A" %></span>
<span
:if={@healths[server.public_ip] && Enum.count(@healths[server.public_ip].tasks)}
class="text-right text-xs text-slate-700"
>
<%= for {name, task} <- Enum.take(@healths[server.public_ip].tasks, 1) do %>
<div>In progress: <%= name %></div>
<div><%= task.progress %></div>
<% end %>
</span>
</div> </div>
</.link> </.link>
</div> </div>

View file

@ -1,6 +1,7 @@
defmodule PrymnWeb.ServerLive.Show do defmodule PrymnWeb.ServerLive.Show do
use PrymnWeb, :live_view use PrymnWeb, :live_view
require Logger
alias Prymn.{Agents, Servers} alias Prymn.{Agents, Servers}
@impl true @impl true
@ -11,12 +12,11 @@ defmodule PrymnWeb.ServerLive.Show do
@impl true @impl true
def handle_params(%{"id" => id}, _, socket) do def handle_params(%{"id" => id}, _, socket) do
server = Servers.get_server!(id) server = Servers.get_server!(id)
pid = self()
if connected?(socket) and server.status == :registered do if connected?(socket) and server.status == :registered do
Agents.subscribe_to_health(server.public_ip) Agents.subscribe_to_health(server.public_ip)
Agents.start_connection(server.public_ip) Agents.start_connection(server.public_ip)
Task.start_link(fn -> get_sys_info(pid, server.public_ip) end) send(self(), :get_sys_info)
end end
health = Agents.get_health(server.public_ip) health = Agents.get_health(server.public_ip)
@ -26,32 +26,83 @@ defmodule PrymnWeb.ServerLive.Show do
|> assign(:health, health || %{message: "Connecting..."}) |> assign(:health, health || %{message: "Connecting..."})
|> assign(:page_title, server.name) |> assign(:page_title, server.name)
|> assign(:server, server) |> assign(:server, server)
|> assign(:uptime, 0) |> assign(:dry_run, false)
|> assign(:cpus, []) |> assign(:update_output, [])
|> assign(:used_disk, 0) |> assign(:sys_info, nil)
|> assign(:total_memory, 0) # TODO: Do not assign this to the socket - instead generate it in the HTML
|> assign(:used_memory, 0)
|> assign(:registration_command, Servers.create_setup_command(server))} |> assign(:registration_command, Servers.create_setup_command(server))}
end end
@impl true @impl true
def handle_info(:get_sys_info, socket) do
status = get_in(socket.assigns, [:health, Access.key(:status)])
host_address = get_in(socket.assigns, [:server, Access.key(:public_ip)])
pid = self()
if host_address != nil and status == :connected do
Task.start_link(fn ->
{:ok, sys_info} = Agents.get_sys_info(host_address)
send(pid, sys_info)
end)
end
# 10 seconds is >5 which is gun's timeout duration (which might have a race
# condition if they are equal)
Process.send_after(self(), :get_sys_info, :timer.seconds(10))
{:noreply, socket}
end
def handle_info(%PrymnProto.Prymn.SysInfoResponse{} = response, socket) do def handle_info(%PrymnProto.Prymn.SysInfoResponse{} = response, socket) do
# TODO: GRPC calls should be done through wrapper functions. Necessary
# calculations should be done then and there.
{:noreply, {:noreply,
socket socket
|> assign(:uptime, response.uptime) |> assign(:sys_info, response)
|> assign( |> assign(:updates_available, response.updates_available)
:used_memory,
bytes_to_gigabytes(response.mem_total_bytes - response.mem_avail_bytes)
)
|> assign(:total_memory, bytes_to_gigabytes(response.mem_total_bytes))
|> assign(:used_disk, calculate_disk_used_percent(response.disks))
|> assign(:cpus, response.cpus)} |> assign(:cpus, response.cpus)}
end end
def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do
output = String.split(response.output, "\n")
socket = assign(socket, :update_output, output)
{:noreply, socket}
end
def handle_info(%Agents.Health{} = health, socket) do def handle_info(%Agents.Health{} = health, socket) do
{:noreply, assign(socket, :health, health)} {:noreply, assign(socket, :health, health)}
end end
@impl true
def handle_event("system_update", _params, socket) do
host_address = get_in(socket.assigns, [:server, Access.key(:public_ip)])
server_name = get_in(socket.assigns, [:server, Access.key(:name)])
socket =
if host_address do
Agents.sys_update(host_address, socket.assigns.dry_run)
put_flash(socket, :info, "Started a system update on server #{server_name}.")
else
put_flash(
socket,
:error,
"Could not perform the update. Your server does not seem to have an address"
)
end
{:noreply, socket}
end
def handle_event("change_dry_run", %{"dry_run" => enabled}, socket) do
enabled = (enabled == "true" && true) || false
{:noreply, assign(socket, :dry_run, enabled)}
end
defp calculate_cpu_usage(cpus) do
(Enum.reduce(cpus, 0, fn x, acc -> x.usage + acc end) / Enum.count(cpus))
|> Float.round(2)
end
defp bytes_to_gigabytes(bytes) do defp bytes_to_gigabytes(bytes) do
Float.round(bytes / Integer.pow(1024, 3), 2) Float.round(bytes / Integer.pow(1024, 3), 2)
end end
@ -66,17 +117,4 @@ defmodule PrymnWeb.ServerLive.Show do
Float.round(100 * used / total, 2) Float.round(100 * used / total, 2)
end end
defp get_sys_info(from, host_address) do
alias PrymnProto.Prymn.Agent
with {:ok, channel} <- Agents.get_channel(host_address),
{:ok, reply} <- Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) do
send(from, reply)
end
Process.sleep(:timer.seconds(5))
get_sys_info(from, host_address)
end
end end

View file

@ -1,12 +1,15 @@
<.header> <.header>
<span class="relative flex items-center">
Server <%= @server.name %>
<span <span
role="tooltip" role="tooltip"
class="relative inline-flex h-3 w-3 class={[
before:-translate-x-1/2 before:-translate-y-full before:-top-2 "absolute -left-6 inline-flex h-3 w-3 before:-translate-x-1/2 before:-translate-y-full",
before:left-1/2 before:absolute before:text-sm before:text-white "before:-top-2 before:left-1/2 before:absolute before:text-sm before:text-white",
before:font-normal before:content-[attr(data-tip)] before:opacity-0 "before:font-normal before:content-[attr(data-tip)] before:opacity-0",
hover:before:opacity-100 before:py-1 before:px-2 before:bg-black "hover:before:opacity-100 before:py-1 before:px-2 before:bg-black",
before:rounded before:pointer-events-none before:transition-opacity" "before:rounded before:pointer-events-none before:transition-opacity"
]}
data-tip={@health.message} data-tip={@health.message}
> >
<%= case @health.message do %> <%= case @health.message do %>
@ -19,10 +22,20 @@
<span class="h-3 w-3 rounded-full bg-yellow-500" /> <span class="h-3 w-3 rounded-full bg-yellow-500" />
<% end %> <% end %>
</span> </span>
<span class="ml-3">Server <%= @server.name %></span> </span>
<:subtitle>
<%= @server.public_ip %>
</:subtitle>
</.header> </.header>
<section :if={@server.status == :unregistered} class="my-10"> <div class="my-3 text-sm text-slate-700">
<%= for {name, task} <- @health.tasks do %>
Background task in progress: <%= name %>
<p><%= task.progress %> complete</p>
<% end %>
</div>
<div :if={@server.status == :unregistered} class="my-10">
<p class="mb-9"> <p class="mb-9">
Connect to your server using root credentials and execute the following command: Connect to your server using root credentials and execute the following command:
</p> </p>
@ -40,34 +53,58 @@
/> />
</button> </button>
</div> </div>
</section> </div>
<section <div :if={@server.status == :registered && @sys_info} class="my-10">
:if={@server.status == :registered} <section class="flex justify-between rounded bg-gray-800 p-5 text-white">
class="my-10 flex justify-between rounded bg-gray-800 p-5 text-white"
>
<div> <div>
<p class="text-xl"><%= @uptime || "" %>s</p> <p class="text-xl"><%= @sys_info.uptime || "" %>s</p>
<p class="text-sm">Uptime</p> <p class="text-sm">Uptime</p>
</div> </div>
<div class="ml-4"> <div class="ml-4">
<p class="text-xl"><%= Enum.count(@cpus || []) %></p> <p class="text-xl"><%= Enum.count(@sys_info.cpus || []) %></p>
<p class="text-sm">CPUs</p> <p class="text-sm">CPUs</p>
</div> </div>
<div class="ml-4">
<p class="text-xl"><%= calculate_cpu_usage(@sys_info.cpus) %></p>
<p class="text-sm">CPU%</p>
</div>
<div class="ml-4"> <div class="ml-4">
<p class="text-xl"> <p class="text-xl">
<%= @used_memory || 0 %> / <%= @total_memory || 0 %> <%= bytes_to_gigabytes(@sys_info.mem_total_bytes - @sys_info.mem_avail_bytes) %>
<span>/</span>
<%= bytes_to_gigabytes(@sys_info.mem_total_bytes) %>
<span>GiB</span> <span>GiB</span>
</p> </p>
<p class="text-sm">Memory</p> <p class="text-sm">Memory</p>
</div> </div>
<div class="ml-4"> <div class="ml-4">
<p class="text-xl"> <p class="text-xl">
<%= @used_disk %> <%= calculate_disk_used_percent(@sys_info.disks) %>
<span>%</span> <span>%</span>
</p> </p>
<p class="text-sm">Used Disk</p> <p class="text-sm">Used Disk</p>
</div> </div>
</section> </section>
<section class="mt-4">
<form phx-change="change_dry_run">
<.input type="checkbox" name="dry_run" value={@dry_run} label="Enable dry-run operations" />
</form>
</section>
<section class="mt-4">
<h2 class="border-b border-solid border-gray-500 pb-1 text-2xl font-medium">System</h2>
<p class="mt-4">
Updates: <%= @sys_info.updates_available %> pending updates.
<.button type="button" class="ml-4" phx-click="system_update">
Update now
</.button>
<p :for={output <- assigns.update_output}>
<%= output %>
</p>
</p>
</section>
</div>
<.back navigate={~p"/servers"}>Back to servers</.back> <.back navigate={~p"/servers"}>Back to servers</.back>

View file

@ -10,10 +10,8 @@ message SystemHealth {
} }
message TaskHealth { message TaskHealth {
string status = 1; string started_on = 1;
string message = 2; float progress = 2;
string started_on = 3;
int32 progress = 4;
} }
message HealthResponse { message HealthResponse {
@ -44,6 +42,7 @@ message SysInfoResponse {
uint64 swap_free_bytes = 7; uint64 swap_free_bytes = 7;
repeated Cpu cpus = 8; repeated Cpu cpus = 8;
repeated Disk disks = 9; repeated Disk disks = 9;
uint32 updates_available = 10;
} }
message ExecRequest { message ExecRequest {
@ -52,20 +51,26 @@ message ExecRequest {
} }
message ExecResponse { message ExecResponse {
message Output { oneof out {
string stdout = 1; string stdout = 1;
string stderr = 2; string stderr = 2;
}
oneof response {
Output output = 1;
int32 exit_code = 2;
string error = 3; string error = 3;
int32 exit_code = 4;
} }
} }
message SysUpdateRequest {
bool dry_run = 1;
}
message SysUpdateResponse {
string output = 1;
int32 progress = 2;
}
service Agent { service Agent {
rpc Health(google.protobuf.Empty) returns (stream HealthResponse); rpc Health(google.protobuf.Empty) returns (stream HealthResponse);
rpc GetSysInfo(google.protobuf.Empty) returns (SysInfoResponse);
rpc Exec(ExecRequest) returns (stream ExecResponse); rpc Exec(ExecRequest) returns (stream ExecResponse);
rpc GetSysInfo(google.protobuf.Empty) returns (SysInfoResponse);
rpc SysUpdate(SysUpdateRequest) returns (stream SysUpdateResponse);
} }