diff --git a/Cargo.lock b/Cargo.lock
index 841d610..979fbae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -144,6 +144,16 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "chrono"
+version = "0.4.33"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
+dependencies = [
+ "num-traits",
+ "serde",
+]
+
 [[package]]
 name = "const-oid"
 version = "0.9.6"
@@ -515,6 +525,15 @@ dependencies = [
  "rand",
 ]
 
+[[package]]
+name = "num-traits"
+version = "0.2.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c"
+dependencies = [
+ "autocfg",
+]
+
 [[package]]
 name = "num_cpus"
 version = "1.16.0"
@@ -666,13 +685,15 @@ dependencies = [
  "anyhow",
  "async-nats",
  "bytes",
+ "chrono",
+ "futures",
  "serde",
  "serde_json",
  "sysinfo",
  "thiserror",
  "tokio",
  "tokio-stream",
- "tower",
+ "tokio-util",
  "tracing",
  "tracing-subscriber",
 ]
@@ -1197,39 +1218,25 @@ dependencies = [
 ]
 
 [[package]]
-name = "tower"
-version = "0.4.13"
+name = "tokio-util"
+version = "0.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
+checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
 dependencies = [
+ "bytes",
  "futures-core",
- "futures-util",
- "pin-project",
+ "futures-sink",
  "pin-project-lite",
- "tower-layer",
- "tower-service",
+ "tokio",
  "tracing",
 ]
 
-[[package]]
-name = "tower-layer"
-version = "0.3.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
-
-[[package]]
-name = "tower-service"
-version = "0.3.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
-
 [[package]]
 name = "tracing"
 version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "log",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
diff --git a/agent/Cargo.toml b/agent/Cargo.toml
index 4c24298..c19ef3c 100644
--- a/agent/Cargo.toml
+++ b/agent/Cargo.toml
@@ -7,12 +7,14 @@ edition = "2021"
 anyhow = "1.0.79"
 async-nats = "0.33.0"
 bytes = "1.5.0"
+chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] }
+futures = { version = "0.3.30", default-features = false, features = ["std"] }
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = "1.0.111"
 sysinfo = { version = "0.30.5", default-features = false }
 thiserror = "1.0.56"
 tokio = { version = "1.35.1", features = ["full"] }
 tokio-stream = { version = "0.1.14", default-features = false }
-tower = { version = "0.4.13", features = ["steer", "util"] }
+tokio-util = { version = "0.7.10", features = ["codec"] }
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["fmt"] }
diff --git a/agent/src/health.rs b/agent/src/health.rs
index 1183cf0..d9cbc72 100644
--- a/agent/src/health.rs
+++ b/agent/src/health.rs
@@ -1,10 +1,12 @@
 //! System health information and checking
 
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use tokio::sync::watch;
 
+use crate::messaging::{Client, Message};
+
 const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
 const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
 const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
@@ -43,14 +45,14 @@ impl System {
     }
 }
 
-#[derive(Debug, Default, PartialEq, Serialize)]
+#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
 pub enum Status {
     #[default]
     Normal,
     Critical,
 }
 
-#[derive(Debug, Default, Serialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
 pub struct Health {
     cpu_status: Status,
     memory_status: Status,
@@ -126,3 +128,30 @@ impl Default for HealthMonitor {
         Self::new()
     }
 }
+
+pub async fn init_health_subsystem(client: Client) -> HealthMonitor {
+    let health_monitor = HealthMonitor::new();
+    let health_monitor_clone = health_monitor.clone();
+    let health_monitor_ret = health_monitor.clone();
+    let mut system = System::new();
+
+    // Forever refresh system resources and monitor changes
+    std::thread::spawn(move || loop {
+        const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
+        system.refresh_resources();
+        health_monitor.check_system(&system);
+        std::thread::sleep(REFRESH_INTERVAL);
+    });
+
+    tokio::spawn(async move {
+        let mut recv = health_monitor_clone.monitor();
+
+        while let Ok(()) = recv.changed().await {
+            tracing::info!(health = ?&*recv.borrow(), "health watermark");
+            let health = recv.borrow().clone();
+            client.publish(Message::health(health).unwrap()).await;
+        }
+    });
+
+    health_monitor_ret
+}
diff --git a/agent/src/main.rs b/agent/src/main.rs
index 0aa7177..6c86a03 100644
--- a/agent/src/main.rs
+++ b/agent/src/main.rs
@@ -1,5 +1,6 @@
-use std::time::Duration;
-
+use health::init_health_subsystem;
+use messaging::Client;
+use services::init_services;
 use tracing::Level;
 
 mod health;
@@ -22,37 +23,15 @@ async fn main() -> anyhow::Result<()> {
 }
 
 async fn run() -> anyhow::Result<()> {
-    let agent = messaging::Client::connect("demo_agent").await?;
-    tracing::info!("initialized messaging system");
-    init_health_subsystem(agent.clone());
+    let client = Client::connect("demo_agent").await?;
+
+    let _health_monitor = init_health_subsystem(client.clone()).await;
     tracing::info!("initialized health system");
-    // services::init_services().await;
+
+    init_services(client).await;
+    tracing::info!("initialized services");
+
     tracing::info!("agent is ready");
     tokio::signal::ctrl_c().await?;
     Ok(())
 }
-
-fn init_health_subsystem(agent: messaging::Client) {
-    let mut system = health::System::new();
-    let health_monitor = health::HealthMonitor::new();
-    let health_monitor_clone = health_monitor.clone();
-
-    // Forever refresh system resources and monitor changes
-    std::thread::spawn(move || loop {
-        const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
-        system.refresh_resources();
-        health_monitor.check_system(&system);
-        std::thread::sleep(REFRESH_INTERVAL);
-    });
-
-    tokio::spawn(async move {
-        let mut recv = health_monitor_clone.monitor();
-
-        while let Ok(()) = recv.changed().await {
-            tracing::info!(health = ?&*recv.borrow(), "health watermark");
-            let payload = serde_json::to_string(&*recv.borrow()).expect("serializable health");
-            let message = messaging::OutgoingMessage::Health(payload);
-            agent.publish(message).await;
-        }
-    });
-}
diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs
index 7a549dd..49b4f85 100644
--- a/agent/src/messaging.rs
+++ b/agent/src/messaging.rs
@@ -1,102 +1,80 @@
-//! Prymn messaging framework
+use std::fmt::Display;
 
-use async_nats::{ConnectError, ConnectOptions, SubscribeError};
-use thiserror::Error;
+use anyhow::{anyhow, Result};
+use bytes::Bytes;
+use futures::Stream;
+use serde::Deserialize;
 use tokio_stream::StreamExt;
 
-pub use self::v1::Command;
-pub use self::v1::OutgoingMessage;
+use crate::health::Health;
 
-#[derive(Debug, Error)]
-pub enum MessagingError<'a> {
-    #[error("failed to connect to message broker")]
-    ConnectError(#[from] ConnectError),
-    #[error("failed to listen for messages from broker")]
-    SubscribeError(#[from] SubscribeError),
-    #[error("unknown subject '{0}'")]
-    UnknownSubject(&'a str),
-    #[error("parse error on message format")]
-    MessageFormatError(#[from] serde_json::Error),
-}
-
-mod v1 {
-    use bytes::Bytes;
-    use serde::Deserialize;
-
-    use super::MessagingError;
-
-    pub(super) const PREFIX: &'static str = "agents.v1";
-
-    #[derive(Debug)]
-    pub enum OutgoingMessage {
-        Health(String),
-    }
-
-    impl OutgoingMessage {
-        pub fn subject(&self, sender: &str) -> String {
-            match self {
-                OutgoingMessage::Health(_) => format!("{}.{}.health", PREFIX, sender),
-            }
-        }
-
-        pub fn into_payload(self) -> Bytes {
-            match self {
-                OutgoingMessage::Health(data) => Bytes::from(data.into_bytes()),
-            }
-        }
-    }
-
-    #[derive(Debug)]
-    pub enum Command {
-        Exec(ExecArgs),
-    }
-
-    impl Command {
-        pub fn from_message<'a>(
-            subject: &'a str,
-            payload: &'a [u8],
-        ) -> Result<Self, MessagingError<'a>> {
-            match subject {
-                "exec" => Ok(Command::Exec(serde_json::from_slice(payload)?)),
-                _ => Err(MessagingError::UnknownSubject(subject)),
-            }
-        }
-    }
-
-    /// An operating system program execution.
-    #[derive(Debug, Deserialize)]
-    pub(super) struct ExecArgs {
-        user: String,
-        program: String,
-        #[serde(default)]
-        args: Vec<String>,
-    }
-
-    // /// A system update message received when an OS update is requested
-    // #[derive(Deserialize)]
-    // struct SystemUpdate {
-    //     /// `true` when a simulated update should occur
-    //     simulate: bool,
-    // }
-
-    // /// An agent update message received when an agent binary update is requested
-    // #[derive(Deserialize)]
-    // struct AgentUpdate {
-    //     /// `true` when a simulated update should occur
-    //     simulate: bool,
-    // }
-}
-
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct Client {
     id: String,
-    client: async_nats::Client,
+    nats: async_nats::Client,
+}
+
+#[derive(Debug)]
+pub enum Subject {
+    Health,
+    Exec,
+}
+
+impl Display for Subject {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Subject::Health => write!(f, "health"),
+            Subject::Exec => write!(f, "exec"),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Message {
+    subject: Subject,
+    payload: Bytes,
+    reply: Option<async_nats::Subject>,
+}
+
+impl Message {
+    fn from_transport(msg: async_nats::Message) -> Result<Self> {
+        let suffix = msg.subject.split_terminator('.').last().unwrap_or_default();
+
+        match suffix {
+            "exec" => Ok(Message {
+                subject: Subject::Exec,
+                payload: msg.payload,
+                reply: msg.reply,
+            }),
+            "health" => Ok(Message {
+                subject: Subject::Health,
+                payload: msg.payload,
+                reply: msg.reply,
+            }),
+            _ => Err(anyhow!("unknown subject: {}", msg.subject)),
+        }
+    }
+
+    pub fn subject(&self) -> &Subject {
+        &self.subject
+    }
+
+    pub fn parse_payload<'a, T: Deserialize<'a>>(&'a self) -> Result<T> {
+        Ok(serde_json::from_slice(&self.payload[..])?)
+    }
+
+    pub fn health(health: Health) -> Result<Message> {
+        Ok(Message {
+            subject: Subject::Health,
+            payload: Bytes::from_iter(serde_json::to_vec(&health)?),
+            reply: None,
+        })
+    }
 }
 
 impl Client {
-    #[tracing::instrument]
-    pub async fn connect(id: &str) -> Result<Self, MessagingError> {
-        let client = ConnectOptions::new()
+    pub async fn connect(id: &str) -> Result<Self> {
+        let nats = async_nats::ConnectOptions::new()
             .name(format!("Prymn Agent {id}"))
             .custom_inbox_prefix(format!("_INBOX_{id}"))
             .user_and_password("demo_agent".to_owned(), "demo_agent_password".to_owned())
@@ -105,54 +83,45 @@ impl Client {
 
         tracing::debug!("connected to NATS");
 
-        let prefix = format!("{}.{}.*", v1::PREFIX, id);
-        let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
-
-        tokio::spawn(async move {
-            tracing::debug!("v1 command routing subscribed on {}", &prefix);
-
-            while let Some(data) = routing_subscriber.next().await {
-                let subject = data.subject.trim_start_matches(&prefix[..prefix.len() - 1]);
-
-                match Command::from_message(subject, &data.payload) {
-                    Ok(command) => {
-                        tracing::debug!(?command);
-                        // TODO: Think if we might want to queue commands
-                        // handle_command(command);
-                    }
-                    Err(err) => {
-                        tracing::warn!(cause = %err, "failed to parse received message");
-                        tracing::debug!(payload = %String::from_utf8_lossy(&data.payload));
-                    }
-                }
-            }
-        });
-
         Ok(Self {
             id: id.to_owned(),
-            client,
+            nats,
         })
     }
 
-    #[tracing::instrument(skip(self))]
-    pub async fn publish(&self, msg: OutgoingMessage) {
-        let subj = msg.subject(&self.id);
-        tracing::debug!(subject = &subj, "publish");
-        if let Err(err) = self.client.publish(subj, msg.into_payload()).await {
-            tracing::error!(cause = %err, "publish error");
+    pub async fn publish(&self, msg: Message) {
+        let subject = format!("agents.v1.{}.{}", self.id, msg.subject);
+        self.nats.publish(subject, msg.payload).await.unwrap();
+    }
+
+    pub async fn reply(&self, msg: Message, mut stream: impl Stream<Item = Bytes> + Unpin) {
+        match msg.reply {
+            Some(reply) => {
+                while let Some(data) = stream.next().await {
+                    self.nats.publish(reply.clone(), data.into()).await.unwrap();
+                }
+            }
+            None => tracing::warn!(?msg, "tried to reply to message without a reply subject"),
         }
     }
-}
 
-// pub fn receive_requests(agent: Client) {
-//     let prefix = format!("{}.{}.*", v1::PREFIX, id);
-//     let mut routing_subscriber = client.subscribe(prefix.clone()).await?;
-//     let subscriber = agent.subscribe("").await;
-//     tracing::debug!("v1 command routing subscribed on {}", &prefix);
-// }
-//
-// fn handle_command(command: Command) {
-//     match command {
-//         Command::Exec(_) => todo!(),
-//     }
-// }
+    pub async fn subscribe(&self) -> Result<impl Stream<Item = Message>> {
+        let subject = format!("agents.v1.{}.*", self.id);
+
+        let subscriber = self
+            .nats
+            .subscribe(subject.clone())
+            .await?
+            .filter_map(move |data| match Message::from_transport(data) {
+                Ok(msg) => Some(msg),
+                Err(err) => {
+                    tracing::warn!("{}", err);
+                    None
+                }
+            });
+
+        tracing::debug!("subscribed on {}", &subject);
+
+        Ok(subscriber)
+    }
+}
diff --git a/agent/src/services.rs b/agent/src/services.rs
index a4c84bb..a5b1985 100644
--- a/agent/src/services.rs
+++ b/agent/src/services.rs
@@ -1,10 +1,89 @@
-use tower::steer::Steer;
+use std::process::Stdio;
 
-fn router() {
-    let services = vec![];
+use bytes::Bytes;
+use futures::{FutureExt, Stream};
+use serde::Deserialize;
+use tokio::process::Command;
+use tokio_stream::StreamExt;
+use tokio_util::codec::{BytesCodec, FramedRead};
 
-    let svc = Steer::new(services, |req, services: &[_]| {
-        // do something with req
-        0
+use crate::{
+    health::Health,
+    messaging::{Client, Message, Subject},
+};
+
+pub async fn init_services(client: Client) {
+    let mut message_stream = client.subscribe().await.unwrap();
+
+    tokio::spawn(async move {
+        while let Some(message) = message_stream.next().await {
+            let client = client.clone();
+            tokio::spawn(async move {
+                if let Err(err) = handle_message(client, message).await {
+                    tracing::warn!("{err}");
+                }
+            });
+        }
     });
 }
+
+async fn handle_message(client: Client, message: Message) -> anyhow::Result<()> {
+    match message.subject() {
+        Subject::Exec => {
+            let stream = exec_handler(message.parse_payload()?).await?;
+            client.reply(message, stream).await;
+        }
+        Subject::Health => {
+            let health: Health = message.parse_payload()?;
+            tracing::info!(?health, "received a health");
+        }
+    }
+
+    Ok(())
+}
+
+/// An operating system program execution.
+#[derive(Debug, Deserialize)]
+struct ExecMessage {
+    user: String,
+    program: String,
+    #[serde(default)]
+    args: Vec<String>,
+}
+
+async fn exec_handler(req: ExecMessage) -> anyhow::Result<impl Stream<Item = Bytes> + Unpin> {
+    // TODO: Tasks should be idempontent
+    // TODO: Root user should be able to run only specific programs
+    let mut cmd = if req.user != "root" {
+        let mut cmd = Command::new("sudo");
+        cmd.arg("-iu").arg(&req.user).arg("--").arg(&req.program);
+        cmd
+    } else {
+        Command::new(&req.program)
+    };
+
+    let mut io = cmd
+        .args(&req.args)
+        .stdout(Stdio::piped())
+        .stderr(Stdio::piped())
+        .spawn()?;
+
+    let stdout = FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new())
+        .map(|stdout| Bytes::from(stdout.unwrap()));
+
+    let stderr = FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new())
+        .map(|stderr| Bytes::from(stderr.unwrap()));
+
+    let exit = async move {
+        io.wait()
+            .await
+            .map(|exit| {
+                let exit = exit.to_string();
+                Bytes::from(exit)
+            })
+            .unwrap()
+    }
+    .into_stream();
+
+    Ok(Box::pin(stdout.merge(stderr).chain(exit)))
+}