diff --git a/agent/src/server/agent.rs b/agent/src/server/agent.rs index f99352d..72f3baf 100644 --- a/agent/src/server/agent.rs +++ b/agent/src/server/agent.rs @@ -46,7 +46,8 @@ impl agent_server::Agent for AgentService { sys.refresh_specifics( sysinfo::RefreshKind::new() - .with_disks() + // .with_disks() // what is this? + .with_disks_list() .with_memory() .with_processes(sysinfo::ProcessRefreshKind::everything()) .with_cpu(sysinfo::CpuRefreshKind::everything()), diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index fbf7c87..162cb54 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -7,42 +7,51 @@ defmodule Prymn.Agents do """ - @doc """ - Establishes a dynamically supervised, background connection with the target - host agent, keeping it alive if it is already started. - """ - @spec start_connection_or_keep_alive(String.t()) :: :ok - def start_connection_or_keep_alive(host_address) do - spec = {Prymn.Agents.Connection, host_address} + alias Prymn.Agents.Connection + alias Prymn.Agents.Health + + def start_connection(host_address) do + spec = {Connection, host_address} case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do - {:error, {:already_started, pid}} -> - :ok = Prymn.Agents.Connection.keep_alive(pid) - :ok - - {:ok, _pid} -> - :ok + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, error} -> {:error, error} end end @doc """ Subscribe to the host's Health using Phoenix.PubSub Broadcasted messages are - in the form of: + the Health struct: - {host_address, %Prymn.Agents.Health{}} - - ## Returns - - This function returns the last health status of the Agent that was saved. + %Prymn.Agents.Health{} """ - @spec subscribe_to_health(String.t()) :: Prymn.Agents.Health.t() def subscribe_to_health(host_address) do - Prymn.Agents.Health.subscribe(host_address) - Prymn.Agents.Health.lookup(host_address) + :ok = Health.subscribe(host_address) end - @spec get_channel(String.t()) :: GRPC.Channel.t() + @doc """ + Return the last known health status of the Agent, or `nil` if it doesn't + exist. + """ + def get_health(host_address) do + Health.lookup(host_address) + end + + # TODO: We should not expose this api, instead wrap every GRPC call in this + # module GRPC is an "internal implementation detail" (although it probably + # wont ever change) + # + # E.g. + # def get_sys_info(agent) do + # PrymnProto.Prymn.Agent.Stub.get_sys_info(agent.channel, %Google.Protobuf.Empty{}) + # end def get_channel(host_address) do - Prymn.Agents.Connection.get_channel(host_address) + with [{pid, _}] <- Registry.lookup(Prymn.Agents.Registry, host_address), + channel when channel != nil <- Connection.get_channel(pid) do + {:ok, channel} + else + _ -> {:error, :not_found} + end end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 3416653..7576653 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -1,6 +1,7 @@ defmodule Prymn.Agents.Connection do @moduledoc false + alias Prymn.Agents.Health require Logger use GenServer, restart: :transient @@ -8,39 +9,36 @@ defmodule Prymn.Agents.Connection do @timeout :timer.minutes(2) @spec start_link(String.t()) :: GenServer.on_start() - def start_link(addr) when is_binary(addr) do - GenServer.start_link(__MODULE__, addr, name: via(addr)) + def start_link(host_address) do + GenServer.start_link(__MODULE__, host_address, name: via(host_address)) end - @spec keep_alive(pid | String.t()) :: :ok - def keep_alive(server) when is_pid(server), do: GenServer.cast(server, :keep_alive) - def keep_alive(server) when is_binary(server), do: GenServer.cast(via(server), :keep_alive) - - @spec stop(pid | String.t()) :: :ok - def stop(server) when is_pid(server), do: GenServer.stop(server, :shutdown) - def stop(server) when is_binary(server), do: GenServer.stop(via(server), :shutdown) - - @spec get_channel(String.t()) :: GRPC.Channel.t() + @spec get_channel(pid) :: GRPC.Channel.t() | nil def get_channel(server) do - GenServer.call(via(server), :get_channel) + GenServer.call(server, :get_channel) end + ## + ## Server callbacks + ## + @impl true def init(host) do - Process.flag(:trap_exit, true) - {:ok, {host, nil}, {:continue, :connect}} - end + # Process.flag(:trap_exit, true) + pid = self() - @impl true - def handle_continue(:connect, {host_address, _} = state) do - case GRPC.Stub.connect(host_address, 50012, []) do - {:ok, channel} -> - keep_alive(self()) - {:noreply, {host_address, channel}, {:continue, :health}} + # Start a connection without blocking the GenServer + Task.start_link(fn -> + case GRPC.Stub.connect(host, 50012, []) do + {:ok, channel} -> send(pid, channel) + {:error, error} -> send(pid, {:connect_error, error}) + end - {:error, error} -> - {:stop, error, state} - end + # Keep receiving and sending back any messages to the GenServer forever + receive_loop(pid) + end) + + {:ok, {host, nil}} end @impl true @@ -56,7 +54,12 @@ defmodule Prymn.Agents.Connection do |> Enum.take_while(fn _ -> true end) end) - {:noreply, state, @timeout} + {:noreply, state} + end + + @impl true + def handle_cast(_, state) do + {:noreply, state} end @impl true @@ -65,37 +68,44 @@ defmodule Prymn.Agents.Connection do end @impl true - def handle_cast(:keep_alive, state) do - {:noreply, state, @timeout} + def handle_info(%GRPC.Channel{} = channel, {host, _}) do + {:noreply, {host, channel}, {:continue, :health}} + end + + def handle_info({:connect_error, reason}, {host, _} = state) do + if reason == :timeout do + Health.lookup(host, default: true) + |> Health.make_timed_out() + |> Health.update_and_broadcast() + end + + {:stop, reason, state} end - @impl true def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do response - |> Prymn.Agents.Health.new_from_proto() - |> Prymn.Agents.Health.update(host) + |> Health.make_from_proto(host) + |> Health.update_and_broadcast() {:noreply, state, @timeout} end - def handle_info(%GRPC.RPCError{} = response, {host, _} = state) do + def handle_info(%GRPC.RPCError{} = response, state) do Logger.debug("received a GRPC error: #{inspect(response)}") - - response - |> Prymn.Agents.Health.new_from_proto() - |> Prymn.Agents.Health.update(host) - - {:noreply, state, @timeout} + {:noreply, state} end def handle_info({:gun_up, _pid, _protocol}, state) do - # TODO: If it's possible for the GRPC connection to be down when we receive - # this message, we should `{:continue, state.channel.host}` + # NOTE: If it's possible for the GRPC connection to be down when we receive + # this message, maybe we should restart the connection {:noreply, state, {:continue, :health}} end def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do - Logger.debug("disconnected from #{host}") + Health.lookup(host) + |> Health.make_disconnected() + |> Health.update_and_broadcast() + {:noreply, state, @timeout} end @@ -103,24 +113,27 @@ defmodule Prymn.Agents.Connection do {:stop, {:shutdown, :timeout}, state} end - def handle_info({:EXIT, _from, _reason}, state) do - # TODO: How to handle this... (this happens when a linked process exists - # e.g the one with the health stream) - {:noreply, state, @timeout} - end - def handle_info(msg, state) do - Logger.warning("received unexpected message: #{inspect(msg)}") - {:noreply, state, @timeout} + Logger.debug("received unhandled message #{inspect(msg)}") + {:noreply, state} end @impl true def terminate(reason, {host, channel}) do Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})") + Health.delete(host) if channel, do: GRPC.Stub.disconnect(channel) end defp via(name) do {:via, Registry, {Prymn.Agents.Registry, name}} end + + defp receive_loop(pid) do + receive do + msg -> send(pid, msg) + end + + receive_loop(pid) + end end diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex index 56639b4..f6dcedd 100644 --- a/app/lib/prymn/agents/health.ex +++ b/app/lib/prymn/agents/health.ex @@ -1,66 +1,82 @@ defmodule Prymn.Agents.Health do - @doc """ - The Health struct keeps simple health information of - whether or not the target host machine is up to date, has any tasks running, - its resources are getting depleted, or if it's unable be reached. + @moduledoc """ + The Health struct keeps simple health information of whether or not the + target host machine is up to date, has any tasks running, its resources are + getting depleted, or if it's unable be reached. """ - defstruct [:version, message: "Unknown"] + defstruct [:host, :version, message: "Unknown"] alias PrymnProto.Prymn.HealthResponse @type t :: %{ + host: String.t(), version: String.t(), message: String.t() } - @doc false def start() do :ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true]) end - @doc false - def subscribe(host_address) do - Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host_address}") + def subscribe(host) do + Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host}") end - @doc false - def update(health, host_address) do - :ets.insert(__MODULE__, {host_address, health}) - Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host_address}", {host_address, health}) + def broadcast!(%__MODULE__{host: host} = health) do + Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host}", health) end - @doc false - def lookup(host_address) do + def update_and_broadcast(nil) do + nil + end + + def update_and_broadcast(%__MODULE__{host: host} = health) do + :ets.insert(__MODULE__, {host, health}) + broadcast!(health) + end + + def delete(host_address) do + :ets.delete(__MODULE__, host_address) + end + + def lookup(host_address, opts \\ []) do + default = Keyword.get(opts, :default, false) + case :ets.lookup(__MODULE__, host_address) do [{^host_address, value}] -> value + [] when default -> %__MODULE__{host: host_address} [] -> nil end end - @doc false - def new_from_proto(proto_health) do - case proto_health do - %HealthResponse{} = health -> - from_health(health) + def make_timed_out(%__MODULE__{} = health) do + %__MODULE__{health | message: "Connect timed out"} + end - %GRPC.RPCError{message: ":stream_error: :closed"} -> - %__MODULE__{message: "Disconnected"} + def make_disconnected(%__MODULE__{} = health) do + %__MODULE__{health | message: "Disconnected"} + end - %GRPC.RPCError{} = error -> - require Logger - Logger.error("unhandled GRPC error received in Health module: #{inspect(error)}") - %__MODULE__{message: "Error retrieving server status"} + def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do + %__MODULE__{host: host} + |> do_version(version) + |> do_system(system) + |> do_tasks(tasks) + end + + defp do_version(health, version) do + %__MODULE__{health | version: version} + end + + defp do_system(health, system) do + case system.status do + "normal" -> %__MODULE__{health | message: "Connected"} + status -> %__MODULE__{health | message: "Alert: #{status}"} end end - defp from_health(%HealthResponse{system: system, version: version}) do - case system.status do - "normal" -> - %__MODULE__{message: "Connected", version: version} - - status -> - %__MODULE__{message: status, version: version} - end + defp do_tasks(health, _tasks) do + health end end diff --git a/app/lib/prymn/servers/server.ex b/app/lib/prymn/servers/server.ex index 3798098..4075f0e 100644 --- a/app/lib/prymn/servers/server.ex +++ b/app/lib/prymn/servers/server.ex @@ -12,8 +12,6 @@ defmodule Prymn.Servers.Server do values: [:unregistered, :registered], default: :unregistered - field :connection_status, :string, virtual: true - timestamps() end diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 24176cc..8039289 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -1,7 +1,6 @@ defmodule PrymnWeb.ServerLive.Index do require Logger - alias Prymn.Agents.Health alias Prymn.{Servers, Agents} use PrymnWeb, :live_view @@ -13,8 +12,9 @@ defmodule PrymnWeb.ServerLive.Index do healths = if connected?(socket) do for %Servers.Server{status: :registered, public_ip: ip} <- servers, into: %{} do - Agents.start_connection_or_keep_alive(ip) - {ip, Agents.subscribe_to_health(ip)} + Agents.subscribe_to_health(ip) + Agents.start_connection(ip) + {ip, Agents.get_health(ip)} end else %{} @@ -44,8 +44,8 @@ defmodule PrymnWeb.ServerLive.Index do |> update(:servers, fn servers -> [server | servers] end)} end - def handle_info({host, %Prymn.Agents.Health{} = health}, socket) do - healths = Map.put(socket.assigns.healths, host, health) + def handle_info(%Agents.Health{} = health, socket) do + healths = Map.put(socket.assigns.healths, health.host, health) {:noreply, assign(socket, :healths, healths)} end @@ -55,22 +55,32 @@ defmodule PrymnWeb.ServerLive.Index do end defp server_status(assigns) do - case {assigns.server, assigns.health} do - {%{status: :registered}, nil} -> + case {assigns.status, assigns.health} do + {:unregistered, _} -> ~H""" - Unknown + Needs registration """ - {%{status: :registered}, %Health{message: message}} -> - assigns = assign(assigns, :status, message) - + {:registered, nil} -> ~H""" - <%= @status %> + Connecting... """ - {_, _} -> + {:registered, %Agents.Health{message: "Connected"}} -> ~H""" - Not registered + Connected + """ + + {:registered, %Agents.Health{message: "Disconnected"}} -> + ~H""" + Disconnected + """ + + {:registered, %Agents.Health{message: message}} -> + assigns = assign(assigns, :message, message) + + ~H""" + <%= @message %> """ end end diff --git a/app/lib/prymn_web/live/server_live/index.html.heex b/app/lib/prymn_web/live/server_live/index.html.heex index 1dcdefa..14a9028 100644 --- a/app/lib/prymn_web/live/server_live/index.html.heex +++ b/app/lib/prymn_web/live/server_live/index.html.heex @@ -19,7 +19,7 @@
Memory
+ <%= @used_disk %> + % +
+Used Disk
+