From 26ba60b95dfe4d81495b021524610ad284d28e8d Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Mon, 28 Aug 2023 23:32:42 +0300 Subject: [PATCH] app: refactor the Connection to make it asynchronous when connecting Some work has been done on making the Connection feel nicer, but also more work is needed to not have the channel be exposed to the upper layers of the application. We should wrap all the GRPC calls in the GenServer (which may also allow caching on certain calls such as get_sys_info) --- agent/src/server/agent.rs | 3 +- app/lib/prymn/agents.ex | 57 +++++---- app/lib/prymn/agents/connection.ex | 109 ++++++++++-------- app/lib/prymn/agents/health.ex | 84 ++++++++------ app/lib/prymn/servers/server.ex | 2 - app/lib/prymn_web/live/server_live/index.ex | 38 +++--- .../live/server_live/index.html.heex | 2 +- app/lib/prymn_web/live/server_live/show.ex | 38 +++++- .../prymn_web/live/server_live/show.html.heex | 29 ++++- 9 files changed, 231 insertions(+), 131 deletions(-) diff --git a/agent/src/server/agent.rs b/agent/src/server/agent.rs index f99352d..72f3baf 100644 --- a/agent/src/server/agent.rs +++ b/agent/src/server/agent.rs @@ -46,7 +46,8 @@ impl agent_server::Agent for AgentService { sys.refresh_specifics( sysinfo::RefreshKind::new() - .with_disks() + // .with_disks() // what is this? + .with_disks_list() .with_memory() .with_processes(sysinfo::ProcessRefreshKind::everything()) .with_cpu(sysinfo::CpuRefreshKind::everything()), diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index fbf7c87..162cb54 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -7,42 +7,51 @@ defmodule Prymn.Agents do """ - @doc """ - Establishes a dynamically supervised, background connection with the target - host agent, keeping it alive if it is already started. - """ - @spec start_connection_or_keep_alive(String.t()) :: :ok - def start_connection_or_keep_alive(host_address) do - spec = {Prymn.Agents.Connection, host_address} + alias Prymn.Agents.Connection + alias Prymn.Agents.Health + + def start_connection(host_address) do + spec = {Connection, host_address} case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do - {:error, {:already_started, pid}} -> - :ok = Prymn.Agents.Connection.keep_alive(pid) - :ok - - {:ok, _pid} -> - :ok + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, error} -> {:error, error} end end @doc """ Subscribe to the host's Health using Phoenix.PubSub Broadcasted messages are - in the form of: + the Health struct: - {host_address, %Prymn.Agents.Health{}} - - ## Returns - - This function returns the last health status of the Agent that was saved. + %Prymn.Agents.Health{} """ - @spec subscribe_to_health(String.t()) :: Prymn.Agents.Health.t() def subscribe_to_health(host_address) do - Prymn.Agents.Health.subscribe(host_address) - Prymn.Agents.Health.lookup(host_address) + :ok = Health.subscribe(host_address) end - @spec get_channel(String.t()) :: GRPC.Channel.t() + @doc """ + Return the last known health status of the Agent, or `nil` if it doesn't + exist. + """ + def get_health(host_address) do + Health.lookup(host_address) + end + + # TODO: We should not expose this api, instead wrap every GRPC call in this + # module GRPC is an "internal implementation detail" (although it probably + # wont ever change) + # + # E.g. + # def get_sys_info(agent) do + # PrymnProto.Prymn.Agent.Stub.get_sys_info(agent.channel, %Google.Protobuf.Empty{}) + # end def get_channel(host_address) do - Prymn.Agents.Connection.get_channel(host_address) + with [{pid, _}] <- Registry.lookup(Prymn.Agents.Registry, host_address), + channel when channel != nil <- Connection.get_channel(pid) do + {:ok, channel} + else + _ -> {:error, :not_found} + end end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 3416653..7576653 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -1,6 +1,7 @@ defmodule Prymn.Agents.Connection do @moduledoc false + alias Prymn.Agents.Health require Logger use GenServer, restart: :transient @@ -8,39 +9,36 @@ defmodule Prymn.Agents.Connection do @timeout :timer.minutes(2) @spec start_link(String.t()) :: GenServer.on_start() - def start_link(addr) when is_binary(addr) do - GenServer.start_link(__MODULE__, addr, name: via(addr)) + def start_link(host_address) do + GenServer.start_link(__MODULE__, host_address, name: via(host_address)) end - @spec keep_alive(pid | String.t()) :: :ok - def keep_alive(server) when is_pid(server), do: GenServer.cast(server, :keep_alive) - def keep_alive(server) when is_binary(server), do: GenServer.cast(via(server), :keep_alive) - - @spec stop(pid | String.t()) :: :ok - def stop(server) when is_pid(server), do: GenServer.stop(server, :shutdown) - def stop(server) when is_binary(server), do: GenServer.stop(via(server), :shutdown) - - @spec get_channel(String.t()) :: GRPC.Channel.t() + @spec get_channel(pid) :: GRPC.Channel.t() | nil def get_channel(server) do - GenServer.call(via(server), :get_channel) + GenServer.call(server, :get_channel) end + ## + ## Server callbacks + ## + @impl true def init(host) do - Process.flag(:trap_exit, true) - {:ok, {host, nil}, {:continue, :connect}} - end + # Process.flag(:trap_exit, true) + pid = self() - @impl true - def handle_continue(:connect, {host_address, _} = state) do - case GRPC.Stub.connect(host_address, 50012, []) do - {:ok, channel} -> - keep_alive(self()) - {:noreply, {host_address, channel}, {:continue, :health}} + # Start a connection without blocking the GenServer + Task.start_link(fn -> + case GRPC.Stub.connect(host, 50012, []) do + {:ok, channel} -> send(pid, channel) + {:error, error} -> send(pid, {:connect_error, error}) + end - {:error, error} -> - {:stop, error, state} - end + # Keep receiving and sending back any messages to the GenServer forever + receive_loop(pid) + end) + + {:ok, {host, nil}} end @impl true @@ -56,7 +54,12 @@ defmodule Prymn.Agents.Connection do |> Enum.take_while(fn _ -> true end) end) - {:noreply, state, @timeout} + {:noreply, state} + end + + @impl true + def handle_cast(_, state) do + {:noreply, state} end @impl true @@ -65,37 +68,44 @@ defmodule Prymn.Agents.Connection do end @impl true - def handle_cast(:keep_alive, state) do - {:noreply, state, @timeout} + def handle_info(%GRPC.Channel{} = channel, {host, _}) do + {:noreply, {host, channel}, {:continue, :health}} + end + + def handle_info({:connect_error, reason}, {host, _} = state) do + if reason == :timeout do + Health.lookup(host, default: true) + |> Health.make_timed_out() + |> Health.update_and_broadcast() + end + + {:stop, reason, state} end - @impl true def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do response - |> Prymn.Agents.Health.new_from_proto() - |> Prymn.Agents.Health.update(host) + |> Health.make_from_proto(host) + |> Health.update_and_broadcast() {:noreply, state, @timeout} end - def handle_info(%GRPC.RPCError{} = response, {host, _} = state) do + def handle_info(%GRPC.RPCError{} = response, state) do Logger.debug("received a GRPC error: #{inspect(response)}") - - response - |> Prymn.Agents.Health.new_from_proto() - |> Prymn.Agents.Health.update(host) - - {:noreply, state, @timeout} + {:noreply, state} end def handle_info({:gun_up, _pid, _protocol}, state) do - # TODO: If it's possible for the GRPC connection to be down when we receive - # this message, we should `{:continue, state.channel.host}` + # NOTE: If it's possible for the GRPC connection to be down when we receive + # this message, maybe we should restart the connection {:noreply, state, {:continue, :health}} end def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do - Logger.debug("disconnected from #{host}") + Health.lookup(host) + |> Health.make_disconnected() + |> Health.update_and_broadcast() + {:noreply, state, @timeout} end @@ -103,24 +113,27 @@ defmodule Prymn.Agents.Connection do {:stop, {:shutdown, :timeout}, state} end - def handle_info({:EXIT, _from, _reason}, state) do - # TODO: How to handle this... (this happens when a linked process exists - # e.g the one with the health stream) - {:noreply, state, @timeout} - end - def handle_info(msg, state) do - Logger.warning("received unexpected message: #{inspect(msg)}") - {:noreply, state, @timeout} + Logger.debug("received unhandled message #{inspect(msg)}") + {:noreply, state} end @impl true def terminate(reason, {host, channel}) do Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})") + Health.delete(host) if channel, do: GRPC.Stub.disconnect(channel) end defp via(name) do {:via, Registry, {Prymn.Agents.Registry, name}} end + + defp receive_loop(pid) do + receive do + msg -> send(pid, msg) + end + + receive_loop(pid) + end end diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex index 56639b4..f6dcedd 100644 --- a/app/lib/prymn/agents/health.ex +++ b/app/lib/prymn/agents/health.ex @@ -1,66 +1,82 @@ defmodule Prymn.Agents.Health do - @doc """ - The Health struct keeps simple health information of - whether or not the target host machine is up to date, has any tasks running, - its resources are getting depleted, or if it's unable be reached. + @moduledoc """ + The Health struct keeps simple health information of whether or not the + target host machine is up to date, has any tasks running, its resources are + getting depleted, or if it's unable be reached. """ - defstruct [:version, message: "Unknown"] + defstruct [:host, :version, message: "Unknown"] alias PrymnProto.Prymn.HealthResponse @type t :: %{ + host: String.t(), version: String.t(), message: String.t() } - @doc false def start() do :ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true]) end - @doc false - def subscribe(host_address) do - Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host_address}") + def subscribe(host) do + Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host}") end - @doc false - def update(health, host_address) do - :ets.insert(__MODULE__, {host_address, health}) - Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host_address}", {host_address, health}) + def broadcast!(%__MODULE__{host: host} = health) do + Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host}", health) end - @doc false - def lookup(host_address) do + def update_and_broadcast(nil) do + nil + end + + def update_and_broadcast(%__MODULE__{host: host} = health) do + :ets.insert(__MODULE__, {host, health}) + broadcast!(health) + end + + def delete(host_address) do + :ets.delete(__MODULE__, host_address) + end + + def lookup(host_address, opts \\ []) do + default = Keyword.get(opts, :default, false) + case :ets.lookup(__MODULE__, host_address) do [{^host_address, value}] -> value + [] when default -> %__MODULE__{host: host_address} [] -> nil end end - @doc false - def new_from_proto(proto_health) do - case proto_health do - %HealthResponse{} = health -> - from_health(health) + def make_timed_out(%__MODULE__{} = health) do + %__MODULE__{health | message: "Connect timed out"} + end - %GRPC.RPCError{message: ":stream_error: :closed"} -> - %__MODULE__{message: "Disconnected"} + def make_disconnected(%__MODULE__{} = health) do + %__MODULE__{health | message: "Disconnected"} + end - %GRPC.RPCError{} = error -> - require Logger - Logger.error("unhandled GRPC error received in Health module: #{inspect(error)}") - %__MODULE__{message: "Error retrieving server status"} + def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do + %__MODULE__{host: host} + |> do_version(version) + |> do_system(system) + |> do_tasks(tasks) + end + + defp do_version(health, version) do + %__MODULE__{health | version: version} + end + + defp do_system(health, system) do + case system.status do + "normal" -> %__MODULE__{health | message: "Connected"} + status -> %__MODULE__{health | message: "Alert: #{status}"} end end - defp from_health(%HealthResponse{system: system, version: version}) do - case system.status do - "normal" -> - %__MODULE__{message: "Connected", version: version} - - status -> - %__MODULE__{message: status, version: version} - end + defp do_tasks(health, _tasks) do + health end end diff --git a/app/lib/prymn/servers/server.ex b/app/lib/prymn/servers/server.ex index 3798098..4075f0e 100644 --- a/app/lib/prymn/servers/server.ex +++ b/app/lib/prymn/servers/server.ex @@ -12,8 +12,6 @@ defmodule Prymn.Servers.Server do values: [:unregistered, :registered], default: :unregistered - field :connection_status, :string, virtual: true - timestamps() end diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 24176cc..8039289 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -1,7 +1,6 @@ defmodule PrymnWeb.ServerLive.Index do require Logger - alias Prymn.Agents.Health alias Prymn.{Servers, Agents} use PrymnWeb, :live_view @@ -13,8 +12,9 @@ defmodule PrymnWeb.ServerLive.Index do healths = if connected?(socket) do for %Servers.Server{status: :registered, public_ip: ip} <- servers, into: %{} do - Agents.start_connection_or_keep_alive(ip) - {ip, Agents.subscribe_to_health(ip)} + Agents.subscribe_to_health(ip) + Agents.start_connection(ip) + {ip, Agents.get_health(ip)} end else %{} @@ -44,8 +44,8 @@ defmodule PrymnWeb.ServerLive.Index do |> update(:servers, fn servers -> [server | servers] end)} end - def handle_info({host, %Prymn.Agents.Health{} = health}, socket) do - healths = Map.put(socket.assigns.healths, host, health) + def handle_info(%Agents.Health{} = health, socket) do + healths = Map.put(socket.assigns.healths, health.host, health) {:noreply, assign(socket, :healths, healths)} end @@ -55,22 +55,32 @@ defmodule PrymnWeb.ServerLive.Index do end defp server_status(assigns) do - case {assigns.server, assigns.health} do - {%{status: :registered}, nil} -> + case {assigns.status, assigns.health} do + {:unregistered, _} -> ~H""" - Unknown + Needs registration """ - {%{status: :registered}, %Health{message: message}} -> - assigns = assign(assigns, :status, message) - + {:registered, nil} -> ~H""" - <%= @status %> + Connecting... """ - {_, _} -> + {:registered, %Agents.Health{message: "Connected"}} -> ~H""" - Not registered + Connected + """ + + {:registered, %Agents.Health{message: "Disconnected"}} -> + ~H""" + Disconnected + """ + + {:registered, %Agents.Health{message: message}} -> + assigns = assign(assigns, :message, message) + + ~H""" + <%= @message %> """ end end diff --git a/app/lib/prymn_web/live/server_live/index.html.heex b/app/lib/prymn_web/live/server_live/index.html.heex index 1dcdefa..14a9028 100644 --- a/app/lib/prymn_web/live/server_live/index.html.heex +++ b/app/lib/prymn_web/live/server_live/index.html.heex @@ -19,7 +19,7 @@

<%= server.name %>

- <.server_status server={server} health={@healths[server.public_ip]} /> + <.server_status status={server.status} health={@healths[server.public_ip]} />
diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex index c4fd82d..9120c99 100644 --- a/app/lib/prymn_web/live/server_live/show.ex +++ b/app/lib/prymn_web/live/server_live/show.ex @@ -13,17 +13,22 @@ defmodule PrymnWeb.ServerLive.Show do server = Servers.get_server!(id) pid = self() - if connected?(socket) do - Agents.start_connection_or_keep_alive(server.public_ip) + if connected?(socket) and server.status == :registered do + Agents.subscribe_to_health(server.public_ip) + Agents.start_connection(server.public_ip) Task.start_link(fn -> get_sys_info(pid, server.public_ip) end) end + health = Agents.get_health(server.public_ip) + {:noreply, socket + |> assign(:health, health || %{message: "Connecting..."}) |> assign(:page_title, server.name) |> assign(:server, server) |> assign(:uptime, 0) |> assign(:cpus, []) + |> assign(:used_disk, 0) |> assign(:total_memory, 0) |> assign(:used_memory, 0) |> assign(:registration_command, Servers.create_setup_command(server))} @@ -39,18 +44,39 @@ defmodule PrymnWeb.ServerLive.Show do bytes_to_gigabytes(response.mem_total_bytes - response.mem_avail_bytes) ) |> assign(:total_memory, bytes_to_gigabytes(response.mem_total_bytes)) + |> assign(:used_disk, calculate_disk_used_percent(response.disks)) |> assign(:cpus, response.cpus)} end + def handle_info(%Agents.Health{} = health, socket) do + {:noreply, assign(socket, :health, health)} + end + defp bytes_to_gigabytes(bytes) do Float.round(bytes / Integer.pow(1024, 3), 2) end + defp calculate_disk_used_percent(disks) do + alias PrymnProto.Prymn.SysInfoResponse.Disk + + {used, total} = + Enum.reduce(disks, {0, 0}, fn %Disk{} = disk, {used, total} -> + {used + disk.total_bytes - disk.avail_bytes, total + disk.total_bytes} + end) + + Float.round(100 * used / total, 2) + end + defp get_sys_info(from, host_address) do - channel = Agents.get_channel(host_address) - {:ok, reply} = PrymnProto.Prymn.Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) - send(from, reply) - Process.sleep(:timer.seconds(2)) + alias PrymnProto.Prymn.Agent + + with {:ok, channel} <- Agents.get_channel(host_address), + {:ok, reply} <- Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) do + send(from, reply) + end + + Process.sleep(:timer.seconds(5)) + get_sys_info(from, host_address) end end diff --git a/app/lib/prymn_web/live/server_live/show.html.heex b/app/lib/prymn_web/live/server_live/show.html.heex index e10ac16..8d56c80 100644 --- a/app/lib/prymn_web/live/server_live/show.html.heex +++ b/app/lib/prymn_web/live/server_live/show.html.heex @@ -1,5 +1,25 @@ <.header> - Server <%= @server.name %> + + <%= case @health.message do %> + <% "Connected" -> %> + + + <% "Disconnected" -> %> + + <% _ -> %> + + <% end %> + + Server <%= @server.name %>
@@ -38,6 +58,13 @@

Memory

+
+

+ <%= @used_disk %> + % +

+

Used Disk

+
<.back navigate={~p"/servers"}>Back to servers