what am i doing

This commit is contained in:
Nikos Papadakis 2024-01-30 23:34:19 +02:00
parent e3611ee15a
commit b7a29405f0
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
6 changed files with 133 additions and 87 deletions

1
Cargo.lock generated
View file

@ -709,6 +709,7 @@ dependencies = [
"bytes",
"chrono",
"futures",
"once_cell",
"rustix",
"serde",
"serde_json",

View file

@ -9,6 +9,7 @@ async-nats = "0.33.0"
bytes = "1.5.0"
chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] }
futures = { version = "0.3.30", default-features = false, features = ["std"] }
once_cell = "1.19.0"
rustix = { version = "0.38.30", features = ["termios", "stdio", "pty", "process"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"

View file

@ -5,7 +5,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::messaging::{Client, Message};
use crate::messaging::Message;
const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
@ -171,7 +171,7 @@ impl Default for HealthMonitor {
}
}
pub async fn init_health_subsystem(client: Client) -> HealthMonitor {
pub async fn init_health_subsystem(client: crate::messaging::Client) -> HealthMonitor {
let health_monitor = HealthMonitor::new();
let health_monitor_clone = health_monitor.clone();
let health_monitor_ret = health_monitor.clone();

View file

@ -1,4 +1,7 @@
use std::fmt::{Debug, Display};
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use anyhow::{anyhow, Result};
use bytes::Bytes;
@ -6,11 +9,7 @@ use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
#[derive(Clone)]
pub struct Client {
id: String,
nats: async_nats::Client,
}
const PREFIX: &'static str = "agents.v1";
#[derive(Debug)]
pub enum Subject {
@ -46,18 +45,19 @@ impl TryFrom<&str> for Subject {
pub struct Message {
subject: Subject,
payload: Bytes,
reply: Option<async_nats::Subject>,
// reply: Option<async_nats::Subject>,
}
impl Message {
fn from_transport(msg: async_nats::Message) -> Result<Self> {
let suffix = msg.subject.split_terminator('.').last().unwrap_or_default();
let subject = suffix.try_into()?;
fn from_transport(subject: &str, payload: Bytes) -> Result<Self> {
let subject = subject.try_into()?;
// msg.headers.unwrap().get("x-idempotent-key");
Ok(Message {
subject,
payload: msg.payload,
reply: msg.reply,
payload,
// reply: None,
})
}
@ -73,20 +73,16 @@ impl Message {
Ok(Message {
subject: Subject::Health,
payload: Bytes::from_iter(serde_json::to_vec(&health)?),
reply: None,
// reply: None,
})
}
}
async fn open_subchannels(&self, client: &Client, sender: mpsc::Sender<Self>) {
match self.subject {
Subject::OpenTerminal => {
// let subject = format!("agents.v1.{}.terminal.{}.input");
// client.nats.subscribe(subject).await;
// sender.send(message).await.unwrap();
}
_ => {}
}
}
#[derive(Clone)]
pub struct Client {
id: Arc<String>,
nats: async_nats::Client,
subj_prefix: Arc<String>,
}
impl Client {
@ -101,8 +97,9 @@ impl Client {
tracing::debug!("connected to NATS");
Ok(Self {
id: id.to_owned(),
nats,
id: Arc::new(id.to_owned()),
subj_prefix: Arc::new(format!("{}.{}", PREFIX, id)),
})
}
@ -111,26 +108,29 @@ impl Client {
self.nats.publish(subject, msg.payload).await.unwrap();
}
pub async fn reply(&self, msg: Message, mut stream: impl Stream<Item = Bytes> + Unpin) {
match msg.reply {
Some(reply) => {
while let Some(data) = stream.next().await {
self.nats.publish(reply.clone(), data.into()).await.unwrap();
}
}
None => tracing::warn!(?msg, "tried to reply to message without a reply subject"),
}
}
// pub async fn reply(&self, msg: Message, mut stream: impl Stream<Item = Bytes> + Unpin) {
// match msg.reply {
// Some(reply) => {
// while let Some(data) = stream.next().await {
// self.nats.publish(reply.clone(), data.into()).await.unwrap();
// }
// }
// None => tracing::warn!(?msg, "tried to reply to message without a reply subject"),
// }
// }
pub async fn messages_channel(&self) -> Result<mpsc::Receiver<Message>> {
let subject = format!("agents.v1.{}.*", self.id);
let (sender, receiver) = mpsc::channel(100);
let mut stream = self.subscribe(subject).await?;
let mut stream = self.subscribe("*").await?;
let self_clone = self.clone();
tokio::spawn(async move {
while let Some(msg) = stream.next().await {
self_clone.clone().open_subchannels(&msg).await;
self_clone
.clone()
.open_subchannels(&msg, sender.clone())
.await;
sender.send(msg).await.unwrap();
}
});
@ -138,16 +138,22 @@ impl Client {
Ok(receiver)
}
async fn subscribe(&self, subject: String) -> Result<impl Stream<Item = Message>> {
async fn subscribe(&self, subject: &str) -> Result<impl Stream<Item = Message>> {
let prefix = Arc::clone(&self.subj_prefix);
let stream = self
.nats
.subscribe(subject.clone())
.subscribe(format!("{}.{}", self.subj_prefix, subject))
.await?
.filter_map(|data| match Message::from_transport(data) {
Ok(msg) => Some(msg),
Err(err) => {
tracing::warn!("{}", err);
None
.filter_map(move |data| {
let subject = data.subject[..].trim_start_matches(&*prefix);
match Message::from_transport(subject, data.payload) {
Ok(msg) => Some(msg),
Err(err) => {
tracing::warn!("{}", err);
None
}
}
});
@ -156,29 +162,29 @@ impl Client {
Ok(stream)
}
async fn open_subchannels(self, message: &Message, sender: mpsc::Sender<Message>) {
async fn open_subchannels(&self, message: &Message, sender: mpsc::Sender<Message>) {
fn send_messages_from_stream(
mut stream: impl Stream<Item = Message> + Send + Unpin + 'static,
sender: mpsc::Sender<Message>,
) {
tokio::spawn(async move {
while let Some(message) = stream.next().await {
sender.send(message).await.unwrap();
}
});
}
match message.subject {
Subject::OpenTerminal => {
let terminal_id = "test";
let stream = self
.subscribe(format!(
"agents.v1.{}.terminal.{}.input",
self.id, terminal_id
))
.await
.unwrap();
let input_stream = self.subscribe("terminal.key.input").await.unwrap();
let resize_stream = self.subscribe("terminal.key.resize").await.unwrap();
send_messages_from_stream(input_stream, sender.clone());
send_messages_from_stream(resize_stream, sender);
}
_ => {}
}
}
fn send_messages_from_stream(self, stream: impl Stream<Item = Message> + Send + Unpin) {
tokio::spawn(async {
// while let Some(msg) = stream.next().await {
//
// }
});
}
}
impl Debug for Client {

View file

@ -1,7 +1,7 @@
use anyhow::Context;
use std::collections::HashMap;
use bytes::Bytes;
use thiserror::Error;
use tokio_stream::StreamExt;
mod exec;
mod terminal;
@ -12,6 +12,20 @@ enum ServiceError {
BodyFormatError,
}
struct Service {
terminals: HashMap<String, ()>,
}
impl Service {
fn serve(&mut self, message: crate::messaging::Message) {
match message.subject() {
crate::messaging::Subject::OpenTerminal => {}
crate::messaging::Subject::Exec => todo!(),
crate::messaging::Subject::Health => todo!(),
}
}
}
async fn route_message(message: crate::messaging::Message) -> Result<(), ServiceError> {
match message.subject() {
crate::messaging::Subject::Health => {}
@ -30,7 +44,7 @@ async fn route_message(message: crate::messaging::Message) -> Result<(), Service
struct Ctx<T> {
body: T,
// input_streams:
terminals: HashMap<String, (crate::pty::Pty, tokio::process::Child)>,
}
impl<T> Ctx<T>
@ -38,10 +52,13 @@ where
T: TryFrom<Bytes>,
{
fn with_body(body: Bytes) -> Result<Self, ServiceError> {
let body = body
.try_into()
.map_err(|_err| ServiceError::BodyFormatError)?;
Ok(Self {
body: body
.try_into()
.map_err(|_err| ServiceError::BodyFormatError)?,
body,
terminals: HashMap::default(),
})
}
}

View file

@ -1,38 +1,44 @@
use std::convert::Infallible;
use bytes::Bytes;
use futures::Stream;
use serde::Deserialize;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use crate::pty::open_shell;
use super::Ctx;
#[derive(Debug, Deserialize)]
pub struct OpenTerminalMessage {
id: String,
}
pub struct OpenTerminalMessage(Bytes);
impl TryFrom<Bytes> for OpenTerminalMessage {
type Error = serde_json::Error;
type Error = Infallible;
fn try_from(value: Bytes) -> Result<Self, Self::Error> {
serde_json::from_slice(&value[..])
Ok(Self(value))
}
}
pub async fn open_terminal(ctx: Ctx<OpenTerminalMessage>) -> anyhow::Result<()> {
#[derive(Debug, Deserialize)]
pub struct TerminalInput(Bytes);
impl TryFrom<Bytes> for TerminalInput {
type Error = Infallible;
fn try_from(value: Bytes) -> Result<Self, Self::Error> {
Ok(Self(value))
}
}
pub async fn open_terminal(
mut ctx: Ctx<OpenTerminalMessage>,
) -> anyhow::Result<Ctx<OpenTerminalMessage>> {
let pty = crate::pty::Pty::open()?;
let mut pty_clone = pty.try_clone()?;
let shell = open_shell(pty.child()?, "/bin/bash")?;
tokio::spawn(async move {
while let Some(data) = tokio_stream::once(b"foo").next().await {
if let Err(err) = pty_clone.write_all(&data[..]).await {
tracing::warn!(%err, "pseudoterminal write error");
}
}
});
let _out_stream = FramedRead::new(pty, BytesCodec::new()).filter_map(|inner| {
let _out_stream = FramedRead::new(pty.try_clone()?, BytesCodec::new()).filter_map(|inner| {
inner
.map(|bytes| bytes.freeze())
.map_err(|err| {
@ -41,5 +47,20 @@ pub async fn open_terminal(ctx: Ctx<OpenTerminalMessage>) -> anyhow::Result<()>
.ok()
});
Ok(())
ctx.terminals.insert(String::from("test"), (pty, shell));
Ok(ctx)
}
pub async fn terminal_input(
terminal_id: &str,
mut ctx: Ctx<TerminalInput>,
) -> anyhow::Result<Ctx<TerminalInput>> {
let (pty, _) = ctx.terminals.get_mut(terminal_id).unwrap();
if let Err(err) = pty.write_all(&ctx.body.0[..]).await {
tracing::warn!(%err, "pseudoterminal write error");
}
Ok(ctx)
}