diff --git a/Cargo.lock b/Cargo.lock
index 979fbae..8d3f5f9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -105,6 +105,12 @@ version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
+[[package]]
+name = "bitflags"
+version = "2.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
+
 [[package]]
 name = "block-buffer"
 version = "0.10.4"
@@ -281,6 +287,16 @@ dependencies = [
  "subtle",
 ]
 
+[[package]]
+name = "errno"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "fiat-crypto"
 version = "0.2.5"
@@ -435,9 +451,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
 
 [[package]]
 name = "libc"
-version = "0.2.151"
+version = "0.2.152"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4"
+checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
+
+[[package]]
+name = "linux-raw-sys"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
 
 [[package]]
 name = "lock_api"
@@ -687,6 +709,7 @@ dependencies = [
  "bytes",
  "chrono",
  "futures",
+ "rustix",
  "serde",
  "serde_json",
  "sysinfo",
@@ -743,7 +766,7 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
 ]
 
 [[package]]
@@ -804,6 +827,20 @@ dependencies = [
  "semver",
 ]
 
+[[package]]
+name = "rustix"
+version = "0.38.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
+dependencies = [
+ "bitflags 2.4.2",
+ "errno",
+ "itoa",
+ "libc",
+ "linux-raw-sys",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "rustls"
 version = "0.21.10"
@@ -884,7 +921,7 @@ version = "2.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
  "core-foundation",
  "core-foundation-sys",
  "libc",
diff --git a/agent/Cargo.toml b/agent/Cargo.toml
index c19ef3c..96a6809 100644
--- a/agent/Cargo.toml
+++ b/agent/Cargo.toml
@@ -9,6 +9,7 @@ async-nats = "0.33.0"
 bytes = "1.5.0"
 chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] }
 futures = { version = "0.3.30", default-features = false, features = ["std"] }
+rustix = { version = "0.38.30", features = ["termios", "stdio", "pty", "process"] }
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = "1.0.111"
 sysinfo = { version = "0.30.5", default-features = false }
diff --git a/agent/src/health.rs b/agent/src/health.rs
index d9cbc72..58296e9 100644
--- a/agent/src/health.rs
+++ b/agent/src/health.rs
@@ -1,6 +1,6 @@
 //! System health information and checking
 
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
 
 use serde::{Deserialize, Serialize};
 use tokio::sync::watch;
@@ -9,7 +9,7 @@ use crate::messaging::{Client, Message};
 
 const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
 const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
-const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
+const DISK_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
 
 pub struct System {
     sys: sysinfo::System,
@@ -33,7 +33,7 @@ impl System {
                 .with_cpu(CpuRefreshKind::everything()),
         );
 
-        // self.disks.refresh_list();
+        self.disks.refresh_list();
     }
 
     pub fn system(&self) -> &sysinfo::System {
@@ -45,18 +45,26 @@ impl System {
     }
 }
 
-#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
 pub enum Status {
     #[default]
     Normal,
     Critical,
 }
 
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub enum DiskStatus {
+    #[default]
+    Normal,
+    // HighUsage,
+    VeryHighUsage,
+}
+
 #[derive(Clone, Debug, Default, Serialize, Deserialize)]
 pub struct Health {
     cpu_status: Status,
     memory_status: Status,
-    disk_status: Status,
+    disk_status: HashMap<String, DiskStatus>,
 }
 
 #[derive(Clone)]
@@ -80,13 +88,12 @@ impl HealthMonitor {
 
         let cpu_usage = sys.global_cpu_info().cpu_usage();
 
-        // for d in system.disks().list() {
-        //     let _avail = if d.total_space() > 0 {
-        //         (d.available_space() * 100 / d.total_space()) as u8
-        //     } else {
-        //         0 as u8
-        //     };
-        // }
+        let disks_usage = system.disks().list().iter().map(|dk| {
+            (
+                dk.name().to_str().unwrap_or("<INVALID DISK NAME DETECTED>"),
+                (dk.total_space() - dk.available_space()) as f64 / dk.total_space() as f64 * 100.0,
+            )
+        });
 
         self.0.send_if_modified(|health| {
             let cpu_changed = match health.cpu_status {
@@ -113,7 +120,42 @@ impl HealthMonitor {
                 _ => false,
             };
 
-            cpu_changed || memory_changed
+            let mut disk_changed = false;
+
+            disks_usage.for_each(|(name, usage)| match health.disk_status.get_mut(name) {
+                Some(DiskStatus::Normal) if usage > DISK_USAGE_CRITICAL_THRESHOLD => {
+                    println!("{usage}");
+                    health
+                        .disk_status
+                        .insert(name.to_owned(), DiskStatus::VeryHighUsage);
+
+                    disk_changed = true;
+                }
+                Some(DiskStatus::VeryHighUsage) if usage <= DISK_USAGE_CRITICAL_THRESHOLD => {
+                    health
+                        .disk_status
+                        .insert(name.to_owned(), DiskStatus::Normal);
+
+                    disk_changed = true;
+                }
+                None if usage > DISK_USAGE_CRITICAL_THRESHOLD => {
+                    health
+                        .disk_status
+                        .insert(name.to_owned(), DiskStatus::VeryHighUsage);
+
+                    disk_changed = true;
+                }
+                None => {
+                    health
+                        .disk_status
+                        .insert(name.to_owned(), DiskStatus::Normal);
+
+                    disk_changed = true;
+                }
+                _ => {}
+            });
+
+            cpu_changed || memory_changed || disk_changed
         });
     }
 
@@ -137,7 +179,7 @@ pub async fn init_health_subsystem(client: Client) -> HealthMonitor {
 
     // Forever refresh system resources and monitor changes
     std::thread::spawn(move || loop {
-        const REFRESH_INTERVAL: Duration = Duration::from_secs(1);
+        const REFRESH_INTERVAL: Duration = Duration::from_secs(5);
         system.refresh_resources();
         health_monitor.check_system(&system);
         std::thread::sleep(REFRESH_INTERVAL);
diff --git a/agent/src/main.rs b/agent/src/main.rs
index 6c86a03..0ffc009 100644
--- a/agent/src/main.rs
+++ b/agent/src/main.rs
@@ -5,6 +5,7 @@ use tracing::Level;
 
 mod health;
 mod messaging;
+mod pty;
 mod services;
 
 #[tokio::main]
@@ -28,7 +29,7 @@ async fn run() -> anyhow::Result<()> {
     let _health_monitor = init_health_subsystem(client.clone()).await;
     tracing::info!("initialized health system");
 
-    init_services(client).await;
+    init_services(client).await?;
     tracing::info!("initialized services");
 
     tracing::info!("agent is ready");
diff --git a/agent/src/messaging.rs b/agent/src/messaging.rs
index 49b4f85..77b988d 100644
--- a/agent/src/messaging.rs
+++ b/agent/src/messaging.rs
@@ -1,9 +1,8 @@
-use std::fmt::Display;
+use std::fmt::{Debug, Display};
 
 use anyhow::{anyhow, Result};
 use bytes::Bytes;
 use futures::Stream;
-use serde::Deserialize;
 use tokio_stream::StreamExt;
 
 use crate::health::Health;
@@ -18,6 +17,7 @@ pub struct Client {
 pub enum Subject {
     Health,
     Exec,
+    OpenTerminal,
 }
 
 impl Display for Subject {
@@ -25,10 +25,24 @@ impl Display for Subject {
         match self {
             Subject::Health => write!(f, "health"),
             Subject::Exec => write!(f, "exec"),
+            Subject::OpenTerminal => write!(f, "open_terminal"),
         }
     }
 }
 
+impl TryFrom<&str> for Subject {
+    type Error = anyhow::Error;
+
+    fn try_from(value: &str) -> Result<Self> {
+        Ok(match value {
+            "health" => Subject::Health,
+            "exec" => Subject::Exec,
+            "open_terminal" => Subject::OpenTerminal,
+            _ => return Err(anyhow!("unknown subject '{}'", value)),
+        })
+    }
+}
+
 #[derive(Debug)]
 pub struct Message {
     subject: Subject,
@@ -40,27 +54,19 @@ impl Message {
     fn from_transport(msg: async_nats::Message) -> Result<Self> {
         let suffix = msg.subject.split_terminator('.').last().unwrap_or_default();
 
-        match suffix {
-            "exec" => Ok(Message {
-                subject: Subject::Exec,
-                payload: msg.payload,
-                reply: msg.reply,
-            }),
-            "health" => Ok(Message {
-                subject: Subject::Health,
-                payload: msg.payload,
-                reply: msg.reply,
-            }),
-            _ => Err(anyhow!("unknown subject: {}", msg.subject)),
-        }
+        Ok(Message {
+            subject: suffix.try_into()?,
+            payload: msg.payload,
+            reply: msg.reply,
+        })
     }
 
     pub fn subject(&self) -> &Subject {
         &self.subject
     }
 
-    pub fn parse_payload<'a, T: Deserialize<'a>>(&'a self) -> Result<T> {
-        Ok(serde_json::from_slice(&self.payload[..])?)
+    pub fn body(&self) -> Bytes {
+        self.payload.clone()
     }
 
     pub fn health(health: Health) -> Result<Message> {
@@ -125,3 +131,9 @@ impl Client {
         Ok(subscriber)
     }
 }
+
+impl Debug for Client {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Client {{ id: {} }}", self.id)
+    }
+}
diff --git a/agent/src/pty.rs b/agent/src/pty.rs
new file mode 100644
index 0000000..814e0de
--- /dev/null
+++ b/agent/src/pty.rs
@@ -0,0 +1,166 @@
+use std::{io, task::ready};
+
+use rustix::{
+    fd::OwnedFd,
+    fs::{fcntl_getfl, fcntl_setfl, OFlags},
+    process::{ioctl_tiocsctty, setsid},
+    pty::{grantpt, ioctl_tiocgptpeer, openpt, unlockpt, OpenptFlags},
+    stdio::{dup2_stderr, dup2_stdin, dup2_stdout},
+    termios::{tcsetwinsize, Winsize},
+};
+use tokio::{
+    io::{unix::AsyncFd, AsyncRead, AsyncWrite},
+    process::Child,
+};
+
+#[derive(Debug)]
+pub struct Pty {
+    fd: AsyncFd<OwnedFd>,
+}
+
+impl Pty {
+    pub fn open() -> io::Result<Self> {
+        let master = openpt(OpenptFlags::RDWR | OpenptFlags::NOCTTY | OpenptFlags::CLOEXEC)?;
+        grantpt(&master)?;
+        unlockpt(&master)?;
+
+        // Set nonblocking
+        let flags = fcntl_getfl(&master)?;
+        fcntl_setfl(&master, flags | OFlags::NONBLOCK)?;
+
+        let fd = AsyncFd::new(master)?;
+
+        Ok(Self { fd })
+    }
+
+    pub fn child(&self) -> io::Result<PtyChild> {
+        // NOTE: Linux v4.13 and above
+        let fd = ioctl_tiocgptpeer(&self.fd, OpenptFlags::RDWR | OpenptFlags::NOCTTY)?;
+        let child = PtyChild { fd };
+
+        Ok(child)
+    }
+
+    pub fn resize_window(&self, rows: u16, cols: u16) -> io::Result<()> {
+        let winsize = Winsize {
+            ws_row: rows,
+            ws_col: cols,
+            ws_xpixel: 0,
+            ws_ypixel: 0,
+        };
+
+        tcsetwinsize(&self.fd, winsize)?;
+
+        Ok(())
+    }
+
+    pub fn try_clone(&self) -> io::Result<Pty> {
+        let fd = self.fd.get_ref().try_clone()?;
+
+        Ok(Pty {
+            fd: AsyncFd::new(fd)?,
+        })
+    }
+}
+
+impl AsyncRead for Pty {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<io::Result<()>> {
+        loop {
+            let mut guard = ready!(self.fd.poll_read_ready(cx)?);
+
+            match guard.try_io(|inner| {
+                let fd = inner.get_ref();
+                let n = rustix::io::read(fd, buf.initialize_unfilled())?;
+                buf.advance(n);
+
+                Ok(())
+            }) {
+                Ok(result) => return std::task::Poll::Ready(result),
+                Err(_would_block) => continue,
+            }
+        }
+    }
+}
+
+impl AsyncWrite for Pty {
+    fn poll_write(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        loop {
+            let mut guard = ready!(self.fd.poll_write_ready(cx))?;
+
+            match guard.try_io(|inner| Ok(rustix::io::write(inner.get_ref(), buf)?)) {
+                Ok(result) => return std::task::Poll::Ready(result),
+                Err(_would_block) => continue,
+            }
+        }
+    }
+
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        std::task::Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        std::task::Poll::Ready(Ok(()))
+    }
+}
+
+#[derive(Debug)]
+pub struct PtyChild {
+    fd: OwnedFd,
+}
+
+impl PtyChild {
+    pub fn login_tty(&self) -> io::Result<()> {
+        setsid()?;
+        ioctl_tiocsctty(&self.fd)?;
+        dup2_stdin(&self.fd)?;
+        dup2_stdout(&self.fd)?;
+        dup2_stderr(&self.fd)?;
+
+        Ok(())
+    }
+}
+
+pub fn open_shell(pty_child: PtyChild, shell: &str) -> io::Result<Child> {
+    let mut cmd = tokio::process::Command::new(shell);
+
+    unsafe {
+        cmd.pre_exec(move || {
+            pty_child.login_tty()?;
+            Ok(())
+        });
+    }
+
+    cmd.spawn()
+}
+
+#[cfg(test)]
+mod test {
+    use rustix::fd::AsRawFd;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn can_open_pty() {
+        let pty = Pty::open().unwrap();
+        let child = pty.child().unwrap();
+
+        let master_fd = pty.fd.get_ref().as_raw_fd();
+        let child_fd = child.fd.as_raw_fd();
+
+        assert!(master_fd != child_fd);
+    }
+}
diff --git a/agent/src/services.rs b/agent/src/services.rs
deleted file mode 100644
index a5b1985..0000000
--- a/agent/src/services.rs
+++ /dev/null
@@ -1,89 +0,0 @@
-use std::process::Stdio;
-
-use bytes::Bytes;
-use futures::{FutureExt, Stream};
-use serde::Deserialize;
-use tokio::process::Command;
-use tokio_stream::StreamExt;
-use tokio_util::codec::{BytesCodec, FramedRead};
-
-use crate::{
-    health::Health,
-    messaging::{Client, Message, Subject},
-};
-
-pub async fn init_services(client: Client) {
-    let mut message_stream = client.subscribe().await.unwrap();
-
-    tokio::spawn(async move {
-        while let Some(message) = message_stream.next().await {
-            let client = client.clone();
-            tokio::spawn(async move {
-                if let Err(err) = handle_message(client, message).await {
-                    tracing::warn!("{err}");
-                }
-            });
-        }
-    });
-}
-
-async fn handle_message(client: Client, message: Message) -> anyhow::Result<()> {
-    match message.subject() {
-        Subject::Exec => {
-            let stream = exec_handler(message.parse_payload()?).await?;
-            client.reply(message, stream).await;
-        }
-        Subject::Health => {
-            let health: Health = message.parse_payload()?;
-            tracing::info!(?health, "received a health");
-        }
-    }
-
-    Ok(())
-}
-
-/// An operating system program execution.
-#[derive(Debug, Deserialize)]
-struct ExecMessage {
-    user: String,
-    program: String,
-    #[serde(default)]
-    args: Vec<String>,
-}
-
-async fn exec_handler(req: ExecMessage) -> anyhow::Result<impl Stream<Item = Bytes> + Unpin> {
-    // TODO: Tasks should be idempontent
-    // TODO: Root user should be able to run only specific programs
-    let mut cmd = if req.user != "root" {
-        let mut cmd = Command::new("sudo");
-        cmd.arg("-iu").arg(&req.user).arg("--").arg(&req.program);
-        cmd
-    } else {
-        Command::new(&req.program)
-    };
-
-    let mut io = cmd
-        .args(&req.args)
-        .stdout(Stdio::piped())
-        .stderr(Stdio::piped())
-        .spawn()?;
-
-    let stdout = FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new())
-        .map(|stdout| Bytes::from(stdout.unwrap()));
-
-    let stderr = FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new())
-        .map(|stderr| Bytes::from(stderr.unwrap()));
-
-    let exit = async move {
-        io.wait()
-            .await
-            .map(|exit| {
-                let exit = exit.to_string();
-                Bytes::from(exit)
-            })
-            .unwrap()
-    }
-    .into_stream();
-
-    Ok(Box::pin(stdout.merge(stderr).chain(exit)))
-}
diff --git a/agent/src/services/exec.rs b/agent/src/services/exec.rs
new file mode 100644
index 0000000..0609244
--- /dev/null
+++ b/agent/src/services/exec.rs
@@ -0,0 +1,79 @@
+use std::process::Stdio;
+
+use bytes::Bytes;
+use futures::FutureExt;
+use serde::Deserialize;
+use tokio::process::Command;
+use tokio_stream::StreamExt;
+use tokio_util::codec::{BytesCodec, FramedRead};
+
+use super::Ctx;
+
+#[derive(Debug, Deserialize)]
+pub struct ExecReq {
+    user: String,
+    program: String,
+    #[serde(default)]
+    args: Vec<String>,
+}
+
+impl TryFrom<Bytes> for ExecReq {
+    type Error = serde_json::Error;
+
+    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
+        serde_json::from_slice(&value[..])
+    }
+}
+
+/// An operating system program execution.
+pub async fn exec(ctx: Ctx<ExecReq>) -> anyhow::Result<()> {
+    // TODO: Tasks should be idempontent
+    // TODO: Root user should be able to run only specific programs
+
+    let mut cmd = if &ctx.body.user != "root" {
+        let mut cmd = Command::new("sudo");
+        cmd.arg("-iu")
+            .arg(&ctx.body.user)
+            .arg("--")
+            .arg(&ctx.body.program);
+        cmd
+    } else {
+        Command::new(&ctx.body.program)
+    };
+
+    let mut io = cmd
+        .args(&ctx.body.args)
+        .stdout(Stdio::piped())
+        .stderr(Stdio::piped())
+        .spawn()?;
+
+    let stdout =
+        FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| {
+            stdout
+                .map(|bytes| bytes.freeze())
+                .map_err(|err| tracing::error!(%err, "read error on stdout"))
+                .ok()
+        });
+
+    let stderr =
+        FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| {
+            stderr
+                .map(|bytes| bytes.freeze())
+                .map_err(|err| tracing::error!(%err, "read error on stderr"))
+                .ok()
+        });
+
+    let exit = async move {
+        io.wait()
+            .await
+            .map(|exit| {
+                let exit = exit.to_string();
+                Bytes::from(exit)
+            })
+            .unwrap()
+    }
+    .into_stream();
+
+    //     Ok(Box::pin(stdout.merge(stderr).chain(exit)))
+    Ok(())
+}
diff --git a/agent/src/services/mod.rs b/agent/src/services/mod.rs
new file mode 100644
index 0000000..876657c
--- /dev/null
+++ b/agent/src/services/mod.rs
@@ -0,0 +1,67 @@
+use anyhow::Context;
+use bytes::Bytes;
+use thiserror::Error;
+use tokio_stream::StreamExt;
+
+mod exec;
+mod terminal;
+
+#[derive(Debug, Error)]
+enum ServiceError {
+    #[error("received an invalid body format for a valid message")]
+    BodyFormatError,
+}
+
+struct Ctx<T> {
+    body: T,
+}
+
+impl<T> Ctx<T>
+where
+    T: TryFrom<Bytes>,
+{
+    fn with_body(client: crate::messaging::Client, body: Bytes) -> Result<Self, ServiceError> {
+        Ok(Self {
+            body: body
+                .try_into()
+                .map_err(|_err| ServiceError::BodyFormatError)?,
+        })
+    }
+}
+
+async fn route_message(
+    client: crate::messaging::Client,
+    message: crate::messaging::Message,
+) -> Result<(), ServiceError> {
+    match message.subject() {
+        crate::messaging::Subject::Health => {}
+        crate::messaging::Subject::Exec => {
+            let ctx = Ctx::with_body(client, message.body())?;
+            let _ = self::exec::exec(ctx).await;
+        }
+        crate::messaging::Subject::OpenTerminal => {
+            let ctx = Ctx::with_body(client, message.body())?;
+            let _ = self::terminal::open_terminal(ctx).await;
+        }
+    }
+
+    Ok(())
+}
+
+pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> {
+    let mut message_stream = client
+        .subscribe()
+        .await
+        .context("could not initialize services system")?;
+
+    tokio::spawn(async move {
+        while let Some(message) = message_stream.next().await {
+            // TODO: How do i handle this error?
+            if let Err(err) = route_message(client.clone(), message).await {
+                tracing::warn!("{}", err);
+            };
+        }
+    });
+
+    Ok(())
+}
diff --git a/agent/src/services/terminal.rs b/agent/src/services/terminal.rs
new file mode 100644
index 0000000..2553b4e
--- /dev/null
+++ b/agent/src/services/terminal.rs
@@ -0,0 +1,45 @@
+use bytes::Bytes;
+use futures::Stream;
+use serde::Deserialize;
+use tokio::io::AsyncWriteExt;
+use tokio_stream::StreamExt;
+use tokio_util::codec::{BytesCodec, FramedRead};
+
+use super::Ctx;
+
+#[derive(Debug, Deserialize)]
+pub struct OpenTerminalMessage {
+    id: String,
+}
+
+impl TryFrom<Bytes> for OpenTerminalMessage {
+    type Error = serde_json::Error;
+
+    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
+        serde_json::from_slice(&value[..])
+    }
+}
+
+pub async fn open_terminal(ctx: Ctx<OpenTerminalMessage>) -> anyhow::Result<()> {
+    let pty = crate::pty::Pty::open()?;
+    let mut pty_clone = pty.try_clone()?;
+
+    tokio::spawn(async move {
+        while let Some(data) = tokio_stream::once(b"foo").next().await {
+            if let Err(err) = pty_clone.write_all(&data[..]).await {
+                tracing::warn!(%err, "pseudoterminal write error");
+            }
+        }
+    });
+
+    let _out_stream = FramedRead::new(pty, BytesCodec::new()).filter_map(|inner| {
+        inner
+            .map(|bytes| bytes.freeze())
+            .map_err(|err| {
+                tracing::warn!(%err, "pseudoterminal read error");
+            })
+            .ok()
+    });
+
+    Ok(())
+}