From 46ae6f0163ce90f9e50908fa69d7ae709bd63e89 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Tue, 30 Jan 2024 01:51:56 +0200 Subject: [PATCH] todo: subscribers and publishers created on messaging --- agent/src/messaging.rs | 26 +++++++++++++++++++++---- agent/src/services/mod.rs | 40 +++++++++++++++++++-------------------- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs index 77b988d..8900709 100644 --- a/agent/src/messaging.rs +++ b/agent/src/messaging.rs @@ -5,8 +5,6 @@ use bytes::Bytes; use futures::Stream; use tokio_stream::StreamExt; -use crate::health::Health; - #[derive(Clone)] pub struct Client { id: String, @@ -43,19 +41,34 @@ impl TryFrom<&str> for Subject { } } +#[derive(Debug)] +pub struct Subscriber { + subject: Subject, +} + +#[derive(Debug)] +pub struct Publisher { + subject: Subject, +} + #[derive(Debug)] pub struct Message { subject: Subject, payload: Bytes, + subscribers: Vec, + publishers: Vec, reply: Option, } impl Message { fn from_transport(msg: async_nats::Message) -> Result { let suffix = msg.subject.split_terminator('.').last().unwrap_or_default(); + let subject = suffix.try_into()?; Ok(Message { - subject: suffix.try_into()?, + subject, + subscribers: Vec::default(), + publishers: Vec::default(), payload: msg.payload, reply: msg.reply, }) @@ -69,7 +82,7 @@ impl Message { self.payload.clone() } - pub fn health(health: Health) -> Result { + pub fn health(health: crate::health::Health) -> Result { Ok(Message { subject: Subject::Health, payload: Bytes::from_iter(serde_json::to_vec(&health)?), @@ -126,6 +139,11 @@ impl Client { } }); + // let channel = sync::broadcast::channel(10); + // while let Some(message) = subscriber.next().await { + // let subscriptions = self.further_subscriptions(message.subject).await; + // } + tracing::debug!("subscribed on {}", &subject); Ok(subscriber) diff --git a/agent/src/services/mod.rs b/agent/src/services/mod.rs index 876657c..f3a5bc2 100644 --- a/agent/src/services/mod.rs +++ b/agent/src/services/mod.rs @@ -12,15 +12,32 @@ enum ServiceError { BodyFormatError, } +async fn route_message(message: crate::messaging::Message) -> Result<(), ServiceError> { + match message.subject() { + crate::messaging::Subject::Health => {} + crate::messaging::Subject::Exec => { + let ctx = Ctx::with_body(message.body())?; + let _ = self::exec::exec(ctx).await; + } + crate::messaging::Subject::OpenTerminal => { + let ctx = Ctx::with_body(message.body())?; + let _ = self::terminal::open_terminal(ctx).await; + } + } + + Ok(()) +} + struct Ctx { body: T, + // input_streams: } impl Ctx where T: TryFrom, { - fn with_body(client: crate::messaging::Client, body: Bytes) -> Result { + fn with_body(body: Bytes) -> Result { Ok(Self { body: body .try_into() @@ -29,25 +46,6 @@ where } } -async fn route_message( - client: crate::messaging::Client, - message: crate::messaging::Message, -) -> Result<(), ServiceError> { - match message.subject() { - crate::messaging::Subject::Health => {} - crate::messaging::Subject::Exec => { - let ctx = Ctx::with_body(client, message.body())?; - let _ = self::exec::exec(ctx).await; - } - crate::messaging::Subject::OpenTerminal => { - let ctx = Ctx::with_body(client, message.body())?; - let _ = self::terminal::open_terminal(ctx).await; - } - } - - Ok(()) -} - pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> { let mut message_stream = client .subscribe() @@ -57,7 +55,7 @@ pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<( tokio::spawn(async move { while let Some(message) = message_stream.next().await { // TODO: How do i handle this error? - if let Err(err) = route_message(client.clone(), message).await { + if let Err(err) = route_message(message).await { tracing::warn!("{}", err); }; }