uggggh
This commit is contained in:
parent
e2c50ca174
commit
67d147c7e3
6 changed files with 261 additions and 196 deletions
49
Cargo.lock
generated
49
Cargo.lock
generated
|
@ -144,6 +144,16 @@ version = "1.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "chrono"
|
||||||
|
version = "0.4.33"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
|
||||||
|
dependencies = [
|
||||||
|
"num-traits",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "const-oid"
|
name = "const-oid"
|
||||||
version = "0.9.6"
|
version = "0.9.6"
|
||||||
|
@ -515,6 +525,15 @@ dependencies = [
|
||||||
"rand",
|
"rand",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num-traits"
|
||||||
|
version = "0.2.17"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num_cpus"
|
name = "num_cpus"
|
||||||
version = "1.16.0"
|
version = "1.16.0"
|
||||||
|
@ -666,13 +685,15 @@ dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-nats",
|
"async-nats",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"chrono",
|
||||||
|
"futures",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sysinfo",
|
"sysinfo",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tower",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
]
|
]
|
||||||
|
@ -1197,39 +1218,25 @@ dependencies = [
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower"
|
name = "tokio-util"
|
||||||
version = "0.4.13"
|
version = "0.7.10"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-sink",
|
||||||
"pin-project",
|
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tower-layer",
|
"tokio",
|
||||||
"tower-service",
|
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tower-layer"
|
|
||||||
version = "0.3.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tower-service"
|
|
||||||
version = "0.3.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing"
|
name = "tracing"
|
||||||
version = "0.1.40"
|
version = "0.1.40"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
|
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tracing-attributes",
|
"tracing-attributes",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
|
|
|
@ -7,12 +7,14 @@ edition = "2021"
|
||||||
anyhow = "1.0.79"
|
anyhow = "1.0.79"
|
||||||
async-nats = "0.33.0"
|
async-nats = "0.33.0"
|
||||||
bytes = "1.5.0"
|
bytes = "1.5.0"
|
||||||
|
chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] }
|
||||||
|
futures = { version = "0.3.30", default-features = false, features = ["std"] }
|
||||||
serde = { version = "1.0.195", features = ["derive"] }
|
serde = { version = "1.0.195", features = ["derive"] }
|
||||||
serde_json = "1.0.111"
|
serde_json = "1.0.111"
|
||||||
sysinfo = { version = "0.30.5", default-features = false }
|
sysinfo = { version = "0.30.5", default-features = false }
|
||||||
thiserror = "1.0.56"
|
thiserror = "1.0.56"
|
||||||
tokio = { version = "1.35.1", features = ["full"] }
|
tokio = { version = "1.35.1", features = ["full"] }
|
||||||
tokio-stream = { version = "0.1.14", default-features = false }
|
tokio-stream = { version = "0.1.14", default-features = false }
|
||||||
tower = { version = "0.4.13", features = ["steer", "util"] }
|
tokio-util = { version = "0.7.10", features = ["codec"] }
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
tracing-subscriber = { version = "0.3.18", features = ["fmt"] }
|
tracing-subscriber = { version = "0.3.18", features = ["fmt"] }
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
//! System health information and checking
|
//! System health information and checking
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use serde::Serialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use crate::messaging::{Client, Message};
|
||||||
|
|
||||||
const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
|
const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
|
||||||
const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
|
const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
|
||||||
const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
|
const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
|
||||||
|
@ -43,14 +45,14 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, PartialEq, Serialize)]
|
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
|
||||||
pub enum Status {
|
pub enum Status {
|
||||||
#[default]
|
#[default]
|
||||||
Normal,
|
Normal,
|
||||||
Critical,
|
Critical,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Serialize)]
|
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||||
pub struct Health {
|
pub struct Health {
|
||||||
cpu_status: Status,
|
cpu_status: Status,
|
||||||
memory_status: Status,
|
memory_status: Status,
|
||||||
|
@ -126,3 +128,30 @@ impl Default for HealthMonitor {
|
||||||
Self::new()
|
Self::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn init_health_subsystem(client: Client) -> HealthMonitor {
|
||||||
|
let health_monitor = HealthMonitor::new();
|
||||||
|
let health_monitor_clone = health_monitor.clone();
|
||||||
|
let health_monitor_ret = health_monitor.clone();
|
||||||
|
let mut system = System::new();
|
||||||
|
|
||||||
|
// Forever refresh system resources and monitor changes
|
||||||
|
std::thread::spawn(move || loop {
|
||||||
|
const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
|
||||||
|
system.refresh_resources();
|
||||||
|
health_monitor.check_system(&system);
|
||||||
|
std::thread::sleep(REFRESH_INTERVAL);
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut recv = health_monitor_clone.monitor();
|
||||||
|
|
||||||
|
while let Ok(()) = recv.changed().await {
|
||||||
|
tracing::info!(health = ?&*recv.borrow(), "health watermark");
|
||||||
|
let health = recv.borrow().clone();
|
||||||
|
client.publish(Message::health(health).unwrap()).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
health_monitor_ret
|
||||||
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use std::time::Duration;
|
use health::init_health_subsystem;
|
||||||
|
use messaging::Client;
|
||||||
|
use services::init_services;
|
||||||
use tracing::Level;
|
use tracing::Level;
|
||||||
|
|
||||||
mod health;
|
mod health;
|
||||||
|
@ -22,37 +23,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run() -> anyhow::Result<()> {
|
async fn run() -> anyhow::Result<()> {
|
||||||
let agent = messaging::Client::connect("demo_agent").await?;
|
let client = Client::connect("demo_agent").await?;
|
||||||
tracing::info!("initialized messaging system");
|
|
||||||
init_health_subsystem(agent.clone());
|
let _health_monitor = init_health_subsystem(client.clone()).await;
|
||||||
tracing::info!("initialized health system");
|
tracing::info!("initialized health system");
|
||||||
// services::init_services().await;
|
|
||||||
|
init_services(client).await;
|
||||||
|
tracing::info!("initialized services");
|
||||||
|
|
||||||
tracing::info!("agent is ready");
|
tracing::info!("agent is ready");
|
||||||
tokio::signal::ctrl_c().await?;
|
tokio::signal::ctrl_c().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_health_subsystem(agent: messaging::Client) {
|
|
||||||
let mut system = health::System::new();
|
|
||||||
let health_monitor = health::HealthMonitor::new();
|
|
||||||
let health_monitor_clone = health_monitor.clone();
|
|
||||||
|
|
||||||
// Forever refresh system resources and monitor changes
|
|
||||||
std::thread::spawn(move || loop {
|
|
||||||
const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
|
|
||||||
system.refresh_resources();
|
|
||||||
health_monitor.check_system(&system);
|
|
||||||
std::thread::sleep(REFRESH_INTERVAL);
|
|
||||||
});
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut recv = health_monitor_clone.monitor();
|
|
||||||
|
|
||||||
while let Ok(()) = recv.changed().await {
|
|
||||||
tracing::info!(health = ?&*recv.borrow(), "health watermark");
|
|
||||||
let payload = serde_json::to_string(&*recv.borrow()).expect("serializable health");
|
|
||||||
let message = messaging::OutgoingMessage::Health(payload);
|
|
||||||
agent.publish(message).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,102 +1,80 @@
|
||||||
//! Prymn messaging framework
|
use std::fmt::Display;
|
||||||
|
|
||||||
use async_nats::{ConnectError, ConnectOptions, SubscribeError};
|
use anyhow::{anyhow, Result};
|
||||||
use thiserror::Error;
|
use bytes::Bytes;
|
||||||
|
use futures::Stream;
|
||||||
|
use serde::Deserialize;
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
|
|
||||||
pub use self::v1::Command;
|
use crate::health::Health;
|
||||||
pub use self::v1::OutgoingMessage;
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Clone)]
|
||||||
pub enum MessagingError<'a> {
|
|
||||||
#[error("failed to connect to message broker")]
|
|
||||||
ConnectError(#[from] ConnectError),
|
|
||||||
#[error("failed to listen for messages from broker")]
|
|
||||||
SubscribeError(#[from] SubscribeError),
|
|
||||||
#[error("unknown subject '{0}'")]
|
|
||||||
UnknownSubject(&'a str),
|
|
||||||
#[error("parse error on message format")]
|
|
||||||
MessageFormatError(#[from] serde_json::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
mod v1 {
|
|
||||||
use bytes::Bytes;
|
|
||||||
use serde::Deserialize;
|
|
||||||
|
|
||||||
use super::MessagingError;
|
|
||||||
|
|
||||||
pub(super) const PREFIX: &'static str = "agents.v1";
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum OutgoingMessage {
|
|
||||||
Health(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl OutgoingMessage {
|
|
||||||
pub fn subject(&self, sender: &str) -> String {
|
|
||||||
match self {
|
|
||||||
OutgoingMessage::Health(_) => format!("{}.{}.health", PREFIX, sender),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_payload(self) -> Bytes {
|
|
||||||
match self {
|
|
||||||
OutgoingMessage::Health(data) => Bytes::from(data.into_bytes()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Command {
|
|
||||||
Exec(ExecArgs),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Command {
|
|
||||||
pub fn from_message<'a>(
|
|
||||||
subject: &'a str,
|
|
||||||
payload: &'a [u8],
|
|
||||||
) -> Result<Self, MessagingError<'a>> {
|
|
||||||
match subject {
|
|
||||||
"exec" => Ok(Command::Exec(serde_json::from_slice(payload)?)),
|
|
||||||
_ => Err(MessagingError::UnknownSubject(subject)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An operating system program execution.
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub(super) struct ExecArgs {
|
|
||||||
user: String,
|
|
||||||
program: String,
|
|
||||||
#[serde(default)]
|
|
||||||
args: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// /// A system update message received when an OS update is requested
|
|
||||||
// #[derive(Deserialize)]
|
|
||||||
// struct SystemUpdate {
|
|
||||||
// /// `true` when a simulated update should occur
|
|
||||||
// simulate: bool,
|
|
||||||
// }
|
|
||||||
|
|
||||||
// /// An agent update message received when an agent binary update is requested
|
|
||||||
// #[derive(Deserialize)]
|
|
||||||
// struct AgentUpdate {
|
|
||||||
// /// `true` when a simulated update should occur
|
|
||||||
// simulate: bool,
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
id: String,
|
id: String,
|
||||||
client: async_nats::Client,
|
nats: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Subject {
|
||||||
|
Health,
|
||||||
|
Exec,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Subject {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Subject::Health => write!(f, "health"),
|
||||||
|
Subject::Exec => write!(f, "exec"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Message {
|
||||||
|
subject: Subject,
|
||||||
|
payload: Bytes,
|
||||||
|
reply: Option<async_nats::Subject>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Message {
|
||||||
|
fn from_transport(msg: async_nats::Message) -> Result<Self> {
|
||||||
|
let suffix = msg.subject.split_terminator('.').last().unwrap_or_default();
|
||||||
|
|
||||||
|
match suffix {
|
||||||
|
"exec" => Ok(Message {
|
||||||
|
subject: Subject::Exec,
|
||||||
|
payload: msg.payload,
|
||||||
|
reply: msg.reply,
|
||||||
|
}),
|
||||||
|
"health" => Ok(Message {
|
||||||
|
subject: Subject::Health,
|
||||||
|
payload: msg.payload,
|
||||||
|
reply: msg.reply,
|
||||||
|
}),
|
||||||
|
_ => Err(anyhow!("unknown subject: {}", msg.subject)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subject(&self) -> &Subject {
|
||||||
|
&self.subject
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_payload<'a, T: Deserialize<'a>>(&'a self) -> Result<T> {
|
||||||
|
Ok(serde_json::from_slice(&self.payload[..])?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn health(health: Health) -> Result<Message> {
|
||||||
|
Ok(Message {
|
||||||
|
subject: Subject::Health,
|
||||||
|
payload: Bytes::from_iter(serde_json::to_vec(&health)?),
|
||||||
|
reply: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
#[tracing::instrument]
|
pub async fn connect(id: &str) -> Result<Self> {
|
||||||
pub async fn connect(id: &str) -> Result<Self, MessagingError> {
|
let nats = async_nats::ConnectOptions::new()
|
||||||
let client = ConnectOptions::new()
|
|
||||||
.name(format!("Prymn Agent {id}"))
|
.name(format!("Prymn Agent {id}"))
|
||||||
.custom_inbox_prefix(format!("_INBOX_{id}"))
|
.custom_inbox_prefix(format!("_INBOX_{id}"))
|
||||||
.user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned())
|
.user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned())
|
||||||
|
@ -105,54 +83,45 @@ impl Client {
|
||||||
|
|
||||||
tracing::debug!("connected to NATS");
|
tracing::debug!("connected to NATS");
|
||||||
|
|
||||||
let prefix = format!("{}.{}.*", v1::PREFIX, id);
|
|
||||||
let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tracing::debug!("v1 command routing subscribed on {}", &prefix);
|
|
||||||
|
|
||||||
while let Some(data) = routing_subscriber.next().await {
|
|
||||||
let subject = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]);
|
|
||||||
|
|
||||||
match Command::from_message(subject, &data.payload) {
|
|
||||||
Ok(command) => {
|
|
||||||
tracing::debug!(?command);
|
|
||||||
// TODO: Think if we might want to queue commands
|
|
||||||
// handle_command(command);
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
tracing::warn!(cause = %err, "failed to parse received message");
|
|
||||||
tracing::debug!(payload = %String::from_utf8_lossy(&data.payload));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
id: id.to_owned(),
|
id: id.to_owned(),
|
||||||
client,
|
nats,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
pub async fn publish(&self, msg: Message) {
|
||||||
pub async fn publish(&self, msg: OutgoingMessage) {
|
let subject = format!("agents.v1.{}.{}", self.id, msg.subject);
|
||||||
let subj = msg.subject(&self.id);
|
self.nats.publish(subject, msg.payload).await.unwrap();
|
||||||
tracing::debug!(subject = &subj, "publish");
|
|
||||||
if let Err(err) = self.client.publish(subj, msg.into_payload()).await {
|
|
||||||
tracing::error!(cause = %err, "publish error");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn reply(&self, msg: Message, mut stream: impl Stream<Item = Bytes> + Unpin) {
|
||||||
|
match msg.reply {
|
||||||
|
Some(reply) => {
|
||||||
|
while let Some(data) = stream.next().await {
|
||||||
|
self.nats.publish(reply.clone(), data.into()).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => tracing::warn!(?msg, "tried to reply to message without a reply subject"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn subscribe(&self) -> Result<impl Stream<Item = Message>> {
|
||||||
|
let subject = format!("agents.v1.{}.*", self.id);
|
||||||
|
|
||||||
|
let subscriber = self
|
||||||
|
.nats
|
||||||
|
.subscribe(subject.clone())
|
||||||
|
.await?
|
||||||
|
.filter_map(move |data| match Message::from_transport(data) {
|
||||||
|
Ok(msg) => Some(msg),
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!("{}", err);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tracing::debug!("subscribed on {}", &subject);
|
||||||
|
|
||||||
|
Ok(subscriber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn receive_requests(agent: Client) {
|
|
||||||
// let prefix = format!("{}.{}.*", v1::PREFIX, id);
|
|
||||||
// let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
|
|
||||||
// let subscriber = agent.subscribe("").await;
|
|
||||||
// tracing::debug!("v1 command routing subscribed on {}", &prefix);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// fn handle_command(command: Command) {
|
|
||||||
// match command {
|
|
||||||
// Command::Exec(_) => todo!(),
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
|
@ -1,10 +1,89 @@
|
||||||
use tower::steer::Steer;
|
use std::process::Stdio;
|
||||||
|
|
||||||
fn router() {
|
use bytes::Bytes;
|
||||||
let services = vec![];
|
use futures::{FutureExt, Stream};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use tokio::process::Command;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
|
|
||||||
let svc = Steer::new(services, |req, services: &[_]| {
|
use crate::{
|
||||||
// do something with req
|
health::Health,
|
||||||
0
|
messaging::{Client, Message, Subject},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn init_services(client: Client) {
|
||||||
|
let mut message_stream = client.subscribe().await.unwrap();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(message) = message_stream.next().await {
|
||||||
|
let client = client.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(err) = handle_message(client, message).await {
|
||||||
|
tracing::warn!("{err}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_message(client: Client, message: Message) -> anyhow::Result<()> {
|
||||||
|
match message.subject() {
|
||||||
|
Subject::Exec => {
|
||||||
|
let stream = exec_handler(message.parse_payload()?).await?;
|
||||||
|
client.reply(message, stream).await;
|
||||||
|
}
|
||||||
|
Subject::Health => {
|
||||||
|
let health: Health = message.parse_payload()?;
|
||||||
|
tracing::info!(?health, "received a health");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An operating system program execution.
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct ExecMessage {
|
||||||
|
user: String,
|
||||||
|
program: String,
|
||||||
|
#[serde(default)]
|
||||||
|
args: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn exec_handler(req: ExecMessage) -> anyhow::Result<impl Stream<Item = Bytes> + Unpin> {
|
||||||
|
// TODO: Tasks should be idempontent
|
||||||
|
// TODO: Root user should be able to run only specific programs
|
||||||
|
let mut cmd = if req.user != "root" {
|
||||||
|
let mut cmd = Command::new("sudo");
|
||||||
|
cmd.arg("-iu").arg(&req.user).arg("--").arg(&req.program);
|
||||||
|
cmd
|
||||||
|
} else {
|
||||||
|
Command::new(&req.program)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut io = cmd
|
||||||
|
.args(&req.args)
|
||||||
|
.stdout(Stdio::piped())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
|
.spawn()?;
|
||||||
|
|
||||||
|
let stdout = FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new())
|
||||||
|
.map(|stdout| Bytes::from(stdout.unwrap()));
|
||||||
|
|
||||||
|
let stderr = FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new())
|
||||||
|
.map(|stderr| Bytes::from(stderr.unwrap()));
|
||||||
|
|
||||||
|
let exit = async move {
|
||||||
|
io.wait()
|
||||||
|
.await
|
||||||
|
.map(|exit| {
|
||||||
|
let exit = exit.to_string();
|
||||||
|
Bytes::from(exit)
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
.into_stream();
|
||||||
|
|
||||||
|
Ok(Box::pin(stdout.merge(stderr).chain(exit)))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue