messaging stuff
This commit is contained in:
parent
4c951a5bfe
commit
5a6ad517d2
5 changed files with 186 additions and 70 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -665,9 +665,11 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"async-nats",
|
||||
"bytes",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sysinfo",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
|
|
|
@ -6,9 +6,11 @@ edition = "2021"
|
|||
[dependencies]
|
||||
anyhow = "1.0.79"
|
||||
async-nats = "0.33.0"
|
||||
bytes = "1.5.0"
|
||||
serde = { version = "1.0.195", features = ["derive"] }
|
||||
serde_json = "1.0.111"
|
||||
sysinfo = { version = "0.30.5", default-features = false }
|
||||
thiserror = "1.0.56"
|
||||
tokio = { version = "1.35.1", features = ["full"] }
|
||||
tokio-stream = { version = "0.1.14", default-features = false }
|
||||
tracing = "0.1.40"
|
||||
|
|
|
@ -23,15 +23,15 @@ impl System {
|
|||
}
|
||||
|
||||
pub fn refresh_resources(&mut self) {
|
||||
use sysinfo::{CpuRefreshKind, MemoryRefreshKind};
|
||||
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind};
|
||||
|
||||
self.sys.refresh_specifics(
|
||||
sysinfo::RefreshKind::new()
|
||||
RefreshKind::new()
|
||||
.with_memory(MemoryRefreshKind::new().with_ram().with_swap())
|
||||
.with_cpu(CpuRefreshKind::new().with_cpu_usage()),
|
||||
.with_cpu(CpuRefreshKind::everything()),
|
||||
);
|
||||
|
||||
self.disks.refresh_list();
|
||||
// self.disks.refresh_list();
|
||||
}
|
||||
|
||||
pub fn system(&self) -> &sysinfo::System {
|
||||
|
@ -47,17 +47,14 @@ impl System {
|
|||
pub enum Status {
|
||||
#[default]
|
||||
Normal,
|
||||
MemoryWarning,
|
||||
MemoryCritical,
|
||||
CpuWarning,
|
||||
CpuCritical,
|
||||
DiskWarning,
|
||||
DiskCritical,
|
||||
Critical,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Default, Serialize)]
|
||||
pub struct Health {
|
||||
status: Vec<Status>,
|
||||
cpu_status: Status,
|
||||
memory_status: Status,
|
||||
disk_status: Status,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -65,26 +62,24 @@ pub struct HealthMonitor(Arc<watch::Sender<Health>>);
|
|||
|
||||
impl HealthMonitor {
|
||||
pub fn new() -> Self {
|
||||
let (sender, _) = watch::channel(Health {
|
||||
status: [Status::default()].into(),
|
||||
});
|
||||
let (sender, _) = watch::channel(Health::default());
|
||||
|
||||
Self(Arc::new(sender))
|
||||
}
|
||||
|
||||
pub fn check_system(&self, system: &System) {
|
||||
let s = system.system();
|
||||
let sys = system.system();
|
||||
|
||||
let memory_usage = if s.total_memory() > 0 {
|
||||
s.used_memory() as f64 * 100.0 / s.total_memory() as f64
|
||||
let memory_usage = if sys.total_memory() > 0 {
|
||||
sys.used_memory() as f64 * 100.0 / sys.total_memory() as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
let cpu_usage = s.global_cpu_info().cpu_usage();
|
||||
let cpu_usage = sys.global_cpu_info().cpu_usage();
|
||||
|
||||
// for d in system.disks().list() {
|
||||
// let avail = if d.total_space() > 0 {
|
||||
// let _avail = if d.total_space() > 0 {
|
||||
// (d.available_space() * 100 / d.total_space()) as u8
|
||||
// } else {
|
||||
// 0 as u8
|
||||
|
@ -92,36 +87,31 @@ impl HealthMonitor {
|
|||
// }
|
||||
|
||||
self.0.send_if_modified(|health| {
|
||||
let cpu_changed = if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD {
|
||||
health
|
||||
.status
|
||||
.iter()
|
||||
.find(|&it| *it == Status::CpuCritical)
|
||||
.and(None)
|
||||
.unwrap_or_else(|| {
|
||||
health.status.push(Status::CpuCritical);
|
||||
true
|
||||
})
|
||||
} else {
|
||||
false
|
||||
let cpu_changed = match health.cpu_status {
|
||||
Status::Normal if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD => {
|
||||
health.cpu_status = Status::Critical;
|
||||
true
|
||||
}
|
||||
Status::Critical if cpu_usage <= CPU_USAGE_CRITICAL_THRESHOLD => {
|
||||
health.cpu_status = Status::Normal;
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
let mem_changed = if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD {
|
||||
health
|
||||
.status
|
||||
.iter()
|
||||
.find(|&it| *it == Status::MemoryCritical)
|
||||
.and(None)
|
||||
.unwrap_or_else(|| {
|
||||
health.status.push(Status::MemoryCritical);
|
||||
true
|
||||
})
|
||||
} else {
|
||||
false
|
||||
let memory_changed = match health.memory_status {
|
||||
Status::Normal if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD => {
|
||||
health.memory_status = Status::Critical;
|
||||
true
|
||||
}
|
||||
Status::Critical if memory_usage <= MEMORY_USAGE_CRITICAL_THRESHOLD => {
|
||||
health.memory_status = Status::Normal;
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
dbg!(cpu_changed || mem_changed);
|
||||
cpu_changed || mem_changed
|
||||
cpu_changed || memory_changed
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -129,3 +119,10 @@ impl HealthMonitor {
|
|||
self.0.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for HealthMonitor {
|
||||
#[inline]
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use async_nats::{Client, ConnectOptions};
|
||||
use tracing::Level;
|
||||
|
||||
mod health;
|
||||
mod messaging;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
|
@ -14,31 +14,31 @@ async fn main() -> anyhow::Result<()> {
|
|||
tracing::subscriber::set_global_default(subscriber)
|
||||
.expect("to set a tracing global subscriber");
|
||||
|
||||
let client = ConnectOptions::new()
|
||||
.name("Prymn Agent demo_agent")
|
||||
.custom_inbox_prefix("_INBOX_demo_agent")
|
||||
.user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned())
|
||||
.connect("localhost")
|
||||
.await
|
||||
.map_err(|err| err)?;
|
||||
|
||||
tracing::info!("connected to nats server");
|
||||
|
||||
init_health_subsystem(client);
|
||||
|
||||
// wait_for_commands(client).await;
|
||||
run().await.map_err(|err| {
|
||||
tracing::error!(cause = %err, "could not start agent");
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
async fn run() -> anyhow::Result<()> {
|
||||
let agent = messaging::Agent::connect("demo_agent").await?;
|
||||
tracing::info!("initialized messaging system");
|
||||
init_health_subsystem(agent.clone());
|
||||
tracing::info!("initialized health system");
|
||||
tracing::info!("agent is ready");
|
||||
tokio::signal::ctrl_c().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_health_subsystem(client: Client) {
|
||||
let system = health::System::new();
|
||||
fn init_health_subsystem(agent: messaging::Agent) {
|
||||
let mut system = health::System::new();
|
||||
let health_monitor = health::HealthMonitor::new();
|
||||
let health_monitor_clone = health_monitor.clone();
|
||||
|
||||
// Forever refresh system resources and monitor changes
|
||||
std::thread::spawn(move || loop {
|
||||
const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
system.refresh_resources();
|
||||
health_monitor.check_system(&system);
|
||||
std::thread::sleep(REFRESH_INTERVAL);
|
||||
});
|
||||
|
@ -47,14 +47,10 @@ fn init_health_subsystem(client: Client) {
|
|||
let mut recv = health_monitor_clone.monitor();
|
||||
|
||||
while let Ok(()) = recv.changed().await {
|
||||
let payload = serde_json::to_string(&*recv.borrow()).unwrap();
|
||||
|
||||
tracing::info!(%payload, "sending health event");
|
||||
|
||||
client
|
||||
.publish("agents.v1.demo_agent.health", payload.into())
|
||||
.await
|
||||
.unwrap();
|
||||
tracing::info!(health = ?&*recv.borrow(), "health watermark");
|
||||
let payload = serde_json::to_string(&*recv.borrow()).expect("serializable health");
|
||||
let message = messaging::OutgoingMessage::Health(payload);
|
||||
agent.publish(message).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
119
agent/src/messaging.rs
Normal file
119
agent/src/messaging.rs
Normal file
|
@ -0,0 +1,119 @@
|
|||
//! Prymn messaging framework
|
||||
|
||||
use async_nats::SubscribeError;
|
||||
use async_nats::{Client, ConnectError, ConnectOptions};
|
||||
use thiserror::Error;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
pub use self::v1::OutgoingMessage;
|
||||
|
||||
mod v1 {
|
||||
use bytes::Bytes;
|
||||
|
||||
pub(super) const PREFIX: &'static str = "agents.v1";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OutgoingMessage {
|
||||
Health(String),
|
||||
}
|
||||
|
||||
impl OutgoingMessage {
|
||||
pub fn subject(&self, sender: &str) -> String {
|
||||
match self {
|
||||
OutgoingMessage::Health(_) => format!("{}.{}.health", PREFIX, sender),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_payload(self) -> Bytes {
|
||||
match self {
|
||||
OutgoingMessage::Health(data) => Bytes::from(data.into_bytes()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #[derive(Debug)]
|
||||
// pub enum Subject {
|
||||
// Command,
|
||||
// }
|
||||
|
||||
// /// A system update message received when an OS update is requested
|
||||
// #[derive(Deserialize)]
|
||||
// struct SystemUpdate {
|
||||
// /// `true` when a simulated update should occur
|
||||
// simulate: bool,
|
||||
// }
|
||||
|
||||
// /// An agent update message received when an agent binary update is requested
|
||||
// #[derive(Deserialize)]
|
||||
// struct AgentUpdate {
|
||||
// /// `true` when a simulated update should occur
|
||||
// simulate: bool,
|
||||
// }
|
||||
|
||||
// /// An operating system program execution.
|
||||
// #[derive(Deserialize)]
|
||||
// struct Command<'a> {
|
||||
// /// The user to which the child process will switch to
|
||||
// user: &'a str,
|
||||
// /// Program name or path
|
||||
// program: &'a str,
|
||||
// /// Program arguments
|
||||
// args: Vec<&'a str>,
|
||||
// }
|
||||
|
||||
// struct OpenTerminal {
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MessagingError {
|
||||
#[error("failure connecting to message broker")]
|
||||
ConnectError(#[from] ConnectError),
|
||||
#[error("failure listening for messages from broker")]
|
||||
SubscribeError(#[from] SubscribeError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Agent {
|
||||
id: String,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl Agent {
|
||||
#[tracing::instrument]
|
||||
pub async fn connect(id: &str) -> Result<Self, MessagingError> {
|
||||
let client = ConnectOptions::new()
|
||||
.name(format!("Prymn Agent {id}"))
|
||||
.custom_inbox_prefix(format!("_INBOX_{id}"))
|
||||
.user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned())
|
||||
.connect("localhost")
|
||||
.await?;
|
||||
|
||||
tracing::debug!("connected to NATS");
|
||||
let prefix = format!("{}.{}.*", v1::PREFIX, id);
|
||||
let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
|
||||
tracing::debug!("root message routing initialized on {}", &prefix);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(data) = routing_subscriber.next().await {
|
||||
let command = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]);
|
||||
dbg!(command);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
id: id.to_owned(),
|
||||
client,
|
||||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub async fn publish(&self, msg: OutgoingMessage) {
|
||||
let subj = msg.subject(&self.id);
|
||||
tracing::debug!(subject = &subj, "publish");
|
||||
if let Err(err) = self.client.publish(subj, msg.into_payload()).await {
|
||||
tracing::error!(cause = %err, "publish error");
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue