diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index c0c6139..fbf7c87 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -2,18 +2,47 @@ defmodule Prymn.Agents do @moduledoc ~S""" Prymn Agents are programs that manage a remote client machine. Prymn backend communicates with them using GRPC calls. GRPC connections are started using - the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the + the Prymn.Agents.ConnectionSupervisor and are book-kept using the Prymn.Agents.Registry. + """ - def start_connection_or_keep_alive(ip) do - case Registry.lookup(Prymn.Agents.Registry, ip) do - [{pid, _}] -> - Prymn.Agents.Connection.keep_alive(pid) + @doc """ + Establishes a dynamically supervised, background connection with the target + host agent, keeping it alive if it is already started. + """ + @spec start_connection_or_keep_alive(String.t()) :: :ok + def start_connection_or_keep_alive(host_address) do + spec = {Prymn.Agents.Connection, host_address} - [] -> - child = {Prymn.Agents.Connection, ip} - {:ok, _pid} = DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) + case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do + {:error, {:already_started, pid}} -> + :ok = Prymn.Agents.Connection.keep_alive(pid) + :ok + + {:ok, _pid} -> + :ok end end + + @doc """ + Subscribe to the host's Health using Phoenix.PubSub Broadcasted messages are + in the form of: + + {host_address, %Prymn.Agents.Health{}} + + ## Returns + + This function returns the last health status of the Agent that was saved. + """ + @spec subscribe_to_health(String.t()) :: Prymn.Agents.Health.t() + def subscribe_to_health(host_address) do + Prymn.Agents.Health.subscribe(host_address) + Prymn.Agents.Health.lookup(host_address) + end + + @spec get_channel(String.t()) :: GRPC.Channel.t() + def get_channel(host_address) do + Prymn.Agents.Connection.get_channel(host_address) + end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 3ee50d3..3416653 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -1,97 +1,122 @@ defmodule Prymn.Agents.Connection do @moduledoc false - @timer_interval 120_000 - - defstruct [:channel, :timer_ref] - require Logger use GenServer, restart: :transient + @timeout :timer.minutes(2) + @spec start_link(String.t()) :: GenServer.on_start() - def start_link(addr) do + def start_link(addr) when is_binary(addr) do GenServer.start_link(__MODULE__, addr, name: via(addr)) end - @spec keep_alive(pid) :: :ok - def keep_alive(pid) do - GenServer.cast(pid, :reset_timer) + @spec keep_alive(pid | String.t()) :: :ok + def keep_alive(server) when is_pid(server), do: GenServer.cast(server, :keep_alive) + def keep_alive(server) when is_binary(server), do: GenServer.cast(via(server), :keep_alive) + + @spec stop(pid | String.t()) :: :ok + def stop(server) when is_pid(server), do: GenServer.stop(server, :shutdown) + def stop(server) when is_binary(server), do: GenServer.stop(via(server), :shutdown) + + @spec get_channel(String.t()) :: GRPC.Channel.t() + def get_channel(server) do + GenServer.call(via(server), :get_channel) end @impl true def init(host) do Process.flag(:trap_exit, true) - {:ok, %__MODULE__{}, {:continue, host}} + {:ok, {host, nil}, {:continue, :connect}} end @impl true - def handle_continue(host, state) when is_binary(host) do - case GRPC.Stub.connect(host, 50012, []) do + def handle_continue(:connect, {host_address, _} = state) do + case GRPC.Stub.connect(host_address, 50012, []) do {:ok, channel} -> - GenServer.cast(self(), :reset_timer) - {:noreply, %__MODULE__{channel: channel}, {:continue, :health}} + keep_alive(self()) + {:noreply, {host_address, channel}, {:continue, :health}} {:error, error} -> - {:stop, {:error, error}, state} + {:stop, error, state} end end @impl true - def handle_continue(:health, state) do + def handle_continue(:health, {_, channel} = state) do pid = self() Task.start_link(fn -> - {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(state.channel, %Google.Protobuf.Empty{}) + {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{}) + # Read from the stream forever and send data back to parent stream - |> Stream.each(fn health -> send(pid, {:health, health}) end) + |> Stream.each(fn {_, data} -> send(pid, data) end) |> Enum.take_while(fn _ -> true end) end) - {:noreply, state} + {:noreply, state, @timeout} end @impl true - def handle_cast(:reset_timer, state) do - if state.timer_ref, do: Process.cancel_timer(state.timer_ref) - ref = Process.send_after(self(), :drop_connection, @timer_interval) - {:noreply, put_in(state.timer_ref, ref)} + def handle_call(:get_channel, _from, {_, channel} = state) do + {:reply, channel, state, @timeout} end @impl true - def handle_info(:drop_connection, state) do - Logger.debug("shutting down connection with agent host #{inspect(state.channel.host)}") - {:stop, :shutdown, state} + def handle_cast(:keep_alive, state) do + {:noreply, state, @timeout} end @impl true - def handle_info({:health, health}, state) do - IO.inspect(health) - {:noreply, state} + def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do + response + |> Prymn.Agents.Health.new_from_proto() + |> Prymn.Agents.Health.update(host) + + {:noreply, state, @timeout} + end + + def handle_info(%GRPC.RPCError{} = response, {host, _} = state) do + Logger.debug("received a GRPC error: #{inspect(response)}") + + response + |> Prymn.Agents.Health.new_from_proto() + |> Prymn.Agents.Health.update(host) + + {:noreply, state, @timeout} end - @impl true def handle_info({:gun_up, _pid, _protocol}, state) do # TODO: If it's possible for the GRPC connection to be down when we receive # this message, we should `{:continue, state.channel.host}` {:noreply, state, {:continue, :health}} end - @impl true - def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do - Logger.debug("disconnected from #{inspect(channel)}") - {:noreply, state} + def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do + Logger.debug("disconnected from #{host}") + {:noreply, state, @timeout} + end + + def handle_info(:timeout, state) do + {:stop, {:shutdown, :timeout}, state} + end + + def handle_info({:EXIT, _from, _reason}, state) do + # TODO: How to handle this... (this happens when a linked process exists + # e.g the one with the health stream) + {:noreply, state, @timeout} end - @impl true def handle_info(msg, state) do Logger.warning("received unexpected message: #{inspect(msg)}") - {:noreply, state} + {:noreply, state, @timeout} end @impl true - def terminate(_reason, %{channel: channel}) do + def terminate(reason, {host, channel}) do + Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})") if channel, do: GRPC.Stub.disconnect(channel) end diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex new file mode 100644 index 0000000..56639b4 --- /dev/null +++ b/app/lib/prymn/agents/health.ex @@ -0,0 +1,66 @@ +defmodule Prymn.Agents.Health do + @doc """ + The Health struct keeps simple health information of + whether or not the target host machine is up to date, has any tasks running, + its resources are getting depleted, or if it's unable be reached. + """ + + defstruct [:version, message: "Unknown"] + + alias PrymnProto.Prymn.HealthResponse + + @type t :: %{ + version: String.t(), + message: String.t() + } + + @doc false + def start() do + :ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true]) + end + + @doc false + def subscribe(host_address) do + Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host_address}") + end + + @doc false + def update(health, host_address) do + :ets.insert(__MODULE__, {host_address, health}) + Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host_address}", {host_address, health}) + end + + @doc false + def lookup(host_address) do + case :ets.lookup(__MODULE__, host_address) do + [{^host_address, value}] -> value + [] -> nil + end + end + + @doc false + def new_from_proto(proto_health) do + case proto_health do + %HealthResponse{} = health -> + from_health(health) + + %GRPC.RPCError{message: ":stream_error: :closed"} -> + %__MODULE__{message: "Disconnected"} + + %GRPC.RPCError{} = error -> + require Logger + Logger.error("unhandled GRPC error received in Health module: #{inspect(error)}") + %__MODULE__{message: "Error retrieving server status"} + end + end + + defp from_health(%HealthResponse{system: system, version: version}) do + case system.status do + "normal" -> + %__MODULE__{message: "Connected", version: version} + + status -> + %__MODULE__{message: status, version: version} + end + end +end diff --git a/app/lib/prymn/agents/supervisor.ex b/app/lib/prymn/agents/supervisor.ex new file mode 100644 index 0000000..51055c9 --- /dev/null +++ b/app/lib/prymn/agents/supervisor.ex @@ -0,0 +1,27 @@ +defmodule Prymn.Agents.Supervisor do + @moduledoc false + + use Supervisor + + @dynamic_supervisor Prymn.Agents.ConnectionSupervisor + + def start_link(init_arg) do + Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + @impl true + def init(_init_arg) do + children = [ + # The registry will be used to register `Connection` processes with their + # name as their address + {Registry, keys: :unique, name: Prymn.Agents.Registry}, + # Dynamically start `Connection` processes + {DynamicSupervisor, name: @dynamic_supervisor, strategy: :one_for_one, max_seconds: 60} + ] + + # Register a "health" table that stores in-memory any agent health data + Prymn.Agents.Health.start() + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/app/lib/prymn/application.ex b/app/lib/prymn/application.ex index feda28d..51e7a87 100644 --- a/app/lib/prymn/application.ex +++ b/app/lib/prymn/application.ex @@ -18,9 +18,8 @@ defmodule Prymn.Application do {Finch, name: Prymn.Finch}, # Start the Endpoint (http/https) PrymnWeb.Endpoint, - # Start the prymn agent (grpc) registry and the supervisor - {Registry, keys: :unique, name: Prymn.Agents.Registry}, - {DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one, max_seconds: 60} + # Start the Agents (dynamic GRPC connections) supervisor + Prymn.Agents.Supervisor ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 4cbf0f1..24176cc 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -1,5 +1,7 @@ defmodule PrymnWeb.ServerLive.Index do require Logger + + alias Prymn.Agents.Health alias Prymn.{Servers, Agents} use PrymnWeb, :live_view @@ -8,13 +10,20 @@ defmodule PrymnWeb.ServerLive.Index do def mount(_params, _session, socket) do servers = Servers.list_servers() - if connected?(socket) do - for %Servers.Server{status: :registered, public_ip: ip} <- servers do - Agents.start_connection_or_keep_alive(ip) + healths = + if connected?(socket) do + for %Servers.Server{status: :registered, public_ip: ip} <- servers, into: %{} do + Agents.start_connection_or_keep_alive(ip) + {ip, Agents.subscribe_to_health(ip)} + end + else + %{} end - end - {:ok, assign(socket, :servers, servers)} + {:ok, + socket + |> assign(:servers, servers) + |> assign(:healths, healths)} end @impl true @@ -35,21 +44,34 @@ defmodule PrymnWeb.ServerLive.Index do |> update(:servers, fn servers -> [server | servers] end)} end - @impl true - def handle_info({:healthcheck, ip, _message}, socket) do - servers = - update_in( - socket.assigns.servers, - [Access.filter(&match?(%{public_ip: ^ip}, &1))], - &Map.merge(&1, %{connection_status: "Connected"}) - ) - - {:noreply, assign(socket, :servers, servers)} + def handle_info({host, %Prymn.Agents.Health{} = health}, socket) do + healths = Map.put(socket.assigns.healths, host, health) + {:noreply, assign(socket, :healths, healths)} end - @impl true def handle_info(msg, state) do Logger.debug("received unexpected message #{inspect(msg)}") {:noreply, state} end + + defp server_status(assigns) do + case {assigns.server, assigns.health} do + {%{status: :registered}, nil} -> + ~H""" + Unknown + """ + + {%{status: :registered}, %Health{message: message}} -> + assigns = assign(assigns, :status, message) + + ~H""" + <%= @status %> + """ + + {_, _} -> + ~H""" + Not registered + """ + end + end end diff --git a/app/lib/prymn_web/live/server_live/index.html.heex b/app/lib/prymn_web/live/server_live/index.html.heex index 61c8e28..1dcdefa 100644 --- a/app/lib/prymn_web/live/server_live/index.html.heex +++ b/app/lib/prymn_web/live/server_live/index.html.heex @@ -19,11 +19,7 @@

<%= server.name %>

- <%= if server.status == :registered do %> - <%= server.connection_status || "..." %> - <% else %> - Not registered - <% end %> + <.server_status server={server} health={@healths[server.public_ip]} />
diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex index fd48621..c4fd82d 100644 --- a/app/lib/prymn_web/live/server_live/show.ex +++ b/app/lib/prymn_web/live/server_live/show.ex @@ -1,7 +1,7 @@ defmodule PrymnWeb.ServerLive.Show do use PrymnWeb, :live_view - alias Prymn.Servers + alias Prymn.{Agents, Servers} @impl true def mount(_params, _session, socket) do @@ -11,11 +11,46 @@ defmodule PrymnWeb.ServerLive.Show do @impl true def handle_params(%{"id" => id}, _, socket) do server = Servers.get_server!(id) + pid = self() + + if connected?(socket) do + Agents.start_connection_or_keep_alive(server.public_ip) + Task.start_link(fn -> get_sys_info(pid, server.public_ip) end) + end {:noreply, socket |> assign(:page_title, server.name) |> assign(:server, server) + |> assign(:uptime, 0) + |> assign(:cpus, []) + |> assign(:total_memory, 0) + |> assign(:used_memory, 0) |> assign(:registration_command, Servers.create_setup_command(server))} end + + @impl true + def handle_info(%PrymnProto.Prymn.SysInfoResponse{} = response, socket) do + {:noreply, + socket + |> assign(:uptime, response.uptime) + |> assign( + :used_memory, + bytes_to_gigabytes(response.mem_total_bytes - response.mem_avail_bytes) + ) + |> assign(:total_memory, bytes_to_gigabytes(response.mem_total_bytes)) + |> assign(:cpus, response.cpus)} + end + + defp bytes_to_gigabytes(bytes) do + Float.round(bytes / Integer.pow(1024, 3), 2) + end + + defp get_sys_info(from, host_address) do + channel = Agents.get_channel(host_address) + {:ok, reply} = PrymnProto.Prymn.Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) + send(from, reply) + Process.sleep(:timer.seconds(2)) + get_sys_info(from, host_address) + end end diff --git a/app/lib/prymn_web/live/server_live/show.html.heex b/app/lib/prymn_web/live/server_live/show.html.heex index 528ae9a..e10ac16 100644 --- a/app/lib/prymn_web/live/server_live/show.html.heex +++ b/app/lib/prymn_web/live/server_live/show.html.heex @@ -22,4 +22,22 @@
+
+
+

<%= @uptime || "" %>s

+

Uptime

+
+
+

<%= Enum.count(@cpus || []) %>

+

CPUs

+
+
+

+ <%= @used_memory || 0 %> / <%= @total_memory || 0 %> + GiB +

+

Memory

+
+
+ <.back navigate={~p"/servers"}>Back to servers diff --git a/app/mix.exs b/app/mix.exs index 32d1fbb..d8d4ed7 100644 --- a/app/mix.exs +++ b/app/mix.exs @@ -20,7 +20,7 @@ defmodule Prymn.MixProject do def application do [ mod: {Prymn.Application, []}, - extra_applications: [:logger, :runtime_tools] + extra_applications: [:logger, :runtime_tools, :os_mon] ] end