This commit is contained in:
Nikos Papadakis 2024-01-30 17:40:46 +02:00
parent 46ae6f0163
commit e3611ee15a
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
2 changed files with 57 additions and 29 deletions

View file

@ -3,6 +3,7 @@ use std::fmt::{Debug, Display};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
#[derive(Clone)] #[derive(Clone)]
@ -41,22 +42,10 @@ impl TryFrom<&str> for Subject {
} }
} }
#[derive(Debug)]
pub struct Subscriber {
subject: Subject,
}
#[derive(Debug)]
pub struct Publisher {
subject: Subject,
}
#[derive(Debug)] #[derive(Debug)]
pub struct Message { pub struct Message {
subject: Subject, subject: Subject,
payload: Bytes, payload: Bytes,
subscribers: Vec<Subscriber>,
publishers: Vec<Publisher>,
reply: Option<async_nats::Subject>, reply: Option<async_nats::Subject>,
} }
@ -67,8 +56,6 @@ impl Message {
Ok(Message { Ok(Message {
subject, subject,
subscribers: Vec::default(),
publishers: Vec::default(),
payload: msg.payload, payload: msg.payload,
reply: msg.reply, reply: msg.reply,
}) })
@ -89,6 +76,17 @@ impl Message {
reply: None, reply: None,
}) })
} }
async fn open_subchannels(&self, client: &Client, sender: mpsc::Sender<Self>) {
match self.subject {
Subject::OpenTerminal => {
// let subject = format!("agents.v1.{}.terminal.{}.input");
// client.nats.subscribe(subject).await;
// sender.send(message).await.unwrap();
}
_ => {}
}
}
} }
impl Client { impl Client {
@ -124,14 +122,28 @@ impl Client {
} }
} }
pub async fn subscribe(&self) -> Result<impl Stream<Item = Message>> { pub async fn messages_channel(&self) -> Result<mpsc::Receiver<Message>> {
let subject = format!("agents.v1.{}.*", self.id); let subject = format!("agents.v1.{}.*", self.id);
let (sender, receiver) = mpsc::channel(100);
let mut stream = self.subscribe(subject).await?;
let subscriber = self let self_clone = self.clone();
tokio::spawn(async move {
while let Some(msg) = stream.next().await {
self_clone.clone().open_subchannels(&msg).await;
sender.send(msg).await.unwrap();
}
});
Ok(receiver)
}
async fn subscribe(&self, subject: String) -> Result<impl Stream<Item = Message>> {
let stream = self
.nats .nats
.subscribe(subject.clone()) .subscribe(subject.clone())
.await? .await?
.filter_map(move |data| match Message::from_transport(data) { .filter_map(|data| match Message::from_transport(data) {
Ok(msg) => Some(msg), Ok(msg) => Some(msg),
Err(err) => { Err(err) => {
tracing::warn!("{}", err); tracing::warn!("{}", err);
@ -139,14 +151,33 @@ impl Client {
} }
}); });
// let channel = sync::broadcast::channel(10); tracing::debug!("subscribed on {}", subject);
// while let Some(message) = subscriber.next().await {
// let subscriptions = self.further_subscriptions(message.subject).await;
// }
tracing::debug!("subscribed on {}", &subject); Ok(stream)
}
Ok(subscriber) async fn open_subchannels(self, message: &Message, sender: mpsc::Sender<Message>) {
match message.subject {
Subject::OpenTerminal => {
let terminal_id = "test";
let stream = self
.subscribe(format!(
"agents.v1.{}.terminal.{}.input",
self.id, terminal_id
))
.await
.unwrap();
}
_ => {}
}
}
fn send_messages_from_stream(self, stream: impl Stream<Item = Message> + Send + Unpin) {
tokio::spawn(async {
// while let Some(msg) = stream.next().await {
//
// }
});
} }
} }

View file

@ -47,15 +47,12 @@ where
} }
pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> { pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> {
let mut message_stream = client let mut recv = client.messages_channel().await?;
.subscribe()
.await
.context("could not initialize services system")?;
tokio::spawn(async move { tokio::spawn(async move {
while let Some(message) = message_stream.next().await { while let Some(msg) = recv.recv().await {
// TODO: How do i handle this error? // TODO: How do i handle this error?
if let Err(err) = route_message(message).await { if let Err(err) = route_message(msg).await {
tracing::warn!("{}", err); tracing::warn!("{}", err);
}; };
} }