From e3611ee15a4e68e87c4929cb88b3c0033db68228 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Tue, 30 Jan 2024 17:40:46 +0200 Subject: [PATCH] channels --- agent/src/messaging.rs | 77 +++++++++++++++++++++++++++------------ agent/src/services/mod.rs | 9 ++--- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs index 8900709..3b71335 100644 --- a/agent/src/messaging.rs +++ b/agent/src/messaging.rs @@ -3,6 +3,7 @@ use std::fmt::{Debug, Display}; use anyhow::{anyhow, Result}; use bytes::Bytes; use futures::Stream; +use tokio::sync::mpsc; use tokio_stream::StreamExt; #[derive(Clone)] @@ -41,22 +42,10 @@ impl TryFrom<&str> for Subject { } } -#[derive(Debug)] -pub struct Subscriber { - subject: Subject, -} - -#[derive(Debug)] -pub struct Publisher { - subject: Subject, -} - #[derive(Debug)] pub struct Message { subject: Subject, payload: Bytes, - subscribers: Vec, - publishers: Vec, reply: Option, } @@ -67,8 +56,6 @@ impl Message { Ok(Message { subject, - subscribers: Vec::default(), - publishers: Vec::default(), payload: msg.payload, reply: msg.reply, }) @@ -89,6 +76,17 @@ impl Message { reply: None, }) } + + async fn open_subchannels(&self, client: &Client, sender: mpsc::Sender) { + match self.subject { + Subject::OpenTerminal => { + // let subject = format!("agents.v1.{}.terminal.{}.input"); + // client.nats.subscribe(subject).await; + // sender.send(message).await.unwrap(); + } + _ => {} + } + } } impl Client { @@ -124,14 +122,28 @@ impl Client { } } - pub async fn subscribe(&self) -> Result> { + pub async fn messages_channel(&self) -> Result> { let subject = format!("agents.v1.{}.*", self.id); + let (sender, receiver) = mpsc::channel(100); + let mut stream = self.subscribe(subject).await?; - let subscriber = self + let self_clone = self.clone(); + tokio::spawn(async move { + while let Some(msg) = stream.next().await { + self_clone.clone().open_subchannels(&msg).await; + sender.send(msg).await.unwrap(); + } + }); + + Ok(receiver) + } + + async fn subscribe(&self, subject: String) -> Result> { + let stream = self .nats .subscribe(subject.clone()) .await? - .filter_map(move |data| match Message::from_transport(data) { + .filter_map(|data| match Message::from_transport(data) { Ok(msg) => Some(msg), Err(err) => { tracing::warn!("{}", err); @@ -139,14 +151,33 @@ impl Client { } }); - // let channel = sync::broadcast::channel(10); - // while let Some(message) = subscriber.next().await { - // let subscriptions = self.further_subscriptions(message.subject).await; - // } + tracing::debug!("subscribed on {}", subject); - tracing::debug!("subscribed on {}", &subject); + Ok(stream) + } - Ok(subscriber) + async fn open_subchannels(self, message: &Message, sender: mpsc::Sender) { + match message.subject { + Subject::OpenTerminal => { + let terminal_id = "test"; + let stream = self + .subscribe(format!( + "agents.v1.{}.terminal.{}.input", + self.id, terminal_id + )) + .await + .unwrap(); + } + _ => {} + } + } + + fn send_messages_from_stream(self, stream: impl Stream + Send + Unpin) { + tokio::spawn(async { + // while let Some(msg) = stream.next().await { + // + // } + }); } } diff --git a/agent/src/services/mod.rs b/agent/src/services/mod.rs index f3a5bc2..663e4da 100644 --- a/agent/src/services/mod.rs +++ b/agent/src/services/mod.rs @@ -47,15 +47,12 @@ where } pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> { - let mut message_stream = client - .subscribe() - .await - .context("could not initialize services system")?; + let mut recv = client.messages_channel().await?; tokio::spawn(async move { - while let Some(message) = message_stream.next().await { + while let Some(msg) = recv.recv().await { // TODO: How do i handle this error? - if let Err(err) = route_message(message).await { + if let Err(err) = route_message(msg).await { tracing::warn!("{}", err); }; }