more churn

This commit is contained in:
Nikos Papadakis 2024-01-29 20:18:29 +02:00
parent 67d147c7e3
commit 46b87d2559
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
10 changed files with 486 additions and 125 deletions

45
Cargo.lock generated
View file

@ -105,6 +105,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.4" version = "0.10.4"
@ -281,6 +287,16 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "errno"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "fiat-crypto" name = "fiat-crypto"
version = "0.2.5" version = "0.2.5"
@ -435,9 +451,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.151" version = "0.2.152"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -687,6 +709,7 @@ dependencies = [
"bytes", "bytes",
"chrono", "chrono",
"futures", "futures",
"rustix",
"serde", "serde",
"serde_json", "serde_json",
"sysinfo", "sysinfo",
@ -743,7 +766,7 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
] ]
[[package]] [[package]]
@ -804,6 +827,20 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "rustix"
version = "0.38.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
dependencies = [
"bitflags 2.4.2",
"errno",
"itoa",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.21.10" version = "0.21.10"
@ -884,7 +921,7 @@ version = "2.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
"core-foundation", "core-foundation",
"core-foundation-sys", "core-foundation-sys",
"libc", "libc",

View file

@ -9,6 +9,7 @@ async-nats = "0.33.0"
bytes = "1.5.0" bytes = "1.5.0"
chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] } chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] }
futures = { version = "0.3.30", default-features = false, features = ["std"] } futures = { version = "0.3.30", default-features = false, features = ["std"] }
rustix = { version = "0.38.30", features = ["termios", "stdio", "pty", "process"] }
serde = { version = "1.0.195", features = ["derive"] } serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111" serde_json = "1.0.111"
sysinfo = { version = "0.30.5", default-features = false } sysinfo = { version = "0.30.5", default-features = false }

View file

@ -1,6 +1,6 @@
//! System health information and checking //! System health information and checking
use std::{sync::Arc, time::Duration}; use std::{collections::HashMap, sync::Arc, time::Duration};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::watch; use tokio::sync::watch;
@ -9,7 +9,7 @@ use crate::messaging::{Client, Message};
const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0; const MEMORY_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; const CPU_USAGE_CRITICAL_THRESHOLD: f32 = 90.0;
const DISK_USAGE_CRITICAL_THRESHOLD: f32 = 90.0; const DISK_USAGE_CRITICAL_THRESHOLD: f64 = 90.0;
pub struct System { pub struct System {
sys: sysinfo::System, sys: sysinfo::System,
@ -33,7 +33,7 @@ impl System {
.with_cpu(CpuRefreshKind::everything()), .with_cpu(CpuRefreshKind::everything()),
); );
// self.disks.refresh_list(); self.disks.refresh_list();
} }
pub fn system(&self) -> &sysinfo::System { pub fn system(&self) -> &sysinfo::System {
@ -45,18 +45,26 @@ impl System {
} }
} }
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub enum Status { pub enum Status {
#[default] #[default]
Normal, Normal,
Critical, Critical,
} }
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub enum DiskStatus {
#[default]
Normal,
// HighUsage,
VeryHighUsage,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)] #[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Health { pub struct Health {
cpu_status: Status, cpu_status: Status,
memory_status: Status, memory_status: Status,
disk_status: Status, disk_status: HashMap<String, DiskStatus>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -80,13 +88,12 @@ impl HealthMonitor {
let cpu_usage = sys.global_cpu_info().cpu_usage(); let cpu_usage = sys.global_cpu_info().cpu_usage();
// for d in system.disks().list() { let disks_usage = system.disks().list().iter().map(|dk| {
// let _avail = if d.total_space() > 0 { (
// (d.available_space() * 100 / d.total_space()) as u8 dk.name().to_str().unwrap_or("<INVALID DISK NAME DETECTED>"),
// } else { (dk.total_space() - dk.available_space()) as f64 / dk.total_space() as f64 * 100.0,
// 0 as u8 )
// }; });
// }
self.0.send_if_modified(|health| { self.0.send_if_modified(|health| {
let cpu_changed = match health.cpu_status { let cpu_changed = match health.cpu_status {
@ -113,7 +120,42 @@ impl HealthMonitor {
_ => false, _ => false,
}; };
cpu_changed || memory_changed let mut disk_changed = false;
disks_usage.for_each(|(name, usage)| match health.disk_status.get_mut(name) {
Some(DiskStatus::Normal) if usage > DISK_USAGE_CRITICAL_THRESHOLD => {
println!("{usage}");
health
.disk_status
.insert(name.to_owned(), DiskStatus::VeryHighUsage);
disk_changed = true;
}
Some(DiskStatus::VeryHighUsage) if usage <= DISK_USAGE_CRITICAL_THRESHOLD => {
health
.disk_status
.insert(name.to_owned(), DiskStatus::Normal);
disk_changed = true;
}
None if usage > DISK_USAGE_CRITICAL_THRESHOLD => {
health
.disk_status
.insert(name.to_owned(), DiskStatus::VeryHighUsage);
disk_changed = true;
}
None => {
health
.disk_status
.insert(name.to_owned(), DiskStatus::Normal);
disk_changed = true;
}
_ => {}
});
cpu_changed || memory_changed || disk_changed
}); });
} }
@ -137,7 +179,7 @@ pub async fn init_health_subsystem(client: Client) -> HealthMonitor {
// Forever refresh system resources and monitor changes // Forever refresh system resources and monitor changes
std::thread::spawn(move || loop { std::thread::spawn(move || loop {
const REFRESH_INTERVAL: Duration = Duration::from_secs(1); const REFRESH_INTERVAL: Duration = Duration::from_secs(5);
system.refresh_resources(); system.refresh_resources();
health_monitor.check_system(&system); health_monitor.check_system(&system);
std::thread::sleep(REFRESH_INTERVAL); std::thread::sleep(REFRESH_INTERVAL);

View file

@ -5,6 +5,7 @@ use tracing::Level;
mod health; mod health;
mod messaging; mod messaging;
mod pty;
mod services; mod services;
#[tokio::main] #[tokio::main]
@ -28,7 +29,7 @@ async fn run() -> anyhow::Result<()> {
let _health_monitor = init_health_subsystem(client.clone()).await; let _health_monitor = init_health_subsystem(client.clone()).await;
tracing::info!("initialized health system"); tracing::info!("initialized health system");
init_services(client).await; init_services(client).await?;
tracing::info!("initialized services"); tracing::info!("initialized services");
tracing::info!("agent is ready"); tracing::info!("agent is ready");

View file

@ -1,9 +1,8 @@
use std::fmt::Display; use std::fmt::{Debug, Display};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::Stream;
use serde::Deserialize;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::health::Health; use crate::health::Health;
@ -18,6 +17,7 @@ pub struct Client {
pub enum Subject { pub enum Subject {
Health, Health,
Exec, Exec,
OpenTerminal,
} }
impl Display for Subject { impl Display for Subject {
@ -25,10 +25,24 @@ impl Display for Subject {
match self { match self {
Subject::Health => write!(f, "health"), Subject::Health => write!(f, "health"),
Subject::Exec => write!(f, "exec"), Subject::Exec => write!(f, "exec"),
Subject::OpenTerminal => write!(f, "open_terminal"),
} }
} }
} }
impl TryFrom<&str> for Subject {
type Error = anyhow::Error;
fn try_from(value: &str) -> Result<Self> {
Ok(match value {
"health" => Subject::Health,
"exec" => Subject::Exec,
"open_terminal" => Subject::OpenTerminal,
_ => return Err(anyhow!("unknown subject '{}'", value)),
})
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Message { pub struct Message {
subject: Subject, subject: Subject,
@ -40,27 +54,19 @@ impl Message {
fn from_transport(msg: async_nats::Message) -> Result<Self> { fn from_transport(msg: async_nats::Message) -> Result<Self> {
let suffix = msg.subject.split_terminator('.').last().unwrap_or_default(); let suffix = msg.subject.split_terminator('.').last().unwrap_or_default();
match suffix { Ok(Message {
"exec" => Ok(Message { subject: suffix.try_into()?,
subject: Subject::Exec,
payload: msg.payload, payload: msg.payload,
reply: msg.reply, reply: msg.reply,
}), })
"health" => Ok(Message {
subject: Subject::Health,
payload: msg.payload,
reply: msg.reply,
}),
_ => Err(anyhow!("unknown subject: {}", msg.subject)),
}
} }
pub fn subject(&self) -> &Subject { pub fn subject(&self) -> &Subject {
&self.subject &self.subject
} }
pub fn parse_payload<'a, T: Deserialize<'a>>(&'a self) -> Result<T> { pub fn body(&self) -> Bytes {
Ok(serde_json::from_slice(&self.payload[..])?) self.payload.clone()
} }
pub fn health(health: Health) -> Result<Message> { pub fn health(health: Health) -> Result<Message> {
@ -125,3 +131,9 @@ impl Client {
Ok(subscriber) Ok(subscriber)
} }
} }
impl Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Client {{ id: {} }}", self.id)
}
}

166
agent/src/pty.rs Normal file
View file

@ -0,0 +1,166 @@
use std::{io, task::ready};
use rustix::{
fd::OwnedFd,
fs::{fcntl_getfl, fcntl_setfl, OFlags},
process::{ioctl_tiocsctty, setsid},
pty::{grantpt, ioctl_tiocgptpeer, openpt, unlockpt, OpenptFlags},
stdio::{dup2_stderr, dup2_stdin, dup2_stdout},
termios::{tcsetwinsize, Winsize},
};
use tokio::{
io::{unix::AsyncFd, AsyncRead, AsyncWrite},
process::Child,
};
#[derive(Debug)]
pub struct Pty {
fd: AsyncFd<OwnedFd>,
}
impl Pty {
pub fn open() -> io::Result<Self> {
let master = openpt(OpenptFlags::RDWR | OpenptFlags::NOCTTY | OpenptFlags::CLOEXEC)?;
grantpt(&master)?;
unlockpt(&master)?;
// Set nonblocking
let flags = fcntl_getfl(&master)?;
fcntl_setfl(&master, flags | OFlags::NONBLOCK)?;
let fd = AsyncFd::new(master)?;
Ok(Self { fd })
}
pub fn child(&self) -> io::Result<PtyChild> {
// NOTE: Linux v4.13 and above
let fd = ioctl_tiocgptpeer(&self.fd, OpenptFlags::RDWR | OpenptFlags::NOCTTY)?;
let child = PtyChild { fd };
Ok(child)
}
pub fn resize_window(&self, rows: u16, cols: u16) -> io::Result<()> {
let winsize = Winsize {
ws_row: rows,
ws_col: cols,
ws_xpixel: 0,
ws_ypixel: 0,
};
tcsetwinsize(&self.fd, winsize)?;
Ok(())
}
pub fn try_clone(&self) -> io::Result<Pty> {
let fd = self.fd.get_ref().try_clone()?;
Ok(Pty {
fd: AsyncFd::new(fd)?,
})
}
}
impl AsyncRead for Pty {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
loop {
let mut guard = ready!(self.fd.poll_read_ready(cx)?);
match guard.try_io(|inner| {
let fd = inner.get_ref();
let n = rustix::io::read(fd, buf.initialize_unfilled())?;
buf.advance(n);
Ok(())
}) {
Ok(result) => return std::task::Poll::Ready(result),
Err(_would_block) => continue,
}
}
}
}
impl AsyncWrite for Pty {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
loop {
let mut guard = ready!(self.fd.poll_write_ready(cx))?;
match guard.try_io(|inner| Ok(rustix::io::write(inner.get_ref(), buf)?)) {
Ok(result) => return std::task::Poll::Ready(result),
Err(_would_block) => continue,
}
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
std::task::Poll::Ready(Ok(()))
}
}
#[derive(Debug)]
pub struct PtyChild {
fd: OwnedFd,
}
impl PtyChild {
pub fn login_tty(&self) -> io::Result<()> {
setsid()?;
ioctl_tiocsctty(&self.fd)?;
dup2_stdin(&self.fd)?;
dup2_stdout(&self.fd)?;
dup2_stderr(&self.fd)?;
Ok(())
}
}
pub fn open_shell(pty_child: PtyChild, shell: &str) -> io::Result<Child> {
let mut cmd = tokio::process::Command::new(shell);
unsafe {
cmd.pre_exec(move || {
pty_child.login_tty()?;
Ok(())
});
}
cmd.spawn()
}
#[cfg(test)]
mod test {
use rustix::fd::AsRawFd;
use super::*;
#[tokio::test]
async fn can_open_pty() {
let pty = Pty::open().unwrap();
let child = pty.child().unwrap();
let master_fd = pty.fd.get_ref().as_raw_fd();
let child_fd = child.fd.as_raw_fd();
assert!(master_fd != child_fd);
}
}

View file

@ -1,89 +0,0 @@
use std::process::Stdio;
use bytes::Bytes;
use futures::{FutureExt, Stream};
use serde::Deserialize;
use tokio::process::Command;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use crate::{
health::Health,
messaging::{Client, Message, Subject},
};
pub async fn init_services(client: Client) {
let mut message_stream = client.subscribe().await.unwrap();
tokio::spawn(async move {
while let Some(message) = message_stream.next().await {
let client = client.clone();
tokio::spawn(async move {
if let Err(err) = handle_message(client, message).await {
tracing::warn!("{err}");
}
});
}
});
}
async fn handle_message(client: Client, message: Message) -> anyhow::Result<()> {
match message.subject() {
Subject::Exec => {
let stream = exec_handler(message.parse_payload()?).await?;
client.reply(message, stream).await;
}
Subject::Health => {
let health: Health = message.parse_payload()?;
tracing::info!(?health, "received a health");
}
}
Ok(())
}
/// An operating system program execution.
#[derive(Debug, Deserialize)]
struct ExecMessage {
user: String,
program: String,
#[serde(default)]
args: Vec<String>,
}
async fn exec_handler(req: ExecMessage) -> anyhow::Result<impl Stream<Item = Bytes> + Unpin> {
// TODO: Tasks should be idempontent
// TODO: Root user should be able to run only specific programs
let mut cmd = if req.user != "root" {
let mut cmd = Command::new("sudo");
cmd.arg("-iu").arg(&req.user).arg("--").arg(&req.program);
cmd
} else {
Command::new(&req.program)
};
let mut io = cmd
.args(&req.args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new())
.map(|stdout| Bytes::from(stdout.unwrap()));
let stderr = FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new())
.map(|stderr| Bytes::from(stderr.unwrap()));
let exit = async move {
io.wait()
.await
.map(|exit| {
let exit = exit.to_string();
Bytes::from(exit)
})
.unwrap()
}
.into_stream();
Ok(Box::pin(stdout.merge(stderr).chain(exit)))
}

View file

@ -0,0 +1,79 @@
use std::process::Stdio;
use bytes::Bytes;
use futures::FutureExt;
use serde::Deserialize;
use tokio::process::Command;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use super::Ctx;
#[derive(Debug, Deserialize)]
pub struct ExecReq {
user: String,
program: String,
#[serde(default)]
args: Vec<String>,
}
impl TryFrom<Bytes> for ExecReq {
type Error = serde_json::Error;
fn try_from(value: Bytes) -> Result<Self, Self::Error> {
serde_json::from_slice(&value[..])
}
}
/// An operating system program execution.
pub async fn exec(ctx: Ctx<ExecReq>) -> anyhow::Result<()> {
// TODO: Tasks should be idempontent
// TODO: Root user should be able to run only specific programs
let mut cmd = if &ctx.body.user != "root" {
let mut cmd = Command::new("sudo");
cmd.arg("-iu")
.arg(&ctx.body.user)
.arg("--")
.arg(&ctx.body.program);
cmd
} else {
Command::new(&ctx.body.program)
};
let mut io = cmd
.args(&ctx.body.args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout =
FramedRead::new(io.stdout.take().unwrap(), BytesCodec::new()).filter_map(|stdout| {
stdout
.map(|bytes| bytes.freeze())
.map_err(|err| tracing::error!(%err, "read error on stdout"))
.ok()
});
let stderr =
FramedRead::new(io.stderr.take().unwrap(), BytesCodec::new()).filter_map(|stderr| {
stderr
.map(|bytes| bytes.freeze())
.map_err(|err| tracing::error!(%err, "read error on stderr"))
.ok()
});
let exit = async move {
io.wait()
.await
.map(|exit| {
let exit = exit.to_string();
Bytes::from(exit)
})
.unwrap()
}
.into_stream();
// Ok(Box::pin(stdout.merge(stderr).chain(exit)))
Ok(())
}

67
agent/src/services/mod.rs Normal file
View file

@ -0,0 +1,67 @@
use anyhow::Context;
use bytes::Bytes;
use thiserror::Error;
use tokio_stream::StreamExt;
mod exec;
mod terminal;
#[derive(Debug, Error)]
enum ServiceError {
#[error("received an invalid body format for a valid message")]
BodyFormatError,
}
struct Ctx<T> {
body: T,
}
impl<T> Ctx<T>
where
T: TryFrom<Bytes>,
{
fn with_body(client: crate::messaging::Client, body: Bytes) -> Result<Self, ServiceError> {
Ok(Self {
body: body
.try_into()
.map_err(|_err| ServiceError::BodyFormatError)?,
})
}
}
async fn route_message(
client: crate::messaging::Client,
message: crate::messaging::Message,
) -> Result<(), ServiceError> {
match message.subject() {
crate::messaging::Subject::Health => {}
crate::messaging::Subject::Exec => {
let ctx = Ctx::with_body(client, message.body())?;
let _ = self::exec::exec(ctx).await;
}
crate::messaging::Subject::OpenTerminal => {
let ctx = Ctx::with_body(client, message.body())?;
let _ = self::terminal::open_terminal(ctx).await;
}
}
Ok(())
}
pub async fn init_services(client: crate::messaging::Client) -> anyhow::Result<()> {
let mut message_stream = client
.subscribe()
.await
.context("could not initialize services system")?;
tokio::spawn(async move {
while let Some(message) = message_stream.next().await {
// TODO: How do i handle this error?
if let Err(err) = route_message(client.clone(), message).await {
tracing::warn!("{}", err);
};
}
});
Ok(())
}

View file

@ -0,0 +1,45 @@
use bytes::Bytes;
use futures::Stream;
use serde::Deserialize;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use super::Ctx;
#[derive(Debug, Deserialize)]
pub struct OpenTerminalMessage {
id: String,
}
impl TryFrom<Bytes> for OpenTerminalMessage {
type Error = serde_json::Error;
fn try_from(value: Bytes) -> Result<Self, Self::Error> {
serde_json::from_slice(&value[..])
}
}
pub async fn open_terminal(ctx: Ctx<OpenTerminalMessage>) -> anyhow::Result<()> {
let pty = crate::pty::Pty::open()?;
let mut pty_clone = pty.try_clone()?;
tokio::spawn(async move {
while let Some(data) = tokio_stream::once(b"foo").next().await {
if let Err(err) = pty_clone.write_all(&data[..]).await {
tracing::warn!(%err, "pseudoterminal write error");
}
}
});
let _out_stream = FramedRead::new(pty, BytesCodec::new()).filter_map(|inner| {
inner
.map(|bytes| bytes.freeze())
.map_err(|err| {
tracing::warn!(%err, "pseudoterminal read error");
})
.ok()
});
Ok(())
}