From ecf03b8f93ba423e2e608eb77f12153e37edafe9 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Tue, 27 Jun 2023 22:28:00 +0300 Subject: [PATCH] backend: add "agents" context Manages connection processes to a remote agent --- agent/src/bin/prymn_agent.rs | 4 +- agent/src/server/mod.rs | 19 +++- agent/src/server/rpc.rs | 2 +- backend/lib/prymn/agents.ex | 51 ++++++++++ backend/lib/prymn/agents/connection.ex | 95 +++++++++++++++++++ backend/lib/prymn/application.ex | 7 +- .../lib/prymn_web/live/server_live/index.ex | 4 + backend/mix.exs | 1 + backend/mix.lock | 1 + 9 files changed, 176 insertions(+), 8 deletions(-) create mode 100644 backend/lib/prymn/agents.ex create mode 100644 backend/lib/prymn/agents/connection.ex diff --git a/agent/src/bin/prymn_agent.rs b/agent/src/bin/prymn_agent.rs index 4d000f7..153e7c3 100644 --- a/agent/src/bin/prymn_agent.rs +++ b/agent/src/bin/prymn_agent.rs @@ -15,8 +15,8 @@ async fn main() -> anyhow::Result<()> { shutdown_tx.send(()).expect("bug: channel closed"); }); - let addr = "0.0.0.0:5012".parse()?; - tracing::info!("Serving new agent at address: {}", addr); + let addr = "0.0.0.0:50012".parse()?; + tracing::info!(message = "starting new agent", %addr); new_server() .serve_with_shutdown(addr, async { diff --git a/agent/src/server/mod.rs b/agent/src/server/mod.rs index 7dd54f7..1a2f9be 100644 --- a/agent/src/server/mod.rs +++ b/agent/src/server/mod.rs @@ -18,14 +18,24 @@ type Result = std::result::Result; #[tonic::async_trait] impl rpc::agent_server::Agent for Server { + #[tracing::instrument(skip(self))] async fn echo(&self, req: Request) -> Result> { Ok(Response::new(rpc::EchoResponse { message: req.into_inner().message, })) } + #[tracing::instrument(skip(self))] async fn get_sys_info(&self, _: Request<()>) -> Result> { - let sys = self.sys.lock().unwrap(); + let mut sys = self.sys.lock().unwrap(); + + sys.refresh_specifics( + sysinfo::RefreshKind::new() + .with_disks() + .with_memory() + .with_processes(sysinfo::ProcessRefreshKind::everything()) + .with_cpu(sysinfo::CpuRefreshKind::everything()), + ); let cpus = sys .cpus() @@ -64,6 +74,7 @@ impl rpc::agent_server::Agent for Server { type ExecStream = Pin> + Send + Sync>>; + #[tracing::instrument(skip(self))] async fn exec(&self, req: Request) -> Result> { use exec::*; @@ -89,7 +100,11 @@ pub fn new_server() -> Router { sys: Arc::new(Mutex::new(System::new_all())), }; - tonic::transport::Server::builder().add_service(rpc::agent_server::AgentServer::new(server)) + let service = rpc::agent_server::AgentServer::new(server); + + tonic::transport::Server::builder() + .trace_fn(|_| tracing::info_span!("agent_server")) + .add_service(service) } #[cfg(test)] diff --git a/agent/src/server/rpc.rs b/agent/src/server/rpc.rs index 7b3b329..241538e 100644 --- a/agent/src/server/rpc.rs +++ b/agent/src/server/rpc.rs @@ -1 +1 @@ -tonic::include_proto!("agent"); +tonic::include_proto!("prymn"); diff --git a/backend/lib/prymn/agents.ex b/backend/lib/prymn/agents.ex new file mode 100644 index 0000000..caaf190 --- /dev/null +++ b/backend/lib/prymn/agents.ex @@ -0,0 +1,51 @@ +defmodule Prymn.Agents do + @moduledoc ~S""" + Prymn Agents are programs that manage a remote client machine. Prymn backend + communicates with them using GRPC calls. GRPC connections are started using + the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the + Prymn.Agents.Registry. + + ## Examples + + TODO + """ + + @doc """ + Ensures a connection with the Prymn Agent exists and is kept in memory. + + Returns `:ok` when a new connection is successfuly established or is already established + + Returns `{:error, reason}` when the connection could not be established + """ + @spec ensure_connection(String.t()) :: :ok | {:error, term} + def ensure_connection(address) do + child = {Prymn.Agents.Connection, address} + + case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, error} -> {:error, error} + end + end + + @doc """ + Terminates the process and drops the connection gracefully. + """ + @spec drop_connection(String.t()) :: :ok | {:error, :not_found} + def drop_connection(address) do + :ok = Prymn.Agents.Connection.drop(address) + catch + :exit, _ -> {:error, :not_found} + end + + @doc """ + Get the channel for the given `address`. The channel is used to make GRPC + calls. + """ + @spec get_channel(String.t()) :: GRPC.Channel.t() | {:error, :not_found} + def get_channel(address) do + Prymn.Agents.Connection.get_channel(address) + catch + :exit, _ -> {:error, :not_found} + end +end diff --git a/backend/lib/prymn/agents/connection.ex b/backend/lib/prymn/agents/connection.ex new file mode 100644 index 0000000..1b75b8f --- /dev/null +++ b/backend/lib/prymn/agents/connection.ex @@ -0,0 +1,95 @@ +defmodule Prymn.Agents.Connection do + @moduledoc false + + defstruct [:channel, up?: false] + + @ping_interval 20000 + + use GenServer, restart: :transient + + require Logger + + @spec start_link(String.t()) :: GenServer.on_start() + def start_link(addr) do + GenServer.start_link(__MODULE__, addr, name: via(addr)) + end + + @spec get_channel(String.t()) :: GRPC.Channel.t() + def get_channel(addr) do + GenServer.call(via(addr), :get_channel) + end + + @spec drop(String.t()) :: :ok + def drop(addr) do + GenServer.stop(via(addr), :shutdown) + end + + @impl true + def init(addr) do + case GRPC.Stub.connect(addr) do + {:ok, channel} -> + Logger.info("Starting new connection at address #{addr}") + + state = %__MODULE__{channel: channel, up?: true} + + Process.send_after(self(), :do_healthcheck, @ping_interval) + {:ok, state} + + {:error, error} -> + {:stop, error} + end + end + + @impl true + def handle_call(:get_channel, _from, state) do + {:reply, state.channel, state} + end + + @impl true + def handle_info({:gun_up, _pid, _protocol}, state) do + Logger.info("regained connection (#{inspect(state.channel)})") + + {:noreply, %{state | up?: true}} + end + + @impl true + def handle_info({:gun_down, _pid, _protocol, reason, _killed_streams}, state) do + Logger.info("lost connection (reason: #{inspect(reason)}, #{inspect(state.channel)})") + + {:noreply, %{state | up?: false}} + end + + @impl true + def handle_info(:do_healthcheck, %{channel: channel} = state) do + request = %PrymnProto.Prymn.EchoRequest{message: "hello"} + + case PrymnProto.Prymn.Agent.Stub.echo(channel, request) do + {:ok, _reply} -> + :noop + + # IO.inspect(reply) + + {:error, error} -> + Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(error)}") + end + + Process.send_after(self(), :do_healthcheck, @ping_interval) + {:noreply, state} + end + + @impl true + def handle_info(msg, state) do + Logger.info("received unexpected message: #{inspect(msg)}") + + {:noreply, state} + end + + @impl true + def terminate(_reason, %{channel: channel}) do + GRPC.Stub.disconnect(channel) + end + + defp via(name) do + {:via, Registry, {Prymn.Agents.Registry, name}} + end +end diff --git a/backend/lib/prymn/application.ex b/backend/lib/prymn/application.ex index b1ab597..c44afd6 100644 --- a/backend/lib/prymn/application.ex +++ b/backend/lib/prymn/application.ex @@ -17,9 +17,10 @@ defmodule Prymn.Application do # Start Finch {Finch, name: Prymn.Finch}, # Start the Endpoint (http/https) - PrymnWeb.Endpoint - # Start a worker by calling: Prymn.Worker.start_link(arg) - # {Prymn.Worker, arg} + PrymnWeb.Endpoint, + # Start the prymn agent (grpc) registry and the supervisor + {Registry, keys: :unique, name: Prymn.Agents.Registry}, + {DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/backend/lib/prymn_web/live/server_live/index.ex b/backend/lib/prymn_web/live/server_live/index.ex index d896183..40dc3a0 100644 --- a/backend/lib/prymn_web/live/server_live/index.ex +++ b/backend/lib/prymn_web/live/server_live/index.ex @@ -5,6 +5,10 @@ defmodule PrymnWeb.ServerLive.Index do @impl true def mount(_params, _session, socket) do + # Run this for every server: + # make sure an agent connection is made (async "cheap" request) + # then wait for events + # pubsub will eventually send a connected or a disconnected (and anything else) event {:ok, stream(socket, :servers, Servers.list_servers())} end diff --git a/backend/mix.exs b/backend/mix.exs index 33dd4d9..4f8b9d6 100644 --- a/backend/mix.exs +++ b/backend/mix.exs @@ -49,6 +49,7 @@ defmodule Prymn.MixProject do {:plug_cowboy, "~> 2.5"}, {:grpc, github: "elixir-grpc/grpc", ref: "691ac2146eac1691e703e31985765f042ec5e91a"}, {:protobuf, "~> 0.12.0"}, + {:google_protos, "~> 0.3.0"}, # Test {:floki, ">= 0.30.0", only: :test}, diff --git a/backend/mix.lock b/backend/mix.lock index c5d94ef..5910e2c 100644 --- a/backend/mix.lock +++ b/backend/mix.lock @@ -15,6 +15,7 @@ "finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"}, "floki": {:hex, :floki, "0.34.3", "5e2dcaec5d7c228ce5b1d3501502e308b2d79eb655e4191751a1fe491c37feac", [:mix], [], "hexpm", "9577440eea5b97924b4bf3c7ea55f7b8b6dce589f9b28b096cc294a8dc342341"}, "gettext": {:hex, :gettext, "0.22.2", "6bfca374de34ecc913a28ba391ca184d88d77810a3e427afa8454a71a51341ac", [:mix], [{:expo, "~> 0.4.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "8a2d389673aea82d7eae387e6a2ccc12660610080ae7beb19452cfdc1ec30f60"}, + "google_protos": {:hex, :google_protos, "0.3.0", "15faf44dce678ac028c289668ff56548806e313e4959a3aaf4f6e1ebe8db83f4", [:mix], [{:protobuf, "~> 0.10", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "1f6b7fb20371f72f418b98e5e48dae3e022a9a6de1858d4b254ac5a5d0b4035f"}, "grpc": {:git, "https://github.com/elixir-grpc/grpc.git", "691ac2146eac1691e703e31985765f042ec5e91a", [ref: "691ac2146eac1691e703e31985765f042ec5e91a"]}, "gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},