todo: services

This commit is contained in:
Nikos Papadakis 2024-01-27 23:01:59 +02:00
parent 5a6ad517d2
commit e2c50ca174
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
6 changed files with 119 additions and 38 deletions

29
Cargo.lock generated
View file

@ -672,6 +672,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tower",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -1195,12 +1196,40 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.40" version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [ dependencies = [
"log",
"pin-project-lite", "pin-project-lite",
"tracing-attributes", "tracing-attributes",
"tracing-core", "tracing-core",

View file

@ -13,5 +13,6 @@ sysinfo = { version = "0.30.5", default-features = false }
thiserror = "1.0.56" thiserror = "1.0.56"
tokio = { version = "1.35.1", features = ["full"] } tokio = { version = "1.35.1", features = ["full"] }
tokio-stream = { version = "0.1.14", default-features = false } tokio-stream = { version = "0.1.14", default-features = false }
tower = { version = "0.4.13", features = ["steer", "util"] }
tracing = "0.1.40" tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["fmt"] } tracing-subscriber = { version = "0.3.18", features = ["fmt"] }

View file

@ -27,7 +27,7 @@ impl System {
self.sys.refresh_specifics( self.sys.refresh_specifics(
RefreshKind::new() RefreshKind::new()
.with_memory(MemoryRefreshKind::new().with_ram().with_swap()) .with_memory(MemoryRefreshKind::everything())
.with_cpu(CpuRefreshKind::everything()), .with_cpu(CpuRefreshKind::everything()),
); );

View file

@ -4,6 +4,7 @@ use tracing::Level;
mod health; mod health;
mod messaging; mod messaging;
mod services;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -21,16 +22,17 @@ async fn main() -> anyhow::Result<()> {
} }
async fn run() -> anyhow::Result<()> { async fn run() -> anyhow::Result<()> {
let agent = messaging::Agent::connect("demo_agent").await?; let agent = messaging::Client::connect("demo_agent").await?;
tracing::info!("initialized messaging system"); tracing::info!("initialized messaging system");
init_health_subsystem(agent.clone()); init_health_subsystem(agent.clone());
tracing::info!("initialized health system"); tracing::info!("initialized health system");
// services::init_services().await;
tracing::info!("agent is ready"); tracing::info!("agent is ready");
tokio::signal::ctrl_c().await?; tokio::signal::ctrl_c().await?;
Ok(()) Ok(())
} }
fn init_health_subsystem(agent: messaging::Agent) { fn init_health_subsystem(agent: messaging::Client) {
let mut system = health::System::new(); let mut system = health::System::new();
let health_monitor = health::HealthMonitor::new(); let health_monitor = health::HealthMonitor::new();
let health_monitor_clone = health_monitor.clone(); let health_monitor_clone = health_monitor.clone();

View file

@ -1,14 +1,29 @@
//! Prymn messaging framework //! Prymn messaging framework
use async_nats::SubscribeError; use async_nats::{ConnectError, ConnectOptions, SubscribeError};
use async_nats::{Client, ConnectError, ConnectOptions};
use thiserror::Error; use thiserror::Error;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
pub use self::v1::Command;
pub use self::v1::OutgoingMessage; pub use self::v1::OutgoingMessage;
#[derive(Debug, Error)]
pub enum MessagingError<'a> {
#[error("failed to connect to message broker")]
ConnectError(#[from] ConnectError),
#[error("failed to listen for messages from broker")]
SubscribeError(#[from] SubscribeError),
#[error("unknown subject '{0}'")]
UnknownSubject(&'a str),
#[error("parse error on message format")]
MessageFormatError(#[from] serde_json::Error),
}
mod v1 { mod v1 {
use bytes::Bytes; use bytes::Bytes;
use serde::Deserialize;
use super::MessagingError;
pub(super) const PREFIX: &'static str = "agents.v1"; pub(super) const PREFIX: &'static str = "agents.v1";
@ -31,10 +46,31 @@ mod v1 {
} }
} }
// #[derive(Debug)] #[derive(Debug)]
// pub enum Subject { pub enum Command {
// Command, Exec(ExecArgs),
// } }
impl Command {
pub fn from_message<'a>(
subject: &'a str,
payload: &'a [u8],
) -> Result<Self, MessagingError<'a>> {
match subject {
"exec" => Ok(Command::Exec(serde_json::from_slice(payload)?)),
_ => Err(MessagingError::UnknownSubject(subject)),
}
}
}
/// An operating system program execution.
#[derive(Debug, Deserialize)]
pub(super) struct ExecArgs {
user: String,
program: String,
#[serde(default)]
args: Vec<String>,
}
// /// A system update message received when an OS update is requested // /// A system update message received when an OS update is requested
// #[derive(Deserialize)] // #[derive(Deserialize)]
@ -49,38 +85,15 @@ mod v1 {
// /// `true` when a simulated update should occur // /// `true` when a simulated update should occur
// simulate: bool, // simulate: bool,
// } // }
// /// An operating system program execution.
// #[derive(Deserialize)]
// struct Command<'a> {
// /// The user to which the child process will switch to
// user: &'a str,
// /// Program name or path
// program: &'a str,
// /// Program arguments
// args: Vec<&'a str>,
// }
// struct OpenTerminal {
// }
}
#[derive(Debug, Error)]
pub enum MessagingError {
#[error("failure connecting to message broker")]
ConnectError(#[from] ConnectError),
#[error("failure listening for messages from broker")]
SubscribeError(#[from] SubscribeError),
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Agent { pub struct Client {
id: String, id: String,
client: Client, client: async_nats::Client,
} }
impl Agent { impl Client {
#[tracing::instrument] #[tracing::instrument]
pub async fn connect(id: &str) -> Result<Self, MessagingError> { pub async fn connect(id: &str) -> Result<Self, MessagingError> {
let client = ConnectOptions::new() let client = ConnectOptions::new()
@ -91,14 +104,27 @@ impl Agent {
.await?; .await?;
tracing::debug!("connected to NATS"); tracing::debug!("connected to NATS");
let prefix = format!("{}.{}.*", v1::PREFIX, id); let prefix = format!("{}.{}.*", v1::PREFIX, id);
let mut routing_subscriber = client.subscribe(prefix.clone()).await?; let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
tracing::debug!("root message routing initialized on {}", &prefix);
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("v1 command routing subscribed on {}", &prefix);
while let Some(data) = routing_subscriber.next().await { while let Some(data) = routing_subscriber.next().await {
let command = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]); let subject = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]);
dbg!(command);
match Command::from_message(subject, &data.payload) {
Ok(command) => {
tracing::debug!(?command);
// TODO: Think if we might want to queue commands
// handle_command(command);
}
Err(err) => {
tracing::warn!(cause = %err, "failed to parse received message");
tracing::debug!(payload = %String::from_utf8_lossy(&data.payload));
}
}
} }
}); });
@ -117,3 +143,16 @@ impl Agent {
} }
} }
} }
// pub fn receive_requests(agent: Client) {
// let prefix = format!("{}.{}.*", v1::PREFIX, id);
// let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
// let subscriber = agent.subscribe("").await;
// tracing::debug!("v1 command routing subscribed on {}", &prefix);
// }
//
// fn handle_command(command: Command) {
// match command {
// Command::Exec(_) => todo!(),
// }
// }

10
agent/src/services.rs Normal file
View file

@ -0,0 +1,10 @@
use tower::steer::Steer;
fn router() {
let services = vec![];
let svc = Steer::new(services, |req, services: &[_]| {
// do something with req
0
});
}