todo: subscribers and publishers created on messaging

This commit is contained in:
Nikos Papadakis 2024-01-30 01:51:56 +02:00
parent 46b87d2559
commit 46ae6f0163
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
2 changed files with 41 additions and 25 deletions

View file

@ -5,8 +5,6 @@ use bytes::Bytes;
use futures::Stream; use futures::Stream;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::health::Health;
#[derive(Clone)] #[derive(Clone)]
pub struct Client { pub struct Client {
id: String, id: String,
@ -43,19 +41,34 @@ impl TryFrom<&str> for Subject {
} }
} }
#[derive(Debug)]
pub struct Subscriber {
subject: Subject,
}
#[derive(Debug)]
pub struct Publisher {
subject: Subject,
}
#[derive(Debug)] #[derive(Debug)]
pub struct Message { pub struct Message {
subject: Subject, subject: Subject,
payload: Bytes, payload: Bytes,
subscribers: Vec<Subscriber>,
publishers: Vec<Publisher>,
reply: Option<async_nats::Subject>, reply: Option<async_nats::Subject>,
} }
impl Message { impl Message {
fn from_transport(msg: async_nats::Message) -> Result<Self> { fn from_transport(msg: async_nats::Message) -> Result<Self> {
let suffix = msg.subject.split_terminator('.').last().unwrap_or_default(); let suffix = msg.subject.split_terminator('.').last().unwrap_or_default();
let subject = suffix.try_into()?;
Ok(Message { Ok(Message {
subject: suffix.try_into()?, subject,
subscribers: Vec::default(),
publishers: Vec::default(),
payload: msg.payload, payload: msg.payload,
reply: msg.reply, reply: msg.reply,
}) })
@ -69,7 +82,7 @@ impl Message {
self.payload.clone() self.payload.clone()
} }
pub fn health(health: Health) -> Result<Message> { pub fn health(health: crate::health::Health) -> Result<Message> {
Ok(Message { Ok(Message {
subject: Subject::Health, subject: Subject::Health,
payload: Bytes::from_iter(serde_json::to_vec(&health)?), payload: Bytes::from_iter(serde_json::to_vec(&health)?),
@ -126,6 +139,11 @@ impl Client {
} }
}); });
// let channel = sync::broadcast::channel(10);
// while let Some(message) = subscriber.next().await {
// let subscriptions = self.further_subscriptions(message.subject).await;
// }
tracing::debug!("subscribed on {}", &subject); tracing::debug!("subscribed on {}", &subject);
Ok(subscriber) Ok(subscriber)

View file

@ -12,15 +12,32 @@ enum ServiceError {
BodyFormatError, BodyFormatError,
} }
async fn route_message(message: crate::messaging::Message) -> Result<(), ServiceError> {
match message.subject() {
crate::messaging::Subject::Health => {}
crate::messaging::Subject::Exec => {
let ctx = Ctx::with_body(message.body())?;
let _ = self::exec::exec(ctx).await;
}
crate::messaging::Subject::OpenTerminal => {
let ctx = Ctx::with_body(message.body())?;
let _ = self::terminal::open_terminal(ctx).await;
}
}
Ok(())
}
struct Ctx<T> { struct Ctx<T> {
body: T, body: T,
// input_streams:
} }
impl<T> Ctx<T> impl<T> Ctx<T>
where where
T: TryFrom<Bytes>, T: TryFrom<Bytes>,
{ {
fn with_body(client: crate::messaging::Client, body: Bytes) -> Result<Self, ServiceError> { fn with_body(body: Bytes) -> Result<Self, ServiceError> {
Ok(Self { Ok(Self {
body: body body: body
.try_into() .try_into()
@ -29,25 +46,6 @@ where
} }
} }
async fn route_message(
client: crate::messaging::Client,
message: crate::messaging::Message,
) -> Result<(), ServiceError> {
match message.subject() {
crate::messaging::Subject::Health => {}
crate::messaging::Subject::Exec => {
let ctx = Ctx::with_body(client, message.body())?;
let _ = self::exec::exec(ctx).await;
}
crate::messaging::Subject::OpenTerminal => {
let ctx = Ctx::with_body(client, message.body())?;
let _ = self::terminal::open_terminal(ctx).await;
}
}
Ok(())
}
pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> { pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> {
let mut message_stream = client let mut message_stream = client
.subscribe() .subscribe()
@ -57,7 +55,7 @@ pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<(
tokio::spawn(async move { tokio::spawn(async move {
while let Some(message) = message_stream.next().await { while let Some(message) = message_stream.next().await {
// TODO: How do i handle this error? // TODO: How do i handle this error?
if let Err(err) = route_message(client.clone(), message).await { if let Err(err) = route_message(message).await {
tracing::warn!("{}", err); tracing::warn!("{}", err);
}; };
} }