From 5c64f025792243d0a63420d75f7d2421a4272f3d Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Tue, 14 Nov 2023 15:23:50 +0000 Subject: [PATCH] Feature: Agent Tasks (#8) Reviewed-on: https://git.nikos.gg/prymn/prymn/pulls/8 Co-authored-by: Nikos Papadakis Co-committed-by: Nikos Papadakis --- .formatter.exs | 2 +- Cargo.lock | 433 +++++++++--------- agent/Cargo.toml | 10 +- agent/src/debian.rs | 186 ++++++++ agent/src/lib.rs | 1 + agent/src/server/agent.rs | 167 ++++--- agent/src/server/mod.rs | 88 +++- agent/src/system/exec.rs | 122 ----- agent/src/system/health.rs | 260 +++-------- agent/src/system/info.rs | 87 ++++ agent/src/system/mod.rs | 11 +- agent/src/system/task.rs | 154 +++++++ agent/tests/task_health.rs | 18 + app/lib/prymn/agents.ex | 39 +- app/lib/prymn/agents/connection.ex | 72 ++- app/lib/prymn/agents/health.ex | 19 +- .../prymn_web/components/core_components.ex | 32 +- app/lib/prymn_web/live/server_live/index.ex | 14 +- .../live/server_live/index.html.heex | 15 +- app/lib/prymn_web/live/server_live/show.ex | 92 ++-- .../prymn_web/live/server_live/show.html.heex | 133 ++++-- proto/agent.proto | 27 +- 22 files changed, 1234 insertions(+), 748 deletions(-) create mode 100644 agent/src/debian.rs delete mode 100644 agent/src/system/exec.rs create mode 100644 agent/src/system/info.rs create mode 100644 agent/src/system/task.rs create mode 100644 agent/tests/task_health.rs diff --git a/.formatter.exs b/.formatter.exs index bd84584..9754d1d 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -2,5 +2,5 @@ import_deps: [:ecto, :ecto_sql, :phoenix], subdirectories: ["priv/*/migrations"], plugins: [Phoenix.LiveView.HTMLFormatter, TailwindFormatter], - inputs: ["*.{heex,ex,exs}", "{config,lib,test}/**/*.{heex,ex,exs}", "priv/*/seeds.exs"] + inputs: ["*.{heex,ex,exs}", "app/{config,lib,test}/**/*.{heex,ex,exs}", "priv/*/seeds.exs"] ] diff --git a/Cargo.lock b/Cargo.lock index 7e4db1e..499873f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -43,9 +43,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -57,15 +57,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -81,9 +81,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys", @@ -96,14 +96,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] -name = "async-trait" -version = "0.1.73" +name = "async-stream" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn", +] + +[[package]] +name = "async-trait" +version = "0.1.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -174,9 +196,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "bitflags" @@ -186,15 +208,15 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "bytes" @@ -219,9 +241,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", @@ -233,18 +255,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.2" +version = "4.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a13b88d2c62ff462f88e4a121f17a82c1af05693a2f192b5c38d14de73c19f6" +checksum = "ac495e00dcec98c83465d5ad66c5c4fabd652fd6686e7c6269b117e729a6f17b" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.4.2" +version = "4.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08" +checksum = "c77ed9a32a62e6ca27175d00d29d05ca32e396ea1eb5fb01d8256b669cec7663" dependencies = [ "anstream", "anstyle", @@ -254,9 +276,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" [[package]] name = "colorchoice" @@ -264,6 +286,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -302,30 +334,19 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e" dependencies = [ - "errno-dragonfly", "libc", "windows-sys", ] -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "fastrand" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fixedbitset" @@ -350,42 +371,42 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", ] [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-core", "futures-io", @@ -398,9 +419,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", "libc", @@ -440,9 +461,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" [[package]] name = "heck" @@ -452,9 +473,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "home" @@ -522,7 +543,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -543,16 +564,16 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows", + "windows-core", ] [[package]] @@ -586,28 +607,19 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.2", ] [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" - -[[package]] -name = "itertools" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" -dependencies = [ - "either", -] +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itertools" @@ -626,9 +638,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" dependencies = [ "wasm-bindgen", ] @@ -641,15 +653,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" [[package]] name = "log" @@ -659,15 +671,15 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "matchit" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "mime" @@ -686,9 +698,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", @@ -707,7 +719,7 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "cfg-if", "libc", ] @@ -733,9 +745,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -784,7 +796,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.0", + "indexmap 2.1.0", ] [[package]] @@ -804,7 +816,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn", ] [[package]] @@ -827,28 +839,28 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.1.25" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 1.0.109", + "syn", ] [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", "prost-derive", @@ -856,44 +868,44 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck", - "itertools 0.10.5", - "lazy_static", + "itertools", "log", "multimap", + "once_cell", "petgraph", "prettyplease", "prost", "prost-types", "regex", - "syn 1.0.109", + "syn", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools", "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] name = "prost-types" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ "prost", ] @@ -906,16 +918,18 @@ dependencies = [ "chrono", "clap", "envy", - "itertools 0.11.0", + "itertools", "nix", "once_cell", "prost", + "regex", "reqwest", "serde", "serde_json", "sysinfo", "tokio", "tokio-stream", + "tokio-util", "tonic", "tonic-build", "tower-http", @@ -964,18 +978,18 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.3.5" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ "bitflags 1.3.2", ] [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", @@ -985,9 +999,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", @@ -996,15 +1010,15 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.5" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64", "bytes", @@ -1025,6 +1039,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tower-service", "url", @@ -1042,11 +1057,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.12" +version = "0.38.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdf14a7a466ce88b5eac3da815b53aefc208ce7e74d1c263aabb04d88c4abeb1" +checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", @@ -1067,29 +1082,29 @@ checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn", ] [[package]] name = "serde_json" -version = "1.0.106" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -1110,9 +1125,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -1137,15 +1152,15 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi", @@ -1153,9 +1168,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys", @@ -1169,20 +1184,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.109" +version = "2.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "syn" -version = "2.0.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" dependencies = [ "proc-macro2", "quote", @@ -1210,10 +1214,31 @@ dependencies = [ ] [[package]] -name = "tempfile" -version = "3.8.0" +name = "system-configuration" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tempfile" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", "fastrand", @@ -1249,9 +1274,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -1260,7 +1285,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.5.5", "tokio-macros", "windows-sys", ] @@ -1283,7 +1308,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn", ] [[package]] @@ -1300,9 +1325,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", @@ -1314,16 +1339,15 @@ dependencies = [ [[package]] name = "tonic" -version = "0.9.2" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ + "async-stream", "async-trait", "axum", "base64", "bytes", - "futures-core", - "futures-util", "h2", "http", "http-body", @@ -1342,15 +1366,15 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.9.2" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 1.0.109", + "syn", ] [[package]] @@ -1379,7 +1403,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "bytes", "futures-core", "futures-util", @@ -1406,11 +1430,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1418,20 +1441,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn", ] [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -1439,12 +1462,12 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" dependencies = [ - "lazy_static", "log", + "once_cell", "tracing-core", ] @@ -1476,9 +1499,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" @@ -1529,9 +1552,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1539,24 +1562,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.31", + "syn", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.37" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +checksum = "9afec9963e3d0994cac82455b2b3502b81a7f40f9a0d32181f7528d9f4b43e02" dependencies = [ "cfg-if", "js-sys", @@ -1566,9 +1589,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1576,28 +1599,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" [[package]] name = "web-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" dependencies = [ "js-sys", "wasm-bindgen", @@ -1638,10 +1661,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows" -version = "0.48.0" +name = "windows-core" +version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ "windows-targets", ] diff --git a/agent/Cargo.toml b/agent/Cargo.toml index ead948c..1f0ea33 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -6,22 +6,24 @@ edition = "2021" [dependencies] anyhow = "1.0.71" chrono = "0.4.26" -clap = { version = "4.3.9" } +clap = "4.3.9" envy = "0.4.2" itertools = "0.11.0" nix = "0.27.1" once_cell = "1.18.0" -prost = "0.11.9" +prost = "0.12.1" +regex = "1.10.2" reqwest = { version = "0.11.18", features = ["blocking", "json"], default-features = false } serde = { version = "1.0.173", features = ["derive"] } serde_json = "1.0.103" sysinfo = { version = "0.29.2", default-features = false } tokio = { version = "1.28.2", features = ["rt-multi-thread", "io-util", "process", "macros", "signal"] } tokio-stream = { version = "0.1.14", features = ["net", "sync"] } -tonic = { version = "0.9.2" } +tokio-util = { version = "0.7.10", features = ["codec"] } +tonic = { version = "0.10.2" } tower-http = { version = "0.4.3", features = ["trace"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" [build-dependencies] -tonic-build = "0.9.2" +tonic-build = "0.10.2" diff --git a/agent/src/debian.rs b/agent/src/debian.rs new file mode 100644 index 0000000..4ae241f --- /dev/null +++ b/agent/src/debian.rs @@ -0,0 +1,186 @@ +use std::process::{Command, Output}; + +use regex::Regex; + +pub fn update_package_index() -> std::io::Result { + Command::new("apt-get").arg("-y").arg("update").output() +} + +pub fn run_updates(dry_run: bool) -> std::io::Result { + let mut command = Command::new("apt-get"); + + if dry_run { + command.arg("-s"); + } + + command.arg("-y").arg("upgrade").output() +} + +pub fn install_packages(packages: &[&str]) -> std::io::Result { + Command::new("apt-get") + .arg("install") + .arg("-y") + .args(packages) + .output() +} + +pub fn get_available_updates() -> std::io::Result> { + let output = Command::new("apt-get").arg("-sV").arg("upgrade").output()?; + let upgradables = parse_upgrade_output(&String::from_utf8_lossy(&output.stdout)); + Ok(upgradables) +} + +fn parse_upgrade_output(output: &str) -> Vec { + output + .split_once("The following packages will be upgraded:\n") + .and_then(|(_, rest)| { + // Find the first line with non-whitespace characters (indicating the end of the list) + let re = Regex::new(r"(?m)^\S").unwrap(); + re.find(rest).map(|m| rest.split_at(m.start()).0) + }) + .map_or_else(Vec::new, |text| { + let lines = text.lines(); + lines.map(|line| line.trim().to_owned()).collect() + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_upgrade_output_correctly() { + // `apt-get -sV upgrade` + let test_output = r" +NOTE: This is only a simulation! + apt-get needs root privileges for real execution. + Keep also in mind that locking is deactivated, + so don't depend on the relevance to the real current situation! +Reading package lists... Done +Building dependency tree... Done +Reading state information... Done +Calculating upgrade... Done +The following packages have been kept back: + linux-image-amd64 (5.10.191-1 => 5.10.197-1) +The following packages will be upgraded: + adduser (3.118 => 3.118+deb11u1) + base-files (11.1+deb11u7 => 11.1+deb11u8) + cpio (2.13+dfsg-4 => 2.13+dfsg-7.1~deb11u1) + dbus (1.12.24-0+deb11u1 => 1.12.28-0+deb11u1) + distro-info-data (0.51+deb11u3 => 0.51+deb11u4) + dpkg (1.20.12 => 1.20.13) + grub-common (2.06-3~deb11u5 => 2.06-3~deb11u6) + grub-pc (2.06-3~deb11u5 => 2.06-3~deb11u6) + grub-pc-bin (2.06-3~deb11u5 => 2.06-3~deb11u6) + grub2-common (2.06-3~deb11u5 => 2.06-3~deb11u6) + krb5-locales (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4) + libbsd0 (0.11.3-1 => 0.11.3-1+deb11u1) + libcurl3-gnutls (7.74.0-1.3+deb11u7 => 7.74.0-1.3+deb11u10) + libdbus-1-3 (1.12.24-0+deb11u1 => 1.12.28-0+deb11u1) + libgssapi-krb5-2 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4) + libk5crypto3 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4) + libkrb5-3 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4) + libkrb5support0 (1.18.3-6+deb11u3 => 1.18.3-6+deb11u4) + libncurses6 (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2) + libncursesw6 (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2) + libnss-systemd (247.3-7+deb11u2 => 247.3-7+deb11u4) + libpam-systemd (247.3-7+deb11u2 => 247.3-7+deb11u4) + libssl1.1 (1.1.1n-0+deb11u5 => 1.1.1w-0+deb11u1) + libsystemd0 (247.3-7+deb11u2 => 247.3-7+deb11u4) + libtinfo6 (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2) + libudev1 (247.3-7+deb11u2 => 247.3-7+deb11u4) + logrotate (3.18.0-2+deb11u1 => 3.18.0-2+deb11u2) + ncurses-base (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2) + ncurses-bin (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2) + ncurses-term (6.2+20201114-2+deb11u1 => 6.2+20201114-2+deb11u2) + openssh-client (1:8.4p1-5+deb11u1 => 1:8.4p1-5+deb11u2) + openssh-server (1:8.4p1-5+deb11u1 => 1:8.4p1-5+deb11u2) + openssh-sftp-server (1:8.4p1-5+deb11u1 => 1:8.4p1-5+deb11u2) + openssl (1.1.1n-0+deb11u5 => 1.1.1w-0+deb11u1) + qemu-utils (1:5.2+dfsg-11+deb11u2 => 1:5.2+dfsg-11+deb11u3) + systemd (247.3-7+deb11u2 => 247.3-7+deb11u4) + systemd-sysv (247.3-7+deb11u2 => 247.3-7+deb11u4) + udev (247.3-7+deb11u2 => 247.3-7+deb11u4) +38 upgraded, 0 newly installed, 0 to remove and 1 not upgraded. +Inst base-files [11.1+deb11u7] (11.1+deb11u8 Debian:11.8/oldstable [amd64]) +Conf base-files (11.1+deb11u8 Debian:11.8/oldstable [amd64]) +Inst dpkg [1.20.12] (1.20.13 Debian:11.8/oldstable [amd64]) +Conf dpkg (1.20.13 Debian:11.8/oldstable [amd64]) +Inst ncurses-bin [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) +Conf ncurses-bin (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) +Inst ncurses-base [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all]) +Conf ncurses-base (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all]) +Inst libnss-systemd [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [] +Inst libsystemd0 [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [systemd:amd64 ] +Conf libsystemd0 (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [systemd:amd64 ] +Inst libpam-systemd [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [systemd:amd64 ] +Inst systemd [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Inst udev [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) [] +Inst libudev1 [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Conf libudev1 (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Inst adduser [3.118] (3.118+deb11u1 Debian:11.8/oldstable [all]) +Conf adduser (3.118+deb11u1 Debian:11.8/oldstable [all]) +Conf systemd (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Inst systemd-sysv [247.3-7+deb11u2] (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Inst dbus [1.12.24-0+deb11u1] (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64]) [] +Inst libdbus-1-3 [1.12.24-0+deb11u1] (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64]) +Inst libk5crypto3 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) +Conf libk5crypto3 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) +Inst libkrb5support0 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libkrb5-3:amd64 ] +Conf libkrb5support0 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libkrb5-3:amd64 ] +Inst libkrb5-3 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libgssapi-krb5-2:amd64 ] +Conf libkrb5-3 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) [libgssapi-krb5-2:amd64 ] +Inst libgssapi-krb5-2 [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) +Conf libgssapi-krb5-2 (1.18.3-6+deb11u4 Debian:11.8/oldstable [amd64]) +Inst libssl1.1 [1.1.1n-0+deb11u5] (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64]) +Conf libssl1.1 (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64]) +Inst libncurses6 [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) [] +Inst libncursesw6 [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) [] +Inst libtinfo6 [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) +Conf libtinfo6 (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) +Inst cpio [2.13+dfsg-4] (2.13+dfsg-7.1~deb11u1 Debian:11.8/oldstable [amd64]) +Inst logrotate [3.18.0-2+deb11u1] (3.18.0-2+deb11u2 Debian:11.8/oldstable [amd64]) +Inst krb5-locales [1.18.3-6+deb11u3] (1.18.3-6+deb11u4 Debian:11.8/oldstable [all]) +Inst ncurses-term [6.2+20201114-2+deb11u1] (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all]) +Inst openssh-sftp-server [1:8.4p1-5+deb11u1] (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) [] +Inst openssh-server [1:8.4p1-5+deb11u1] (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) [] +Inst openssh-client [1:8.4p1-5+deb11u1] (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) +Inst distro-info-data [0.51+deb11u3] (0.51+deb11u4 Debian:11.8/oldstable [all]) +Inst grub2-common [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) [grub-pc:amd64 ] +Inst grub-pc [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) [] +Inst grub-pc-bin [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) [] +Inst grub-common [2.06-3~deb11u5] (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) +Inst libbsd0 [0.11.3-1] (0.11.3-1+deb11u1 Debian:11.8/oldstable [amd64]) +Inst libcurl3-gnutls [7.74.0-1.3+deb11u7] (7.74.0-1.3+deb11u10 Debian-Security:11/oldstable-security [amd64]) +Inst openssl [1.1.1n-0+deb11u5] (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64]) +Inst qemu-utils [1:5.2+dfsg-11+deb11u2] (1:5.2+dfsg-11+deb11u3 Debian:11.8/oldstable [amd64]) +Conf libnss-systemd (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Conf libpam-systemd (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Conf udev (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Conf systemd-sysv (247.3-7+deb11u4 Debian:11.8/oldstable, Debian:11-updates/oldstable-updates [amd64]) +Conf dbus (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64]) +Conf libdbus-1-3 (1.12.28-0+deb11u1 Debian:11.8/oldstable [amd64]) +Conf libncurses6 (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) +Conf libncursesw6 (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [amd64]) +Conf cpio (2.13+dfsg-7.1~deb11u1 Debian:11.8/oldstable [amd64]) +Conf logrotate (3.18.0-2+deb11u2 Debian:11.8/oldstable [amd64]) +Conf krb5-locales (1.18.3-6+deb11u4 Debian:11.8/oldstable [all]) +Conf ncurses-term (6.2+20201114-2+deb11u2 Debian:11.8/oldstable [all]) +Conf openssh-sftp-server (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) +Conf openssh-server (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) +Conf openssh-client (1:8.4p1-5+deb11u2 Debian:11.8/oldstable [amd64]) +Conf distro-info-data (0.51+deb11u4 Debian:11.8/oldstable [all]) +Conf grub2-common (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) +Conf grub-pc (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) +Conf grub-pc-bin (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) +Conf grub-common (2.06-3~deb11u6 Debian-Security:11/oldstable-security [amd64]) +Conf libbsd0 (0.11.3-1+deb11u1 Debian:11.8/oldstable [amd64]) +Conf libcurl3-gnutls (7.74.0-1.3+deb11u10 Debian-Security:11/oldstable-security [amd64]) +Conf openssl (1.1.1w-0+deb11u1 Debian:11.8/oldstable [amd64]) +Conf qemu-utils (1:5.2+dfsg-11+deb11u3 Debian:11.8/oldstable [amd64]) + "; + + let upgradables = parse_upgrade_output(test_output); + assert_eq!(upgradables.len(), 38); + } +} diff --git a/agent/src/lib.rs b/agent/src/lib.rs index 0b21c96..2dfe058 100644 --- a/agent/src/lib.rs +++ b/agent/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod debian; pub mod self_update; pub mod server; pub mod system; diff --git a/agent/src/server/agent.rs b/agent/src/server/agent.rs index 72f3baf..563f8a6 100644 --- a/agent/src/server/agent.rs +++ b/agent/src/server/agent.rs @@ -1,23 +1,28 @@ -use std::pin::Pin; +use std::{pin::Pin, process::Stdio, sync::Mutex}; use tokio_stream::{ wrappers::{ReceiverStream, WatchStream}, Stream, StreamExt, }; +use tokio_util::codec::{BytesCodec, FramedRead}; use tonic::{Request, Response, Status}; -use crate::system::health::HealthMonitor; +use crate::{ + debian, + system::{health::HealthMonitor, info::Info, task::TaskBuilder}, +}; use super::proto::*; type AgentResult = std::result::Result, Status>; -pub struct AgentService { - health: HealthMonitor, +pub struct AgentService<'a> { + pub health: HealthMonitor, + pub info: &'a Mutex, // TODO: Find a way to remove the Mutex dependency here } #[tonic::async_trait] -impl agent_server::Agent for AgentService { +impl agent_server::Agent for AgentService<'static> { type HealthStream = Pin> + Send>>; async fn health(&self, _: Request<()>) -> AgentResult { @@ -30,8 +35,8 @@ impl agent_server::Agent for AgentService { system: Some(health.system().into()), tasks: health .tasks() - .into_iter() - .map(|(key, val)| (key, TaskHealth::from(val))) + .iter() + .map(|(k, v)| (k.clone(), v.into())) .collect(), }) }); @@ -40,79 +45,111 @@ impl agent_server::Agent for AgentService { } async fn get_sys_info(&self, _: Request<()>) -> AgentResult { - use sysinfo::{CpuExt, DiskExt, SystemExt}; + Ok(Response::new(SysInfoResponse::from( + &*self.info.lock().unwrap(), + ))) + } - let mut sys = crate::system::SYSTEM.lock().unwrap(); + type SysUpdateStream = Pin> + Send>>; - sys.refresh_specifics( - sysinfo::RefreshKind::new() - // .with_disks() // what is this? - .with_disks_list() - .with_memory() - .with_processes(sysinfo::ProcessRefreshKind::everything()) - .with_cpu(sysinfo::CpuRefreshKind::everything()), - ); + async fn sys_update( + &self, + req: Request, + ) -> AgentResult { + let dry_run = req.get_ref().dry_run; - let cpus = sys - .cpus() - .iter() - .map(|cpu| sys_info_response::Cpu { - freq_mhz: cpu.frequency(), - usage: cpu.cpu_usage(), + let mut receiver = + TaskBuilder::new("system update".to_owned()).health_monitor(self.health.clone()); + + if dry_run { + receiver = receiver + .add_step(async { Ok("simulating a system update...".to_owned()) }) + .add_step(async { + const DUR: std::time::Duration = std::time::Duration::from_secs(5); + tokio::time::sleep(DUR).await; + Ok("completed running an artifical delay...".to_owned()) + }); + } + + let receiver = receiver + .add_step(async move { + tokio::task::spawn_blocking(move || { + let output = debian::run_updates(dry_run).map_err(|err| { + tracing::error!(%err, "failed to run updates"); + err + })?; + + let out = if !output.status.success() { + tracing::error!(?output, "child process exited unsuccessfuly"); + + match output.status.code() { + Some(exit_code) => Err(Status::internal(format!( + "operation exited with error (code {exit_code})" + ))), + None => Err(Status::cancelled("operation was cancelled by signal")), + } + } else { + Ok(String::from_utf8_lossy(output.stdout.as_slice()).to_string()) + }; + + // TODO: We could split the output by lines and emit those as "steps" so the + // upgrade process is more interactive + out + }) + .await + .unwrap() }) - .collect(); + .build() + .into_background(); - let disks = sys - .disks() - .iter() - .map(|disk| sys_info_response::Disk { - name: disk.name().to_string_lossy().into_owned(), - total_bytes: disk.total_space(), - avail_bytes: disk.available_space(), - mount_point: disk.mount_point().to_string_lossy().into_owned(), - }) - .collect(); - - let response = Response::new(SysInfoResponse { - uptime: sys.uptime(), - hostname: sys.host_name().unwrap_or_default(), - os: sys.long_os_version().unwrap_or_default(), - mem_total_bytes: sys.total_memory(), - mem_avail_bytes: sys.available_memory(), - swap_total_bytes: sys.total_swap(), - swap_free_bytes: sys.free_swap(), - cpus, - disks, + let stream = ReceiverStream::new(receiver).map(|output| { + output + .map(|output| SysUpdateResponse { + output, + progress: 1, + }) + .map_err(|err| Status::internal(err.to_string())) }); - Ok(response) + Ok(Response::new(Box::pin(stream))) } type ExecStream = Pin> + Send>>; async fn exec(&self, req: Request) -> AgentResult { - use crate::system::exec::*; + use exec_response::Out; let ExecRequest { program, args } = req.get_ref(); - match exec(program, args) { - Ok(receiver) => { - let stream = ReceiverStream::new(receiver).map(|_inner| { - Ok(ExecResponse { - // TODO - response: None, - }) - }); + let mut command = tokio::process::Command::new(program) + .args(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; - Ok(Response::new(Box::pin(stream))) - } - Err(err) => Err(Status::failed_precondition(err.to_string())), - } + let stdout = + FramedRead::new(command.stdout.take().unwrap(), BytesCodec::new()).map(|stdout| { + let stdout = stdout.unwrap(); + Out::Stdout(String::from_utf8_lossy(&stdout[..]).to_string()) + }); + + let stderr = + FramedRead::new(command.stderr.take().unwrap(), BytesCodec::new()).map(|stderr| { + let stderr = stderr.unwrap(); + Out::Stderr(String::from_utf8_lossy(&stderr[..]).to_string()) + }); + + let exit = TaskBuilder::new(format!("exec {program}")) + .health_monitor(self.health.clone()) + .add_step(async move { command.wait().await.unwrap() }) + .build() + .into_stream(); + + let stream = stdout + .merge(stderr) + .chain(exit.map(|code| Out::ExitCode(code.code().unwrap_or_default()))) + .map(|out| Ok(ExecResponse { out: Some(out) })); + + Ok(Response::new(Box::pin(stream))) } } - -pub fn server(health_monitor: HealthMonitor) -> agent_server::AgentServer { - agent_server::AgentServer::new(AgentService { - health: health_monitor, - }) -} diff --git a/agent/src/server/mod.rs b/agent/src/server/mod.rs index bbf1f30..099a84a 100644 --- a/agent/src/server/mod.rs +++ b/agent/src/server/mod.rs @@ -1,16 +1,19 @@ use std::time::Duration; -use tokio::{signal, sync::oneshot, time::sleep}; +use tokio::{signal, sync::oneshot}; use tower_http::trace::TraceLayer; -use crate::system::health::HealthMonitor; +use crate::{ + server::{agent::AgentService, proto::agent_server}, + system::{self, health::HealthMonitor}, +}; mod agent; mod proto { tonic::include_proto!("prymn"); - impl From for SystemHealth { - fn from(val: crate::system::health::SystemHealth) -> Self { + impl From<&crate::system::health::SystemHealth> for SystemHealth { + fn from(val: &crate::system::health::SystemHealth) -> Self { if let crate::system::health::SystemStatus::Critical(ref reasons) = val.status { SystemHealth { status: itertools::join(reasons.iter().map(ToString::to_string), ","), @@ -23,13 +26,52 @@ mod proto { } } - impl From for TaskHealth { - fn from(val: crate::system::health::TaskHealth) -> Self { - TaskHealth { - status: val.status().to_string(), - message: val.message().to_owned(), - started_on: val.started_on().to_string(), - progress: val.progress() as i32, + impl From<&crate::system::task::TaskStatus> for TaskHealth { + fn from(value: &crate::system::task::TaskStatus) -> Self { + Self { + started_on: value.started_on().to_string(), + progress: value.progress(), + } + } + } + + impl From<&crate::system::info::Info> for SysInfoResponse { + fn from(info: &crate::system::info::Info) -> Self { + use sysinfo::{CpuExt, DiskExt, SystemExt}; + + let system = info.system(); + + let cpus = system + .cpus() + .iter() + .map(|cpu| sys_info_response::Cpu { + freq_mhz: cpu.frequency(), + usage: cpu.cpu_usage(), + }) + .collect(); + + let disks = system + .disks() + .iter() + .map(|disk| sys_info_response::Disk { + name: disk.name().to_string_lossy().into_owned(), + total_bytes: disk.total_space(), + avail_bytes: disk.available_space(), + mount_point: disk.mount_point().to_string_lossy().into_owned(), + }) + .collect(); + + Self { + uptime: system.uptime(), + hostname: system.host_name().unwrap_or_default(), + os: system.long_os_version().unwrap_or_default(), + mem_total_bytes: system.total_memory(), + mem_avail_bytes: system.available_memory(), + swap_total_bytes: system.total_swap(), + swap_free_bytes: system.free_swap(), + updates_available: info.updates().len() as u32, + cpus, + disks, } } } @@ -49,14 +91,26 @@ pub async fn run() -> anyhow::Result<()> { let _ = shutdown_tx.send(()); }); + let info = system::info::spawn_info_subsystem(); let health_monitor = HealthMonitor::new(); - let agent_service = agent::server(health_monitor.clone()); - tokio::spawn(async move { - loop { - health_monitor.check_system().await; - sleep(Duration::from_secs(1)).await; - } + // Monitor system info forever + // TODO: Maybe we can move it inside the server response function? + // We could spawn a new loop whenever we need it, but the problem is when does it get + // destroyed? + { + let health_monitor = health_monitor.clone(); + tokio::spawn(async move { + loop { + health_monitor.check_system_info(&info.lock().unwrap()); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + } + + let agent_service = agent_server::AgentServer::new(AgentService { + health: health_monitor.clone(), + info, }); let addr = "[::]:50012".parse()?; diff --git a/agent/src/system/exec.rs b/agent/src/system/exec.rs deleted file mode 100644 index 9336c9f..0000000 --- a/agent/src/system/exec.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::{ - ffi::OsStr, - process::{ExitStatus, Stdio}, -}; - -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - process::{Child, Command}, - sync::mpsc, -}; - -#[derive(Debug)] -pub enum ExecOutput { - Output { - stdout: Option, - stderr: Option, - }, - Exit(ExitStatus), - Error(String), -} - -pub fn exec(program: P, args: &[A]) -> anyhow::Result> -where - P: AsRef, - A: AsRef, -{ - let (tx, rx) = mpsc::channel(4); - - let command = Command::new(program) - .args(args) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - let fut = run_process(command, tx); - tokio::spawn(fut); - Ok(rx) -} - -async fn run_process(mut command: Child, sender: mpsc::Sender) { - let mut stdout = { - let stdout = command.stdout.take().expect("bug: no pipe for stdout"); - BufReader::new(stdout).lines() - }; - - let mut stderr = { - let stderr = command.stderr.take().expect("bug: no pipe for stderr"); - BufReader::new(stderr).lines() - }; - - loop { - match (stdout.next_line().await, stderr.next_line().await) { - (Ok(None), Ok(None)) => break, - (Ok(stdout), Ok(stderr)) => sender - .send(ExecOutput::Output { stdout, stderr }) - .await - .expect("stream closed"), - (Err(err), _) | (_, Err(err)) => sender - .send(ExecOutput::Error(err.to_string())) - .await - .expect("stream closed"), - } - } - - match command.wait().await { - Ok(exit_status) => sender - .send(ExecOutput::Exit(exit_status)) - .await - .expect("stream closed"), - Err(err) => sender - .send(ExecOutput::Error(err.to_string())) - .await - .expect("stream closed"), - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn exec_works() { - let mut rx = exec("echo", &["1\n2\n3"]).expect("to spawn command"); - - let mut outputs = vec![]; - while let Some(output) = rx.recv().await { - outputs.push(output); - } - - assert_eq!(outputs.len(), 4); - - let ExecOutput::Output { - ref stdout, - ref stderr, - } = outputs[0] - else { - panic!() - }; - assert_eq!(*stdout, Some("1".to_owned())); - assert_eq!(*stderr, None); - - let ExecOutput::Output { - ref stdout, - ref stderr, - } = outputs[1] - else { - panic!() - }; - assert_eq!(*stdout, Some("2".to_owned())); - assert_eq!(*stderr, None); - - let ExecOutput::Output { - ref stdout, - ref stderr, - } = outputs[2] - else { - panic!() - }; - assert_eq!(*stdout, Some("3".to_owned())); - assert_eq!(*stderr, None); - } -} diff --git a/agent/src/system/health.rs b/agent/src/system/health.rs index 7deb0db..1a0a1b6 100644 --- a/agent/src/system/health.rs +++ b/agent/src/system/health.rs @@ -1,10 +1,9 @@ //! System health module use std::{collections::HashMap, sync::Arc}; -use chrono::{DateTime, Utc}; use tokio::sync::watch; -use super::SYSTEM; +use super::{info::Info, task::TaskStatus}; const MEMORY_USAGE_CRITICAL_THRESHOLD: u64 = 90; const CPU_USAGE_CRITICAL_THRESHOLD: u64 = 90; @@ -31,180 +30,97 @@ pub struct SystemHealth { pub status: SystemStatus, } -#[derive(Clone, PartialEq, Debug)] -pub enum TaskStatus { - Normal, - Warning, - Error, - Completed, -} - -#[derive(Clone)] -pub struct TaskHealth { - status: TaskStatus, - started_on: DateTime, - message: String, - progress: u8, -} - -impl TaskHealth { - pub fn new(message: String) -> Self { - let started_on = chrono::Utc::now(); - - Self { - status: TaskStatus::Normal, - started_on, - message, - progress: 0, - } - } - - pub fn set_normal(&mut self, message: String) { - self.status = TaskStatus::Normal; - self.message = message; - } - - pub fn set_warning(&mut self, message: String) { - self.status = TaskStatus::Warning; - self.message = message; - } - - pub fn set_error(&mut self, message: String) { - self.status = TaskStatus::Error; - self.message = message; - } - - pub fn set_completed(mut self, message: String) { - self.status = TaskStatus::Completed; - self.progress = 100; - self.message = message; - } - - pub fn set_progress(&mut self, message: String, progress: u8) { - self.progress = progress; - self.message = message; - } - - pub fn status(&self) -> &TaskStatus { - &self.status - } - - pub fn started_on(&self) -> &DateTime { - &self.started_on - } - - pub fn message(&self) -> &str { - &self.message - } - - pub fn progress(&self) -> u8 { - self.progress - } -} - #[derive(Default, Clone)] pub struct Health { system: SystemHealth, - tasks: HashMap, + tasks: HashMap, } impl Health { - pub fn system(&self) -> SystemHealth { - self.system.clone() + pub fn system(&self) -> &SystemHealth { + &self.system } - pub fn tasks(self) -> HashMap { - self.tasks + pub fn tasks(&self) -> &HashMap { + &self.tasks } } -/// `HealthMonitor` gives access to shared system health state, allowing to watch health and update +/// [HealthMonitor] gives access to shared system health state, allowing to watch health and update /// task health status. /// /// # Usage -/// Internally `HealthMonitor` uses [Arc] so it can be cheaply cloned and shared. -/// -/// ```no_run -/// use prymn_agent::system::health::{HealthMonitor, TaskHealth}; +/// Internally it uses [Arc] so it can be cheaply cloned and shared. +/// ``` +/// use prymn_agent::system::health::HealthMonitor; +/// use prymn_agent::system::info::Info; /// +/// let mut info = Info::new(); /// let health_monitor = HealthMonitor::new(); -/// let health_monitor_clone = health_monitor.clone(); -/// tokio::spawn(async move { -/// loop { -/// health_monitor_clone.check_system().await; -/// } -/// }); -/// tokio::spawn(async move { -/// health_monitor.set_task_health( -/// "some_task".to_string(), -/// TaskHealth::new("example".to_string()) -/// ); -/// }); +/// +/// // Monitor health changes +/// let _receiver = health_monitor.monitor(); +/// +/// // Refresh system resources +/// info.refresh_resources(); +/// +/// // Update the health monitor with the refreshed info +/// health_monitor.check_system_info(&info); /// ``` #[derive(Clone)] pub struct HealthMonitor { sender: Arc>, - receiver: watch::Receiver, } impl HealthMonitor { pub fn new() -> Self { - let (sender, receiver) = watch::channel(Health::default()); + let (sender, _) = watch::channel(Health::default()); Self { sender: Arc::new(sender), - receiver, } } - // TODO: Remove async from here (so it can be consistent) - // Move system checking task into it's own thing - pub async fn check_system(&self) { + pub fn check_system_info(&self, info: &Info) { use sysinfo::{CpuExt, DiskExt, SystemExt}; - let status = tokio::task::spawn_blocking(|| { - let mut status = SystemStatus::Normal; + let sys = info.system(); + let mut status = SystemStatus::Normal; + let mut statuses = vec![]; - // TODO: For testability, dependency inject this System struct in this function. - let mut sys = SYSTEM.lock().unwrap(); + // Check for critical memory usage + let memory_usage = if sys.total_memory() > 0 { + sys.used_memory() * 100 / sys.total_memory() + } else { + 0 + }; - // Refresh system resources usage - sys.refresh_specifics( - sysinfo::RefreshKind::new() - .with_memory() - .with_disks() - .with_cpu(sysinfo::CpuRefreshKind::new().with_cpu_usage()), - ); + if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD { + statuses.push(CriticalReason::HighMemoryUsage); + } - let mut statuses = vec![]; + // Check for critical CPU usage + let cpu_usage = sys.global_cpu_info().cpu_usage(); - // Check for critical memory usage - let memory_usage = sys.used_memory() * 100 / sys.total_memory(); - if memory_usage > MEMORY_USAGE_CRITICAL_THRESHOLD { - statuses.push(CriticalReason::HighMemoryUsage); + if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD as f32 { + statuses.push(CriticalReason::HighCpuUsage); + } + + // Check for any disk usage that is critical + for disk in sys.disks() { + let available_disk = if disk.total_space() > 0 { + disk.available_space() * 100 / disk.total_space() + } else { + 0 + }; + + if available_disk < 100 - DISK_USAGE_CRITICAL_THRESHOLD { + statuses.push(CriticalReason::HighDiskUsage); } + } - // Check for critical CPU usage - let cpu_usage = sys.global_cpu_info().cpu_usage(); - if cpu_usage > CPU_USAGE_CRITICAL_THRESHOLD as f32 { - statuses.push(CriticalReason::HighCpuUsage); - } - - // Check for any disk usage that is critical - for disk in sys.disks() { - let available_disk = disk.available_space() * 100 / disk.total_space(); - if available_disk < 100 - DISK_USAGE_CRITICAL_THRESHOLD { - statuses.push(CriticalReason::HighDiskUsage); - } - } - - if !statuses.is_empty() { - status = SystemStatus::Critical(statuses); - } - - status - }) - .await - .expect("system checking task panicked - possibly due to panicked mutex lock"); + if !statuses.is_empty() { + status = SystemStatus::Critical(statuses); + } self.sender.send_if_modified(|Health { system, .. }| { if system.status == status { @@ -216,10 +132,23 @@ impl HealthMonitor { }); } - pub fn set_task_health(&self, task_name: String, health: TaskHealth) { - // Always send a notification in this case since it is an explicit action. - self.sender.send_modify(|Health { tasks, .. }| { - tasks.insert(task_name, health); + /// Spawns a new tokio task that tracks from the [watch::Receiver] the status of a Prymn task + /// via [TaskStatus] + pub fn track_task(&self, name: String, mut task_recv: watch::Receiver) { + let sender = self.sender.clone(); + + tokio::task::spawn(async move { + while task_recv.changed().await.is_ok() { + sender.send_modify(|health| { + health + .tasks + .insert(String::from(&name), task_recv.borrow().clone()); + }); + } + + // At this point the Sender part of the watch dropped, meaning we can clear the task + // because it is complete. + sender.send_if_modified(|health| health.tasks.remove(&name).is_some()); }); } @@ -229,13 +158,13 @@ impl HealthMonitor { } pub fn monitor(&self) -> watch::Receiver { - self.receiver.clone() + self.sender.subscribe() } } impl Default for HealthMonitor { fn default() -> Self { - HealthMonitor::new() + Self::new() } } @@ -259,44 +188,3 @@ impl std::fmt::Display for CriticalReason { } } } - -impl std::fmt::Display for TaskStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TaskStatus::Normal => write!(f, "normal"), - TaskStatus::Warning => write!(f, "warning"), - TaskStatus::Error => write!(f, "error"), - TaskStatus::Completed => write!(f, "completed"), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_task_monitor() { - let health_monitor = HealthMonitor::new(); - let receiver = health_monitor.monitor(); - - assert!(receiver.has_changed().is_ok_and(|changed| !changed)); - - let health = TaskHealth::new("this is normal".to_owned()); - health_monitor.set_task_health("some_task".to_string(), health); - - assert!(receiver.has_changed().is_ok_and(|changed| changed)); - - { - let health = receiver.borrow(); - let task_health = health.tasks.get("some_task").expect("a task should exist"); - - assert_eq!(task_health.status, TaskStatus::Normal); - assert_eq!(task_health.progress, 0); - assert_eq!(task_health.message, "this is normal"); - } - - health_monitor.clear_task("some_task"); - assert!(!receiver.borrow().tasks.contains_key("some_task")); - } -} diff --git a/agent/src/system/info.rs b/agent/src/system/info.rs new file mode 100644 index 0000000..4cbfd07 --- /dev/null +++ b/agent/src/system/info.rs @@ -0,0 +1,87 @@ +//! System info + +use std::{sync::Mutex, time::Duration}; + +use anyhow::Context; +use sysinfo::{CpuRefreshKind, SystemExt}; + +use crate::debian; + +pub struct Info { + system: sysinfo::System, + updates: Vec, +} + +impl Info { + pub fn new() -> Self { + Self { + system: sysinfo::System::new(), + updates: Vec::new(), + } + } + + pub fn refresh_resources(&mut self) { + self.system.refresh_specifics( + sysinfo::RefreshKind::new() + .with_disks_list() + .with_memory() + .with_cpu(CpuRefreshKind::new().with_cpu_usage()), + ); + } + + pub fn refresh_updates(&mut self) -> anyhow::Result<()> { + debian::update_package_index().context("while fetching the package index")?; + + let updates = + debian::get_available_updates().context("while fetching available updates")?; + + self.updates = updates; + + Ok(()) + } + + pub fn system(&self) -> &sysinfo::System { + &self.system + } + + pub fn updates(&self) -> &Vec { + &self.updates + } +} + +impl Default for Info { + fn default() -> Self { + Self::new() + } +} + +/// Spawns a new thread that forever gathers system information. +pub fn spawn_info_subsystem() -> &'static Mutex { + const REFRESH_RESOURCES_INTERVAL: Duration = Duration::from_secs(5); + const REFRESH_UPDATES_INTERVAL: Duration = Duration::from_secs(3600); + + let info = Box::new(Mutex::new(Info::new())); + let info = Box::leak(info); + + std::thread::spawn(|| loop { + tracing::debug!("refreshing system resources"); + + #[allow(clippy::mut_mutex_lock)] + info.lock().unwrap().refresh_resources(); + + std::thread::sleep(REFRESH_RESOURCES_INTERVAL); + }); + + std::thread::spawn(|| loop { + tracing::debug!("refreshing available system updates"); + + #[allow(clippy::mut_mutex_lock)] + if let Err(err) = info.lock().unwrap().refresh_updates() { + tracing::warn!(?err, "failed to refresh updates"); + } + + std::thread::sleep(REFRESH_UPDATES_INTERVAL); + }); + + info +} diff --git a/agent/src/system/mod.rs b/agent/src/system/mod.rs index 42b3aa8..0e753a6 100644 --- a/agent/src/system/mod.rs +++ b/agent/src/system/mod.rs @@ -1,12 +1,5 @@ //! System boundary and modules that interact with the operating system and programs. -use std::sync::Mutex; - -use once_cell::sync::Lazy; -use sysinfo::SystemExt; - -pub mod exec; pub mod health; - -// TODO: Make this mock-able so we can test code that interacts with it -pub static SYSTEM: Lazy> = Lazy::new(|| Mutex::new(sysinfo::System::new())); +pub mod info; +pub mod task; diff --git a/agent/src/system/task.rs b/agent/src/system/task.rs new file mode 100644 index 0000000..323c6ea --- /dev/null +++ b/agent/src/system/task.rs @@ -0,0 +1,154 @@ +//! A task is an atomic executing routine that the agent is running, potentially in the background. +//! The task is tracked by the system monitor. + +// TODO: Take a look at futures::stream::FuturesOrdered +// It is used to store futures in an ordered fashion, and it also implements Stream + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use chrono::{DateTime, Utc}; +use tokio::sync::{mpsc, watch}; +use tokio_stream::{Stream, StreamExt}; + +use super::health::HealthMonitor; + +#[derive(Clone, Default)] +pub struct TaskStatus { + started_on: DateTime, + curr_step: usize, + max_steps: usize, +} + +impl TaskStatus { + /// Returns the task progress as a percentage value + pub fn progress(&self) -> f32 { + 100.0 * (self.curr_step as f32 / self.max_steps as f32) + } + + /// Returns the datetime when this task began executing + pub fn started_on(&self) -> &DateTime { + &self.started_on + } + + fn next_step(&mut self) { + if self.curr_step < self.max_steps { + self.curr_step += 1; + } + } +} + +type BoxFuture = Pin + Send>>; + +pub struct TaskBuilder { + task: Task, +} + +impl TaskBuilder { + pub fn new(name: String) -> Self { + let (sender, _) = watch::channel(TaskStatus::default()); + + Self { + task: Task { + name, + health_monitor: None, + status_channel: sender, + steps: Vec::new(), + }, + } + } + + /// Attaches a health monitor to notify the health system on progress made. + pub fn health_monitor(mut self, health_monitor: HealthMonitor) -> Self { + self.task.health_monitor = Some(health_monitor); + self + } + + pub fn build(self) -> Task { + self.task + } +} + +impl TaskBuilder> { + pub fn add_step(mut self, step: impl Future + Send + 'static) -> Self { + self.task.add_step(step); + self + } +} + +pub struct Task { + name: String, + health_monitor: Option, + status_channel: watch::Sender, + steps: Vec, +} + +impl Task> { + fn add_step(&mut self, step: impl Future + Send + 'static) { + self.steps.push(Box::pin(step)) + } + + /// Turn this Task into an object that implements [Stream]. + /// + /// The new stream will output each step's future output. + pub fn into_stream(self) -> TaskStream { + if let Some(health) = &self.health_monitor { + health.track_task(self.name.clone(), self.status_channel.subscribe()); + } + + // Immediately notify the initial status (step 0) + self.status_channel.send_replace(TaskStatus { + started_on: Utc::now(), + curr_step: 0, + max_steps: self.steps.len(), + }); + + TaskStream { inner: self } + } + + /// Run this task concurrently in the background. + /// + /// Returns a [mpsc::Receiver] which receives the returned values of each step's future + /// output. + pub fn into_background(self) -> mpsc::Receiver { + let (sender, receiver) = mpsc::channel(10); + + tokio::spawn(async move { + let mut stream = self.into_stream(); + while let Some(value) = stream.next().await { + let _ = sender.send(value).await; + } + }); + + receiver + } +} + +pub struct TaskStream { + inner: Task>, +} + +impl Stream for TaskStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.steps.get_mut(0) { + Some(fut) => match fut.as_mut().poll(cx) { + Poll::Ready(value) => { + self.inner.steps.remove(0); + + self.inner + .status_channel + .send_modify(|task| task.next_step()); + + Poll::Ready(Some(value)) + } + Poll::Pending => Poll::Pending, + }, + None => Poll::Ready(None), + } + } +} diff --git a/agent/tests/task_health.rs b/agent/tests/task_health.rs new file mode 100644 index 0000000..6db7fff --- /dev/null +++ b/agent/tests/task_health.rs @@ -0,0 +1,18 @@ +use prymn_agent::system::{health::HealthMonitor, task::TaskBuilder}; + +#[tokio::test] +async fn task_is_gone_from_health_monitor_when_complete() { + let health_monitor = HealthMonitor::new(); + let health_recv = health_monitor.monitor(); + + let mut task_recv = TaskBuilder::new("test task".to_owned()) + .health_monitor(health_monitor) + .add_step(async { "foo" }) + .add_step(async { "bar" }) + .build() + .into_background(); + + assert_eq!(task_recv.recv().await.unwrap(), "foo"); + assert_eq!(task_recv.recv().await.unwrap(), "bar"); + assert!(health_recv.borrow().tasks().is_empty()); +} diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index 162cb54..cedb2ce 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -38,20 +38,31 @@ defmodule Prymn.Agents do Health.lookup(host_address) end - # TODO: We should not expose this api, instead wrap every GRPC call in this - # module GRPC is an "internal implementation detail" (although it probably - # wont ever change) - # - # E.g. - # def get_sys_info(agent) do - # PrymnProto.Prymn.Agent.Stub.get_sys_info(agent.channel, %Google.Protobuf.Empty{}) - # end - def get_channel(host_address) do - with [{pid, _}] <- Registry.lookup(Prymn.Agents.Registry, host_address), - channel when channel != nil <- Connection.get_channel(pid) do - {:ok, channel} - else - _ -> {:error, :not_found} + @doc """ + Get the system's information (CPU, Memory usage, etc.). + """ + def get_sys_info(host_address) do + lookup_connection(host_address) + |> Connection.get_sys_info() + end + + @doc """ + Perform a system update. + + ## Asynchronous call + Messages are sent to the caller in the form of the struct: + + %PrymnProto.Prymn.SysUpdateResponse{} + """ + def sys_update(host_address, dry_run) when is_boolean(dry_run) do + lookup_connection(host_address) + |> Connection.sys_update(dry_run) + end + + defp lookup_connection(host_address) when is_binary(host_address) do + case Registry.lookup(Prymn.Agents.Registry, host_address) do + [{pid, _}] -> pid + [] -> nil end end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 5b4a56d..9f5ec9b 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -2,22 +2,29 @@ defmodule Prymn.Agents.Connection do @moduledoc false alias Prymn.Agents.Health + alias PrymnProto.Prymn.Agent.Stub require Logger use GenServer, restart: :transient @timeout :timer.minutes(2) - @spec start_link(String.t()) :: GenServer.on_start() def start_link(host_address) do GenServer.start_link(__MODULE__, host_address, name: via(host_address)) end - @spec get_channel(pid) :: GRPC.Channel.t() | nil def get_channel(server) do GenServer.call(server, :get_channel) end + def get_sys_info(server) when is_pid(server) do + GenServer.call(server, :get_sys_info) + end + + def sys_update(server, dry_run) when is_pid(server) and is_boolean(dry_run) do + GenServer.call(server, {:sys_update, dry_run}) + end + ## ## Server callbacks ## @@ -46,40 +53,53 @@ defmodule Prymn.Agents.Connection do pid = self() Task.start_link(fn -> - {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{}) + case Stub.health(channel, %Google.Protobuf.Empty{}) do + {:ok, stream} -> + # Read from the stream forever and send data back to parent + stream + |> Stream.each(fn {_, data} -> send(pid, data) end) + |> Enum.take_while(fn _ -> true end) - # Read from the stream forever and send data back to parent - stream - |> Stream.each(fn {_, data} -> send(pid, data) end) - |> Enum.take_while(fn _ -> true end) + {:error, _rpcerror} -> + send(pid, {:connect_error, :rpc_error}) + end end) {:noreply, state} end - @impl true - def handle_cast(_, state) do - {:noreply, state} - end - @impl true def handle_call(:get_channel, _from, {_, channel} = state) do {:reply, channel, state, @timeout} end + def handle_call(:get_sys_info, _from, {_, channel} = state) do + reply = Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) + {:reply, reply, state, @timeout} + end + + def handle_call({:sys_update, dry_run}, {from, _}, {_, channel} = state) do + request = %PrymnProto.Prymn.SysUpdateRequest{dry_run: dry_run} + streaming_call(fn -> Stub.sys_update(channel, request) end, from) + {:reply, :ok, state, @timeout} + end + @impl true def handle_info(%GRPC.Channel{} = channel, {host, _}) do {:noreply, {host, channel}, {:continue, :health}} end def handle_info({:connect_error, reason}, {host, _} = state) do - if reason == :timeout do - Health.lookup(host, default: true) - |> Health.make_timed_out() - |> Health.update_and_broadcast() - end + health = Health.lookup(host, default: true) - {:stop, reason, state} + case reason do + :timeout -> Health.make_timed_out(health) + :rpc_error -> Health.make_disconnected(health) + end + |> Health.update_and_broadcast() + + # NOTE: Here we terminate normally, which means we won't be retrying. Maybe we want to? + {:stop, :normal, state} end def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do @@ -102,7 +122,7 @@ defmodule Prymn.Agents.Connection do end def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do - Health.lookup(host) + Health.lookup(host, default: true) |> Health.make_disconnected() |> Health.update_and_broadcast() @@ -136,4 +156,18 @@ defmodule Prymn.Agents.Connection do receive_loop(pid) end + + defp streaming_call(fun, from) do + Task.start_link(fn -> + case fun.() do + {:ok, stream} -> + stream + |> Stream.each(fn {:ok, data} -> send(from, data) end) + |> Enum.to_list() + + {:error, _error} -> + :todo + end + end) + end end diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex index f6dcedd..9769250 100644 --- a/app/lib/prymn/agents/health.ex +++ b/app/lib/prymn/agents/health.ex @@ -5,13 +5,14 @@ defmodule Prymn.Agents.Health do getting depleted, or if it's unable be reached. """ - defstruct [:host, :version, message: "Unknown"] + defstruct [:host, :version, :status, tasks: [], message: "Unknown"] alias PrymnProto.Prymn.HealthResponse @type t :: %{ host: String.t(), version: String.t(), + status: atom(), message: String.t() } @@ -51,15 +52,15 @@ defmodule Prymn.Agents.Health do end def make_timed_out(%__MODULE__{} = health) do - %__MODULE__{health | message: "Connect timed out"} + %__MODULE__{health | status: :unreachable, message: "Connect timed out"} end def make_disconnected(%__MODULE__{} = health) do - %__MODULE__{health | message: "Disconnected"} + %__MODULE__{health | status: :disconnected, message: "Disconnected"} end def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do - %__MODULE__{host: host} + %__MODULE__{host: host, status: :connected} |> do_version(version) |> do_system(system) |> do_tasks(tasks) @@ -76,7 +77,13 @@ defmodule Prymn.Agents.Health do end end - defp do_tasks(health, _tasks) do - health + defp do_tasks(health, tasks) do + tasks = + Enum.map(tasks, fn {task_key, task_value} -> + progress = Float.round(task_value.progress, 2) + {task_key, %{task_value | progress: "#{progress}%"}} + end) + + %__MODULE__{health | tasks: tasks} end end diff --git a/app/lib/prymn_web/components/core_components.ex b/app/lib/prymn_web/components/core_components.ex index e005c89..12b0e4e 100644 --- a/app/lib/prymn_web/components/core_components.ex +++ b/app/lib/prymn_web/components/core_components.ex @@ -212,8 +212,9 @@ defmodule PrymnWeb.CoreComponents do def button(assigns) do assigns = assign(assigns, :style, [ - "phx-submit-loading:opacity-75 rounded-lg bg-zinc-900 hover:bg-zinc-700 py-2 px-3", - "text-sm font-semibold leading-6 text-white active:text-white/80", + "inline-flex items-center rounded-lg bg-zinc-900 hover:bg-zinc-700 py-2 px-3", + "font-semibold leading-6 text-sm text-white active:text-white/80", + "phx-submit-loading:opacity-75 phx-click-loading:opacity-75 disabled:cursor-not-allowed", assigns.class ]) @@ -228,7 +229,8 @@ defmodule PrymnWeb.CoreComponents do _ -> ~H""" """ end @@ -594,6 +596,30 @@ defmodule PrymnWeb.CoreComponents do """ end + @doc """ + Renders a spinner. + """ + attr :class, :string, default: nil + attr :rest, :global + + def spinner(assigns) do + ~H""" +
+ + + + +
+ """ + end + ## JS Commands def show(js \\ %JS{}, selector) do diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 3f86cdd..be9c520 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -58,29 +58,29 @@ defmodule PrymnWeb.ServerLive.Index do case {assigns.status, assigns.health} do {:unregistered, _} -> ~H""" - Needs registration + Needs registration """ {:registered, nil} -> ~H""" - Connecting... + <.spinner class="w-5" /> """ - {:registered, %Agents.Health{message: "Connected"}} -> + {:registered, %Agents.Health{status: :connected}} -> ~H""" - Connected + Connected """ - {:registered, %Agents.Health{message: "Disconnected"}} -> + {:registered, %Agents.Health{status: :disconnected}} -> ~H""" - Disconnected + Disconnected """ {:registered, %Agents.Health{message: message}} -> assigns = assign(assigns, :message, message) ~H""" - <%= @message %> + <%= @message %> """ end end diff --git a/app/lib/prymn_web/live/server_live/index.html.heex b/app/lib/prymn_web/live/server_live/index.html.heex index dcd7ec3..2039b9b 100644 --- a/app/lib/prymn_web/live/server_live/index.html.heex +++ b/app/lib/prymn_web/live/server_live/index.html.heex @@ -16,12 +16,19 @@ >

<%= server.name %>

- - <.server_status status={server.status} health={@healths[server.public_ip]} /> - + <.server_status status={server.status} health={@healths[server.public_ip]} />
-
+
IP: <%= server.public_ip || "N/A" %> + + <%= for {name, task} <- Enum.take(@healths[server.public_ip].tasks, 1) do %> +
In progress: <%= name %>
+
<%= task.progress %>
+ <% end %> +
diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex index 9120c99..5eb0683 100644 --- a/app/lib/prymn_web/live/server_live/show.ex +++ b/app/lib/prymn_web/live/server_live/show.ex @@ -1,6 +1,7 @@ defmodule PrymnWeb.ServerLive.Show do use PrymnWeb, :live_view + require Logger alias Prymn.{Agents, Servers} @impl true @@ -11,12 +12,11 @@ defmodule PrymnWeb.ServerLive.Show do @impl true def handle_params(%{"id" => id}, _, socket) do server = Servers.get_server!(id) - pid = self() if connected?(socket) and server.status == :registered do Agents.subscribe_to_health(server.public_ip) Agents.start_connection(server.public_ip) - Task.start_link(fn -> get_sys_info(pid, server.public_ip) end) + send(self(), :get_sys_info) end health = Agents.get_health(server.public_ip) @@ -26,32 +26,83 @@ defmodule PrymnWeb.ServerLive.Show do |> assign(:health, health || %{message: "Connecting..."}) |> assign(:page_title, server.name) |> assign(:server, server) - |> assign(:uptime, 0) - |> assign(:cpus, []) - |> assign(:used_disk, 0) - |> assign(:total_memory, 0) - |> assign(:used_memory, 0) + |> assign(:dry_run, false) + |> assign(:update_output, []) + |> assign(:sys_info, nil) + # TODO: Do not assign this to the socket - instead generate it in the HTML |> assign(:registration_command, Servers.create_setup_command(server))} end @impl true + def handle_info(:get_sys_info, socket) do + status = get_in(socket.assigns, [:health, Access.key(:status)]) + host_address = get_in(socket.assigns, [:server, Access.key(:public_ip)]) + pid = self() + + if host_address != nil and status == :connected do + Task.start_link(fn -> + {:ok, sys_info} = Agents.get_sys_info(host_address) + send(pid, sys_info) + end) + end + + # 10 seconds is >5 which is gun's timeout duration (which might have a race + # condition if they are equal) + Process.send_after(self(), :get_sys_info, :timer.seconds(10)) + + {:noreply, socket} + end + def handle_info(%PrymnProto.Prymn.SysInfoResponse{} = response, socket) do + # TODO: GRPC calls should be done through wrapper functions. Necessary + # calculations should be done then and there. {:noreply, socket - |> assign(:uptime, response.uptime) - |> assign( - :used_memory, - bytes_to_gigabytes(response.mem_total_bytes - response.mem_avail_bytes) - ) - |> assign(:total_memory, bytes_to_gigabytes(response.mem_total_bytes)) - |> assign(:used_disk, calculate_disk_used_percent(response.disks)) + |> assign(:sys_info, response) + |> assign(:updates_available, response.updates_available) |> assign(:cpus, response.cpus)} end + def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do + output = String.split(response.output, "\n") + socket = assign(socket, :update_output, output) + {:noreply, socket} + end + def handle_info(%Agents.Health{} = health, socket) do {:noreply, assign(socket, :health, health)} end + @impl true + def handle_event("system_update", _params, socket) do + host_address = get_in(socket.assigns, [:server, Access.key(:public_ip)]) + server_name = get_in(socket.assigns, [:server, Access.key(:name)]) + + socket = + if host_address do + Agents.sys_update(host_address, socket.assigns.dry_run) + put_flash(socket, :info, "Started a system update on server #{server_name}.") + else + put_flash( + socket, + :error, + "Could not perform the update. Your server does not seem to have an address" + ) + end + + {:noreply, socket} + end + + def handle_event("change_dry_run", %{"dry_run" => enabled}, socket) do + enabled = (enabled == "true" && true) || false + {:noreply, assign(socket, :dry_run, enabled)} + end + + defp calculate_cpu_usage(cpus) do + (Enum.reduce(cpus, 0, fn x, acc -> x.usage + acc end) / Enum.count(cpus)) + |> Float.round(2) + end + defp bytes_to_gigabytes(bytes) do Float.round(bytes / Integer.pow(1024, 3), 2) end @@ -66,17 +117,4 @@ defmodule PrymnWeb.ServerLive.Show do Float.round(100 * used / total, 2) end - - defp get_sys_info(from, host_address) do - alias PrymnProto.Prymn.Agent - - with {:ok, channel} <- Agents.get_channel(host_address), - {:ok, reply} <- Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) do - send(from, reply) - end - - Process.sleep(:timer.seconds(5)) - - get_sys_info(from, host_address) - end end diff --git a/app/lib/prymn_web/live/server_live/show.html.heex b/app/lib/prymn_web/live/server_live/show.html.heex index 0cfc367..b381b89 100644 --- a/app/lib/prymn_web/live/server_live/show.html.heex +++ b/app/lib/prymn_web/live/server_live/show.html.heex @@ -1,28 +1,41 @@ <.header> - - <%= case @health.message do %> - <% "Connected" -> %> - - - <% "Disconnected" -> %> - - <% _ -> %> - - <% end %> + + Server <%= @server.name %> + + <%= case @health.message do %> + <% "Connected" -> %> + + + <% "Disconnected" -> %> + + <% _ -> %> + + <% end %> + - Server <%= @server.name %> + <:subtitle> + <%= @server.public_ip %> + -
+
+ <%= for {name, task} <- @health.tasks do %> + Background task in progress: <%= name %> +

<%= task.progress %> complete

+ <% end %> +
+ +

Connect to your server using root credentials and execute the following command:

@@ -40,34 +53,58 @@ />
-
+ -
-
-

<%= @uptime || "" %>s

-

Uptime

-
-
-

<%= Enum.count(@cpus || []) %>

-

CPUs

-
-
-

- <%= @used_memory || 0 %> / <%= @total_memory || 0 %> - GiB +

+
+
+

<%= @sys_info.uptime || "" %>s

+

Uptime

+
+
+

<%= Enum.count(@sys_info.cpus || []) %>

+

CPUs

+
+
+

<%= calculate_cpu_usage(@sys_info.cpus) %>

+

CPU%

+
+
+

+ <%= bytes_to_gigabytes(@sys_info.mem_total_bytes - @sys_info.mem_avail_bytes) %> + / + <%= bytes_to_gigabytes(@sys_info.mem_total_bytes) %> + GiB +

+

Memory

+
+
+

+ <%= calculate_disk_used_percent(@sys_info.disks) %> + % +

+

Used Disk

+
+
+ +
+
+ <.input type="checkbox" name="dry_run" value={@dry_run} label="Enable dry-run operations" /> +
+
+ +
+

System

+

+ Updates: <%= @sys_info.updates_available %> pending updates. + <.button type="button" class="ml-4" phx-click="system_update"> + Update now + +

+ <%= output %> +

-

Memory

-
-
-

- <%= @used_disk %> - % -

-

Used Disk

-
-
+ + <.back navigate={~p"/servers"}>Back to servers diff --git a/proto/agent.proto b/proto/agent.proto index 6bcb6c3..6c1e998 100644 --- a/proto/agent.proto +++ b/proto/agent.proto @@ -10,10 +10,8 @@ message SystemHealth { } message TaskHealth { - string status = 1; - string message = 2; - string started_on = 3; - int32 progress = 4; + string started_on = 1; + float progress = 2; } message HealthResponse { @@ -44,6 +42,7 @@ message SysInfoResponse { uint64 swap_free_bytes = 7; repeated Cpu cpus = 8; repeated Disk disks = 9; + uint32 updates_available = 10; } message ExecRequest { @@ -52,20 +51,26 @@ message ExecRequest { } message ExecResponse { - message Output { + oneof out { string stdout = 1; string stderr = 2; - } - - oneof response { - Output output = 1; - int32 exit_code = 2; string error = 3; + int32 exit_code = 4; } } +message SysUpdateRequest { + bool dry_run = 1; +} + +message SysUpdateResponse { + string output = 1; + int32 progress = 2; +} + service Agent { rpc Health(google.protobuf.Empty) returns (stream HealthResponse); - rpc GetSysInfo(google.protobuf.Empty) returns (SysInfoResponse); rpc Exec(ExecRequest) returns (stream ExecResponse); + rpc GetSysInfo(google.protobuf.Empty) returns (SysInfoResponse); + rpc SysUpdate(SysUpdateRequest) returns (stream SysUpdateResponse); }