From 5d948f4c199e86a2f8dd77fbb1e0744cdb0c7c39 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Sat, 12 Aug 2023 09:37:01 +0000 Subject: [PATCH] Add health checking system to agent Adds a health checking endpoint on the GRPC server. This is a stream that changes whenever a health status update occurs. Reviewed-on: https://git.nikos.gg/prymn/prymn/pulls/5 Co-authored-by: Nikos Papadakis Co-committed-by: Nikos Papadakis --- .woodpecker/release-agent.yml | 4 +- .woodpecker/release-install-script.yml | 4 +- agent/Cargo.lock | 318 +++++++++++++++---------- agent/Cargo.toml | 8 +- agent/build.rs | 2 +- agent/src/bin/prymn_agent.rs | 10 +- agent/src/lib.rs | 1 + agent/src/self_update.rs | 2 +- agent/src/server/agent.rs | 117 +++++++++ agent/src/server/mod.rs | 207 ++++------------ agent/src/server/rpc.rs | 1 - agent/src/{server => system}/exec.rs | 43 ++-- agent/src/system/health.rs | 299 +++++++++++++++++++++++ agent/src/system/mod.rs | 12 + proto/agent.proto | 20 +- 15 files changed, 720 insertions(+), 328 deletions(-) create mode 100644 agent/src/server/agent.rs delete mode 100644 agent/src/server/rpc.rs rename agent/src/{server => system}/exec.rs (72%) create mode 100644 agent/src/system/health.rs create mode 100644 agent/src/system/mod.rs diff --git a/.woodpecker/release-agent.yml b/.woodpecker/release-agent.yml index 0c378fa..61acbe5 100644 --- a/.woodpecker/release-agent.yml +++ b/.woodpecker/release-agent.yml @@ -1,6 +1,6 @@ when: - path: "agent/src/*" - branch: main + - event: push + branch: main matrix: BUILD_TARGET: diff --git a/.woodpecker/release-install-script.yml b/.woodpecker/release-install-script.yml index 43d334d..3c5acc5 100644 --- a/.woodpecker/release-install-script.yml +++ b/.woodpecker/release-install-script.yml @@ -1,6 +1,6 @@ when: - path: "get_prymn.sh" - branch: main + - event: push + branch: main steps: upload: diff --git a/agent/Cargo.lock b/agent/Cargo.lock index ed11bf6..5f011c3 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ 
-19,13 +19,28 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.3.2" @@ -67,9 +82,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" dependencies = [ "anstyle", "windows-sys", @@ -100,18 +115,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] name = "async-trait" -version = "0.1.71" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] @@ -122,9 +137,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.18" 
+version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", @@ -194,9 +209,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" [[package]] name = "bumpalo" @@ -212,9 +227,12 @@ checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "cc" -version = "1.0.79" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" +dependencies = [ + "libc", +] [[package]] name = "cfg-if" @@ -223,19 +241,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "clap" -version = "4.3.12" +name = "chrono" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eab9e8ceb9afdade1ab3f0fd8dbce5b1b2f468ad653baf10e771781b2b67b73" +checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "time", + "wasm-bindgen", + "winapi", +] + +[[package]] +name = "clap" +version = "4.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c27cdf28c0f604ba3f512b0c9a409f8de8513e4816705deb0498b627e7c3a3fd" dependencies = [ "clap_builder", ] [[package]] name 
= "clap_builder" -version = "4.3.12" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f2763db829349bf00cfc06251268865ed4363b93a943174f638daf3ecdba2cd" +checksum = "08a9f1ab5e9f01a9b81f202e8562eb9a10de70abf9eaeac1be465c28b75aa4aa" dependencies = [ "anstream", "anstyle", @@ -263,9 +296,9 @@ checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" [[package]] name = "either" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" [[package]] name = "encoding_rs" @@ -287,9 +320,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f" dependencies = [ "errno-dragonfly", "libc", @@ -308,12 +341,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "1.9.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] +checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" [[package]] name = "fixedbitset" @@ -365,7 +395,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] @@ -404,7 +434,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -501,7 +531,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", 
"tokio", "tower-service", "tracing", @@ -534,6 +564,29 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.4.0" @@ -554,26 +607,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys", -] - [[package]] name = "ipnet" version = "2.8.0" @@ -587,7 +620,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "rustix 0.38.4", + "rustix", "windows-sys", ] @@ -600,6 +633,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -629,15 +671,9 @@ checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" 
[[package]] name = "linux-raw-sys" -version = "0.3.8" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - -[[package]] -name = "linux-raw-sys" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "log" @@ -647,9 +683,9 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" [[package]] name = "matchit" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" +checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" [[package]] name = "memchr" @@ -688,7 +724,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -731,6 +767,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -780,29 +825,29 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -828,9 +873,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92de25114670a878b1261c79c9f8f729fb97e95bac93f6312f583c60dd6a1dfe" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -853,7 +898,7 @@ checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.10.5", "lazy_static", "log", "multimap", @@ -874,7 +919,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -894,8 +939,10 @@ name = "prymn_agent" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "clap", "envy", + "itertools 0.11.0", "nix", "once_cell", "prost", @@ -914,9 +961,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.30" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5907a1b7c277254a8b15170f6e7c97cfa60ee7872a3217663bb81151e48184bb" +checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" dependencies = [ "proc-macro2", ] @@ -962,9 
+1009,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", @@ -974,9 +1021,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.3" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -1051,36 +1098,22 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.37.23" +version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys", -] - -[[package]] -name = "rustix" -version = "0.38.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" -dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys 0.4.3", + "linux-raw-sys", "windows-sys", ] [[package]] name = "rustls" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", "ring", @@ -1099,9 +1132,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = 
"0.101.1" +version = "0.101.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" dependencies = [ "ring", "untrusted", @@ -1131,29 +1164,29 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.173" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91f70896d6720bc714a4a57d22fc91f1db634680e65c8efe13323f1fa38d53f" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.173" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6250dde8342e0232232be9ca3db7aa40aceb5a3e5dd9bddbc00d99a007cde49" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] name = "serde_json" -version = "1.0.103" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" +checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" dependencies = [ "itoa", "ryu", @@ -1215,6 +1248,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "spin" version = "0.5.2" @@ -1246,9 +1289,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.26" +version = "2.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" +checksum = 
"04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" dependencies = [ "proc-macro2", "quote", @@ -1263,9 +1306,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "sysinfo" -version = "0.29.4" +version = "0.29.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751e810399bba86e9326f5762b7f32ac5a085542df78da6a78d94e07d14d7c11" +checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db" dependencies = [ "cfg-if", "core-foundation-sys", @@ -1277,15 +1320,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.6.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ - "autocfg", "cfg-if", "fastrand", "redox_syscall", - "rustix 0.37.23", + "rustix", "windows-sys", ] @@ -1299,6 +1341,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -1316,11 +1369,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "2d3ce25f50619af8b0aec2eb23deebe84249e19e2ddd393a6e16e3300a6dadfd" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -1328,7 +1380,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.3", "tokio-macros", "windows-sys", ] @@ 
-1351,7 +1403,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] @@ -1373,6 +1425,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -1485,7 +1538,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", ] [[package]] @@ -1588,6 +1641,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1615,7 +1674,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", "wasm-bindgen-shared", ] @@ -1649,7 +1708,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.28", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1722,6 +1781,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 6be735b..428b721 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -5,8 +5,10 @@ edition = "2021" [dependencies] anyhow = "1.0.71" +chrono = "0.4.26" clap = { version = "4.3.9" } envy = "0.4.2" +itertools = "0.11.0" nix = "0.26.2" once_cell = "1.18.0" prost = "0.11.9" @@ -14,12 
+16,14 @@ reqwest = { version = "0.11.18", features = ["blocking", "rustls-tls", "json"], serde = { version = "1.0.173", features = ["derive"] } serde_json = "1.0.103" sysinfo = { version = "0.29.2", default-features = false } -tempfile = "3.6.0" tokio = { version = "1.28.2", features = ["rt-multi-thread", "io-util", "process", "macros", "signal"] } -tokio-stream = { version = "0.1.14", features = ["net"] } +tokio-stream = { version = "0.1.14", features = ["net", "sync"] } tonic = { version = "0.9.2", features = ["tls"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" +[dev-dependencies] +tempfile = "3.6.0" + [build-dependencies] tonic-build = "0.9.2" diff --git a/agent/build.rs b/agent/build.rs index 51f3f08..84fcb7f 100644 --- a/agent/build.rs +++ b/agent/build.rs @@ -1,6 +1,6 @@ fn main() { tonic_build::configure() - .build_client(true) + .build_client(false) .compile(&["../proto/agent.proto"], &["../proto"]) .unwrap(); } diff --git a/agent/src/bin/prymn_agent.rs b/agent/src/bin/prymn_agent.rs index 77d1c47..706166c 100644 --- a/agent/src/bin/prymn_agent.rs +++ b/agent/src/bin/prymn_agent.rs @@ -7,19 +7,13 @@ fn main() -> anyhow::Result<()> { let command = clap::Command::new(env!("CARGO_BIN_NAME")) .version(env!("CARGO_PKG_VERSION")) - .arg(arg!(-d --daemon "Run agent as daemon").exclusive(true)) .arg(arg!(--install "Install this agent binary to the system").exclusive(true)) - .arg_required_else_help(true) .try_get_matches() .unwrap_or_else(|e| e.exit()); - if command.get_flag("daemon") { - tracing::info!("starting agent"); - server::main() - } else if let Some(token) = command.get_one::("install") { - tracing::info!("starting installation..."); + if let Some(token) = command.get_one::("install") { self_update::install(token).context("failed to install the agent to the system") } else { - unreachable!() + server::run() } } diff --git a/agent/src/lib.rs b/agent/src/lib.rs index a65f914..0b21c96 100644 --- a/agent/src/lib.rs +++ b/agent/src/lib.rs @@ -1,3 +1,4 
@@ pub mod config; pub mod self_update; pub mod server; +pub mod system; diff --git a/agent/src/self_update.rs b/agent/src/self_update.rs index 237a4b8..7f5cd77 100644 --- a/agent/src/self_update.rs +++ b/agent/src/self_update.rs @@ -51,7 +51,7 @@ Description=Prymn Agent Service After=network.target [Service] -ExecStart={PRYMN_PATH} -d +ExecStart={PRYMN_PATH} Type=simple Restart=always diff --git a/agent/src/server/agent.rs b/agent/src/server/agent.rs new file mode 100644 index 0000000..f99352d --- /dev/null +++ b/agent/src/server/agent.rs @@ -0,0 +1,117 @@ +use std::pin::Pin; + +use tokio_stream::{ + wrappers::{ReceiverStream, WatchStream}, + Stream, StreamExt, +}; +use tonic::{Request, Response, Status}; + +use crate::system::health::HealthMonitor; + +use super::proto::*; + +type AgentResult = std::result::Result, Status>; + +pub struct AgentService { + health: HealthMonitor, +} + +#[tonic::async_trait] +impl agent_server::Agent for AgentService { + type HealthStream = Pin> + Send>>; + + async fn health(&self, _: Request<()>) -> AgentResult { + let receiver = self.health.monitor(); + let version = env!("CARGO_PKG_VERSION"); + + let output = WatchStream::new(receiver).map(|health| { + Ok(HealthResponse { + version: version.to_owned(), + system: Some(health.system().into()), + tasks: health + .tasks() + .into_iter() + .map(|(key, val)| (key, TaskHealth::from(val))) + .collect(), + }) + }); + + Ok(Response::new(Box::pin(output))) + } + + async fn get_sys_info(&self, _: Request<()>) -> AgentResult { + use sysinfo::{CpuExt, DiskExt, SystemExt}; + + let mut sys = crate::system::SYSTEM.lock().unwrap(); + + sys.refresh_specifics( + sysinfo::RefreshKind::new() + .with_disks() + .with_memory() + .with_processes(sysinfo::ProcessRefreshKind::everything()) + .with_cpu(sysinfo::CpuRefreshKind::everything()), + ); + + let cpus = sys + .cpus() + .iter() + .map(|cpu| sys_info_response::Cpu { + freq_mhz: cpu.frequency(), + usage: cpu.cpu_usage(), + }) + .collect(); + + let disks = 
sys + .disks() + .iter() + .map(|disk| sys_info_response::Disk { + name: disk.name().to_string_lossy().into_owned(), + total_bytes: disk.total_space(), + avail_bytes: disk.available_space(), + mount_point: disk.mount_point().to_string_lossy().into_owned(), + }) + .collect(); + + let response = Response::new(SysInfoResponse { + uptime: sys.uptime(), + hostname: sys.host_name().unwrap_or_default(), + os: sys.long_os_version().unwrap_or_default(), + mem_total_bytes: sys.total_memory(), + mem_avail_bytes: sys.available_memory(), + swap_total_bytes: sys.total_swap(), + swap_free_bytes: sys.free_swap(), + cpus, + disks, + }); + + Ok(response) + } + + type ExecStream = Pin> + Send>>; + + async fn exec(&self, req: Request) -> AgentResult { + use crate::system::exec::*; + + let ExecRequest { program, args } = req.get_ref(); + + match exec(program, args) { + Ok(receiver) => { + let stream = ReceiverStream::new(receiver).map(|_inner| { + Ok(ExecResponse { + // TODO + response: None, + }) + }); + + Ok(Response::new(Box::pin(stream))) + } + Err(err) => Err(Status::failed_precondition(err.to_string())), + } + } +} + +pub fn server(health_monitor: HealthMonitor) -> agent_server::AgentServer { + agent_server::AgentServer::new(AgentService { + health: health_monitor, + }) +} diff --git a/agent/src/server/mod.rs b/agent/src/server/mod.rs index fe11ba6..2d634d1 100644 --- a/agent/src/server/mod.rs +++ b/agent/src/server/mod.rs @@ -1,184 +1,69 @@ -mod exec; -mod rpc; +use std::time::Duration; -use std::{ - pin::Pin, - sync::{Arc, Mutex}, -}; +use tokio::{signal, sync::oneshot, time::sleep}; -use anyhow::Context; -use sysinfo::{CpuExt, DiskExt, System, SystemExt}; -use tokio::{net::TcpListener, signal, sync::oneshot}; -use tokio_stream::{ - wrappers::{ReceiverStream, TcpListenerStream}, - Stream, StreamExt, -}; -use tonic::{transport::server::Router, Request, Response, Status}; +use crate::system::health::HealthMonitor; -struct Server { - sys: Arc>, -} +mod agent; +mod proto { + 
tonic::include_proto!("prymn"); -type Result = std::result::Result; - -#[tonic::async_trait] -impl rpc::agent_server::Agent for Server { - #[tracing::instrument(skip(self))] - async fn echo(&self, req: Request) -> Result> { - Ok(Response::new(rpc::EchoResponse { - message: req.into_inner().message, - })) - } - - #[tracing::instrument(skip(self))] - async fn get_sys_info(&self, _: Request<()>) -> Result> { - let mut sys = self.sys.lock().unwrap(); - - sys.refresh_specifics( - sysinfo::RefreshKind::new() - .with_disks() - .with_memory() - .with_processes(sysinfo::ProcessRefreshKind::everything()) - .with_cpu(sysinfo::CpuRefreshKind::everything()), - ); - - let cpus = sys - .cpus() - .iter() - .map(|cpu| rpc::sys_info_response::Cpu { - freq_mhz: cpu.frequency(), - usage: cpu.cpu_usage(), - }) - .collect(); - - let disks = sys - .disks() - .iter() - .map(|disk| rpc::sys_info_response::Disk { - name: disk.name().to_string_lossy().into_owned(), - total_bytes: disk.total_space(), - avail_bytes: disk.available_space(), - mount_point: disk.mount_point().to_string_lossy().into_owned(), - }) - .collect(); - - let response = Response::new(rpc::SysInfoResponse { - uptime: sys.uptime(), - hostname: sys.host_name().unwrap_or_default(), - os: sys.long_os_version().unwrap_or_default(), - mem_total_bytes: sys.total_memory(), - mem_avail_bytes: sys.available_memory(), - swap_total_bytes: sys.total_swap(), - swap_free_bytes: sys.free_swap(), - cpus, - disks, - }); - - Ok(response) - } - - type ExecStream = Pin> + Send + Sync>>; - - #[tracing::instrument(skip(self))] - async fn exec(&self, req: Request) -> Result> { - use exec::*; - - let rpc::ExecRequest { program, args } = req.into_inner(); - - match exec(&program, &args) { - Ok(receiver) => { - let stream = ReceiverStream::new(receiver).map(|inner| { - Ok(rpc::ExecResponse { - response: Some(inner.into()), - }) - }); - - Ok(Response::new(Box::pin(stream))) + impl From for SystemHealth { + fn from(val: 
crate::system::health::SystemHealth) -> Self { + if let crate::system::health::SystemStatus::Critical(ref reasons) = val.status { + SystemHealth { + status: itertools::join(reasons.iter().map(ToString::to_string), ","), + } + } else { + SystemHealth { + status: val.status.to_string(), + } + } + } + } + + impl From for TaskHealth { + fn from(val: crate::system::health::TaskHealth) -> Self { + TaskHealth { + status: val.status().to_string(), + message: val.message().to_owned(), + started_on: val.started_on().to_string(), + progress: val.progress() as i32, } - Err(err) => Err(Status::failed_precondition(err.to_string())), } } } +/// Run the server. This is the main entry point of the application. #[tokio::main] -pub async fn main() -> anyhow::Result<()> { - let address = "0.0.0.0:50012"; - - let incoming = { - let listener = TcpListener::bind(address) - .await - .with_context(|| format!("can't bind deamon to address {address}"))?; - - TcpListenerStream::new(listener) - }; - +pub async fn run() -> anyhow::Result<()> { let (shutdown_tx, shutdown_rx) = oneshot::channel(); + // Listen for shutdown signals tokio::spawn(async { signal::ctrl_c() .await - .expect("could not listen to shutdown signals"); + .expect("failed to listen to a ctrl-c signal"); - shutdown_tx.send(()).expect("bug: channel closed"); + let _ = shutdown_tx.send(()); }); - new_server() - .serve_with_incoming_shutdown(incoming, async { + let health_monitor = HealthMonitor::new(); + let agent_service = agent::server(health_monitor.clone()); + + tokio::spawn(async move { + loop { + health_monitor.check_system().await; + sleep(Duration::from_secs(1)).await; + } + }); + + tonic::transport::Server::builder() + .add_service(agent_service) + .serve_with_shutdown("[::]:50012".parse()?, async { let _ = shutdown_rx.await; }) .await?; + Ok(()) } - -fn new_server() -> Router { - let server = Server { - sys: Arc::new(Mutex::new(System::new_all())), - }; - - let service = rpc::agent_server::AgentServer::new(server); - - 
tonic::transport::Server::builder() - .trace_fn(|_| tracing::info_span!("agent_server")) - .add_service(service) -} - -#[cfg(test)] -mod tests { - use std::net::SocketAddr; - - use tokio::net::TcpListener; - use tokio_stream::wrappers::TcpListenerStream; - - use super::*; - - async fn spawn_server() -> SocketAddr { - let listener = TcpListener::bind("[::]:0").await.unwrap(); - let address = listener.local_addr().unwrap(); - - let listener = TcpListenerStream::new(listener); - - tokio::spawn(async move { new_server().serve_with_incoming(listener).await.unwrap() }); - - address - } - - #[tokio::test] - async fn echo_works() { - let addr = spawn_server().await; - - let mut client = - rpc::agent_client::AgentClient::connect(format!("http://[::]:{}", addr.port())) - .await - .unwrap(); - - let message = "Hello!".to_owned(); - let response = client - .echo(rpc::EchoRequest { - message: message.clone(), - }) - .await - .expect("to respond"); - - assert_eq!(rpc::EchoResponse { message }, response.into_inner()); - } -} diff --git a/agent/src/server/rpc.rs b/agent/src/server/rpc.rs deleted file mode 100644 index 241538e..0000000 --- a/agent/src/server/rpc.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("prymn"); diff --git a/agent/src/server/exec.rs b/agent/src/system/exec.rs similarity index 72% rename from agent/src/server/exec.rs rename to agent/src/system/exec.rs index e89cbd3..1347036 100644 --- a/agent/src/server/exec.rs +++ b/agent/src/system/exec.rs @@ -9,10 +9,8 @@ use tokio::{ sync::mpsc, }; -use super::rpc; - #[derive(Debug)] -pub(super) enum ExecOutput { +pub enum ExecOutput { Output { stdout: Option, stderr: Option, @@ -21,7 +19,7 @@ pub(super) enum ExecOutput { Error(String), } -pub(super) fn exec(program: P, args: &[A]) -> anyhow::Result> +pub fn exec(program: P, args: &[A]) -> anyhow::Result> where P: AsRef, A: AsRef, @@ -76,19 +74,6 @@ async fn run_process(mut command: Child, sender: mpsc::Sender) { } } -impl From for rpc::exec_response::Response { - 
fn from(value: ExecOutput) -> Self { - match value { - ExecOutput::Output { stdout, stderr } => Self::Output(rpc::exec_response::Output { - stdout: stdout.unwrap_or_default(), - stderr: stderr.unwrap_or_default(), - }), - ExecOutput::Exit(code) => Self::ExitCode(code.code().unwrap_or_default()), - ExecOutput::Error(err) => Self::Error(err), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -104,15 +89,33 @@ mod tests { assert_eq!(outputs.len(), 4); - let ExecOutput::Output { ref stdout, ref stderr } = outputs[0] else { panic!() }; + let ExecOutput::Output { + ref stdout, + ref stderr, + } = outputs[0] + else { + panic!() + }; assert_eq!(*stdout, Some("1".to_owned())); assert_eq!(*stderr, None); - let ExecOutput::Output { ref stdout, ref stderr } = outputs[1] else { panic!() }; + let ExecOutput::Output { + ref stdout, + ref stderr, + } = outputs[1] + else { + panic!() + }; assert_eq!(*stdout, Some("2".to_owned())); assert_eq!(*stderr, None); - let ExecOutput::Output { ref stdout, ref stderr } = outputs[2] else { panic!() }; + let ExecOutput::Output { + ref stdout, + ref stderr, + } = outputs[2] + else { + panic!() + }; assert_eq!(*stdout, Some("3".to_owned())); assert_eq!(*stderr, None); } diff --git a/agent/src/system/health.rs b/agent/src/system/health.rs new file mode 100644 index 0000000..7733a2c --- /dev/null +++ b/agent/src/system/health.rs @@ -0,0 +1,299 @@ +//! 
System health module +use std::{collections::HashMap, sync::Arc}; + +use chrono::{DateTime, Utc}; +use tokio::sync::watch; + +use super::SYSTEM; + +const MEMORY_USAGE_CRITICAL_THRESHOLD: u64 = 90; +const CPU_USAGE_CRITICAL_THRESHOLD: u64 = 90; +const DISK_USAGE_CRITICAL_THRESHOLD: u64 = 90; + +#[derive(Clone, PartialEq)] +pub enum CriticalReason { + HighMemoryUsage, + HighCpuUsage, + HighDiskUsage, +} + +#[derive(Clone, Default, PartialEq)] +pub enum SystemStatus { + #[default] + Normal, + OutOfDate, + Updating, + Critical(Vec), +} + +#[derive(Clone, Default)] +pub struct SystemHealth { + pub status: SystemStatus, +} + +#[derive(Clone, PartialEq, Debug)] +pub enum TaskStatus { + Normal, + Warning, + Error, + Completed, +} + +#[derive(Clone)] +pub struct TaskHealth { + status: TaskStatus, + started_on: DateTime, + message: String, + progress: u8, +} + +impl TaskHealth { + pub fn new(message: String) -> Self { + let started_on = chrono::Utc::now(); + + Self { + status: TaskStatus::Normal, + started_on, + message, + progress: 0, + } + } + + pub fn set_normal(&mut self, message: String) { + self.status = TaskStatus::Normal; + self.message = message; + } + + pub fn set_warning(&mut self, message: String) { + self.status = TaskStatus::Warning; + self.message = message; + } + + pub fn set_error(&mut self, message: String) { + self.status = TaskStatus::Error; + self.message = message; + } + + pub fn set_completed(mut self, message: String) { + self.status = TaskStatus::Completed; + self.progress = 100; + self.message = message; + } + + pub fn set_progress(&mut self, message: String, progress: u8) { + self.progress = progress; + self.message = message; + } + + pub fn status(&self) -> &TaskStatus { + &self.status + } + + pub fn started_on(&self) -> &DateTime { + &self.started_on + } + + pub fn message(&self) -> &str { + &self.message + } + + pub fn progress(&self) -> u8 { + self.progress + } +} + +#[derive(Default, Clone)] +pub struct Health { + system: SystemHealth, + 
tasks: HashMap, +} + +impl Health { + pub fn system(&self) -> SystemHealth { + self.system.clone() + } + + pub fn tasks(self) -> HashMap { + self.tasks + } +} + +/// `HealthMonitor` gives access to shared system health state, allowing callers to watch health and +/// update task health status. +/// +/// # Usage +/// Internally `HealthMonitor` uses [Arc] so it can be cheaply cloned and shared. +/// +/// ```no_run +/// use prymn_agent::system::health::{HealthMonitor, TaskHealth}; +/// +/// let health_monitor = HealthMonitor::new(); +/// let health_monitor_clone = health_monitor.clone(); +/// tokio::spawn(async move { +/// loop { +/// health_monitor_clone.check_system().await; +/// } +/// }); +/// tokio::spawn(async move { +/// health_monitor.set_task_health("some_task".to_string(), TaskHealth::new("running".into())); +/// }); +/// ``` +#[derive(Clone)] +pub struct HealthMonitor { + sender: Arc>, + receiver: watch::Receiver, +} + +impl HealthMonitor { + pub fn new() -> Self { + let (sender, receiver) = watch::channel(Health::default()); + Self { + sender: Arc::new(sender), + receiver, + } + } + + // TODO: Remove async from here (so it can be consistent) + // Move system checking task into its own thing + pub async fn check_system(&self) { + use sysinfo::{CpuExt, DiskExt, SystemExt}; + + let status = tokio::task::spawn_blocking(|| { + let mut status = SystemStatus::Normal; + + // TODO: For testability, dependency inject this System struct in this function. 
+ let mut sys = SYSTEM.lock().unwrap(); + + // Refresh system resources usage + sys.refresh_specifics( + sysinfo::RefreshKind::new() + .with_memory() + .with_disks() + .with_cpu(sysinfo::CpuRefreshKind::new().with_cpu_usage()), + ); + + let mut statuses = vec![]; + + // Check for critical memory usage + let memory_usage = sys.used_memory() * 100 / sys.total_memory(); + if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD { + statuses.push(CriticalReason::HighMemoryUsage); + } + + // Check for critical CPU usage + let cpu_usage = sys.global_cpu_info().cpu_usage(); + if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD as f32 { + statuses.push(CriticalReason::HighCpuUsage); + } + + // Check for any disk usage that is critical + for disk in sys.disks() { + let available_disk = disk.available_space() * 100 / disk.total_space(); + if available_disk < 100 - DISK_USAGE_CRITICAL_THRESHOLD { + statuses.push(CriticalReason::HighDiskUsage); + } + } + + if !statuses.is_empty() { + status = SystemStatus::Critical(statuses); + } + + status + }) + .await + .expect("system checking task panicked - possibly due to panicked mutex lock"); + + self.sender.send_if_modified(|Health { system, .. }| { + if system.status == status { + return false; + } + + system.status = status; + true + }); + } + + pub fn set_task_health(&self, task_name: String, health: TaskHealth) { + // Always send a notification in this case since it is an explicit action. + self.sender.send_modify(|Health { tasks, .. }| { + tasks.insert(task_name, health); + }); + } + + pub fn clear_task(&self, task_name: &str) { + self.sender + .send_if_modified(|Health { tasks, .. 
}| tasks.remove(task_name).is_some()); + } + + pub fn monitor(&self) -> watch::Receiver { + self.receiver.clone() + } +} + +impl Default for HealthMonitor { + fn default() -> Self { + HealthMonitor::new() + } +} + +impl std::fmt::Display for SystemStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SystemStatus::Normal => write!(f, "normal"), + SystemStatus::OutOfDate => write!(f, "out of date"), + SystemStatus::Updating => write!(f, "updating"), + SystemStatus::Critical(_) => write!(f, "critical"), + } + } +} + +impl std::fmt::Display for CriticalReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CriticalReason::HighMemoryUsage => write!(f, "high memory usage"), + CriticalReason::HighCpuUsage => write!(f, "high cpu usage"), + CriticalReason::HighDiskUsage => write!(f, "high disk usage"), + } + } +} + +impl std::fmt::Display for TaskStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskStatus::Normal => write!(f, "normal"), + TaskStatus::Warning => write!(f, "warning"), + TaskStatus::Error => write!(f, "error"), + TaskStatus::Completed => write!(f, "completed"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_monitor() { + let health_monitor = HealthMonitor::new(); + let receiver = health_monitor.monitor(); + + assert!(receiver.has_changed().is_ok_and(|changed| !changed)); + + let health = TaskHealth::new("this is normal".to_owned()); + health_monitor.set_task_health("some_task".to_string(), health); + + assert!(receiver.has_changed().is_ok_and(|changed| changed)); + + { + let health = receiver.borrow(); + let task_health = health.tasks.get("some_task").expect("a task should exist"); + + assert_eq!(task_health.status, TaskStatus::Normal); + assert_eq!(task_health.progress, 0); + assert_eq!(task_health.message, "this is normal"); + } + + health_monitor.clear_task("some_task"); + 
assert!(!receiver.borrow().tasks.contains_key("some_task")); + } +} diff --git a/agent/src/system/mod.rs b/agent/src/system/mod.rs new file mode 100644 index 0000000..42b3aa8 --- /dev/null +++ b/agent/src/system/mod.rs @@ -0,0 +1,12 @@ +//! System boundary and modules that interact with the operating system and programs. + +use std::sync::Mutex; + +use once_cell::sync::Lazy; +use sysinfo::SystemExt; + +pub mod exec; +pub mod health; + +// TODO: Make this mock-able so we can test code that interacts with it +pub static SYSTEM: Lazy> = Lazy::new(|| Mutex::new(sysinfo::System::new())); diff --git a/proto/agent.proto b/proto/agent.proto index c935b0e..6bcb6c3 100644 --- a/proto/agent.proto +++ b/proto/agent.proto @@ -4,12 +4,22 @@ import "google/protobuf/empty.proto"; package prymn; -message EchoRequest { - string message = 1; +message SystemHealth { + // Comma-separated statuses + string status = 1; } -message EchoResponse { - string message = 1; +message TaskHealth { + string status = 1; + string message = 2; + string started_on = 3; + int32 progress = 4; +} + +message HealthResponse { + string version = 1; + SystemHealth system = 2; + map tasks = 3; } message SysInfoResponse { @@ -55,7 +65,7 @@ message ExecResponse { } service Agent { - rpc Echo(EchoRequest) returns (EchoResponse); + rpc Health(google.protobuf.Empty) returns (stream HealthResponse); rpc GetSysInfo(google.protobuf.Empty) returns (SysInfoResponse); rpc Exec(ExecRequest) returns (stream ExecResponse); }