app: a nice poc background connection with the agent

Dynamically supervise a GRPC connection and keep it alive for a while,
using a timeout to kill the process on inactivity.

The Connection GenServer keeps the connection alive, while listening to
a health stream from the GRPC endpoint, notifying any subscribed
listeners about changes to health.

This health is then propagated to the main servers page.

Also create a looping get_sys_info which retrieves some basic system
information.
This commit is contained in:
Nikos Papadakis 2023-08-26 00:51:04 +03:00
parent 89ffca8833
commit be7f584010
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
10 changed files with 287 additions and 70 deletions

View file

@ -2,18 +2,47 @@ defmodule Prymn.Agents do
@moduledoc ~S"""
Prymn Agents are programs that manage a remote client machine. Prymn backend
communicates with them using GRPC calls. GRPC connections are started using
the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the
the Prymn.Agents.ConnectionSupervisor and are book-kept using the
Prymn.Agents.Registry.
"""
def start_connection_or_keep_alive(ip) do
case Registry.lookup(Prymn.Agents.Registry, ip) do
[{pid, _}] ->
Prymn.Agents.Connection.keep_alive(pid)
@doc """
Establishes a dynamically supervised, background connection with the target
host agent, keeping it alive if it is already started.
"""
@spec start_connection_or_keep_alive(String.t()) :: :ok
def start_connection_or_keep_alive(host_address) do
spec = {Prymn.Agents.Connection, host_address}
[] ->
child = {Prymn.Agents.Connection, ip}
{:ok, _pid} = DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child)
case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do
{:error, {:already_started, pid}} ->
:ok = Prymn.Agents.Connection.keep_alive(pid)
:ok
{:ok, _pid} ->
:ok
end
end
@doc """
Subscribe to the host's Health using Phoenix.PubSub Broadcasted messages are
in the form of:
{host_address, %Prymn.Agents.Health{}}
## Returns
This function returns the last health status of the Agent that was saved.
"""
@spec subscribe_to_health(String.t()) :: Prymn.Agents.Health.t()
def subscribe_to_health(host_address) do
Prymn.Agents.Health.subscribe(host_address)
Prymn.Agents.Health.lookup(host_address)
end
@spec get_channel(String.t()) :: GRPC.Channel.t()
def get_channel(host_address) do
Prymn.Agents.Connection.get_channel(host_address)
end
end

View file

@ -1,97 +1,122 @@
defmodule Prymn.Agents.Connection do
@moduledoc false
@timer_interval 120_000
defstruct [:channel, :timer_ref]
require Logger
use GenServer, restart: :transient
@timeout :timer.minutes(2)
@spec start_link(String.t()) :: GenServer.on_start()
def start_link(addr) do
def start_link(addr) when is_binary(addr) do
GenServer.start_link(__MODULE__, addr, name: via(addr))
end
@spec keep_alive(pid) :: :ok
def keep_alive(pid) do
GenServer.cast(pid, :reset_timer)
@spec keep_alive(pid | String.t()) :: :ok
def keep_alive(server) when is_pid(server), do: GenServer.cast(server, :keep_alive)
def keep_alive(server) when is_binary(server), do: GenServer.cast(via(server), :keep_alive)
@spec stop(pid | String.t()) :: :ok
def stop(server) when is_pid(server), do: GenServer.stop(server, :shutdown)
def stop(server) when is_binary(server), do: GenServer.stop(via(server), :shutdown)
@spec get_channel(String.t()) :: GRPC.Channel.t()
def get_channel(server) do
GenServer.call(via(server), :get_channel)
end
@impl true
def init(host) do
Process.flag(:trap_exit, true)
{:ok, %__MODULE__{}, {:continue, host}}
{:ok, {host, nil}, {:continue, :connect}}
end
@impl true
def handle_continue(host, state) when is_binary(host) do
case GRPC.Stub.connect(host, 50012, []) do
def handle_continue(:connect, {host_address, _} = state) do
case GRPC.Stub.connect(host_address, 50012, []) do
{:ok, channel} ->
GenServer.cast(self(), :reset_timer)
{:noreply, %__MODULE__{channel: channel}, {:continue, :health}}
keep_alive(self())
{:noreply, {host_address, channel}, {:continue, :health}}
{:error, error} ->
{:stop, {:error, error}, state}
{:stop, error, state}
end
end
@impl true
def handle_continue(:health, state) do
def handle_continue(:health, {_, channel} = state) do
pid = self()
Task.start_link(fn ->
{:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(state.channel, %Google.Protobuf.Empty{})
{:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{})
# Read from the stream forever and send data back to parent
stream
|> Stream.each(fn health -> send(pid, {:health, health}) end)
|> Stream.each(fn {_, data} -> send(pid, data) end)
|> Enum.take_while(fn _ -> true end)
end)
{:noreply, state}
{:noreply, state, @timeout}
end
@impl true
def handle_cast(:reset_timer, state) do
if state.timer_ref, do: Process.cancel_timer(state.timer_ref)
ref = Process.send_after(self(), :drop_connection, @timer_interval)
{:noreply, put_in(state.timer_ref, ref)}
def handle_call(:get_channel, _from, {_, channel} = state) do
{:reply, channel, state, @timeout}
end
@impl true
def handle_info(:drop_connection, state) do
Logger.debug("shutting down connection with agent host #{inspect(state.channel.host)}")
{:stop, :shutdown, state}
def handle_cast(:keep_alive, state) do
{:noreply, state, @timeout}
end
@impl true
def handle_info({:health, health}, state) do
IO.inspect(health)
{:noreply, state}
def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
response
|> Prymn.Agents.Health.new_from_proto()
|> Prymn.Agents.Health.update(host)
{:noreply, state, @timeout}
end
def handle_info(%GRPC.RPCError{} = response, {host, _} = state) do
Logger.debug("received a GRPC error: #{inspect(response)}")
response
|> Prymn.Agents.Health.new_from_proto()
|> Prymn.Agents.Health.update(host)
{:noreply, state, @timeout}
end
@impl true
def handle_info({:gun_up, _pid, _protocol}, state) do
# TODO: If it's possible for the GRPC connection to be down when we receive
# this message, we should `{:continue, state.channel.host}`
{:noreply, state, {:continue, :health}}
end
@impl true
def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do
Logger.debug("disconnected from #{inspect(channel)}")
{:noreply, state}
def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
Logger.debug("disconnected from #{host}")
{:noreply, state, @timeout}
end
def handle_info(:timeout, state) do
{:stop, {:shutdown, :timeout}, state}
end
def handle_info({:EXIT, _from, _reason}, state) do
# TODO: How to handle this... (this happens when a linked process exists
# e.g the one with the health stream)
{:noreply, state, @timeout}
end
@impl true
def handle_info(msg, state) do
Logger.warning("received unexpected message: #{inspect(msg)}")
{:noreply, state}
{:noreply, state, @timeout}
end
@impl true
def terminate(_reason, %{channel: channel}) do
def terminate(reason, {host, channel}) do
Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
if channel, do: GRPC.Stub.disconnect(channel)
end

View file

@ -0,0 +1,66 @@
defmodule Prymn.Agents.Health do
@doc """
The Health struct keeps simple health information of
whether or not the target host machine is up to date, has any tasks running,
its resources are getting depleted, or if it's unable be reached.
"""
defstruct [:version, message: "Unknown"]
alias PrymnProto.Prymn.HealthResponse
@type t :: %{
version: String.t(),
message: String.t()
}
@doc false
def start() do
:ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true])
end
@doc false
def subscribe(host_address) do
Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host_address}")
end
@doc false
def update(health, host_address) do
:ets.insert(__MODULE__, {host_address, health})
Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host_address}", {host_address, health})
end
@doc false
def lookup(host_address) do
case :ets.lookup(__MODULE__, host_address) do
[{^host_address, value}] -> value
[] -> nil
end
end
@doc false
def new_from_proto(proto_health) do
case proto_health do
%HealthResponse{} = health ->
from_health(health)
%GRPC.RPCError{message: ":stream_error: :closed"} ->
%__MODULE__{message: "Disconnected"}
%GRPC.RPCError{} = error ->
require Logger
Logger.error("unhandled GRPC error received in Health module: #{inspect(error)}")
%__MODULE__{message: "Error retrieving server status"}
end
end
defp from_health(%HealthResponse{system: system, version: version}) do
case system.status do
"normal" ->
%__MODULE__{message: "Connected", version: version}
status ->
%__MODULE__{message: status, version: version}
end
end
end

View file

@ -0,0 +1,27 @@
defmodule Prymn.Agents.Supervisor do
@moduledoc false
use Supervisor
@dynamic_supervisor Prymn.Agents.ConnectionSupervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
# The registry will be used to register `Connection` processes with their
# name as their address
{Registry, keys: :unique, name: Prymn.Agents.Registry},
# Dynamically start `Connection` processes
{DynamicSupervisor, name: @dynamic_supervisor, strategy: :one_for_one, max_seconds: 60}
]
# Register a "health" table that stores in-memory any agent health data
Prymn.Agents.Health.start()
Supervisor.init(children, strategy: :one_for_one)
end
end

View file

@ -18,9 +18,8 @@ defmodule Prymn.Application do
{Finch, name: Prymn.Finch},
# Start the Endpoint (http/https)
PrymnWeb.Endpoint,
# Start the prymn agent (grpc) registry and the supervisor
{Registry, keys: :unique, name: Prymn.Agents.Registry},
{DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one, max_seconds: 60}
# Start the Agents (dynamic GRPC connections) supervisor
Prymn.Agents.Supervisor
]
# See https://hexdocs.pm/elixir/Supervisor.html

View file

@ -1,5 +1,7 @@
defmodule PrymnWeb.ServerLive.Index do
require Logger
alias Prymn.Agents.Health
alias Prymn.{Servers, Agents}
use PrymnWeb, :live_view
@ -8,13 +10,20 @@ defmodule PrymnWeb.ServerLive.Index do
def mount(_params, _session, socket) do
servers = Servers.list_servers()
if connected?(socket) do
for %Servers.Server{status: :registered, public_ip: ip} <- servers do
Agents.start_connection_or_keep_alive(ip)
healths =
if connected?(socket) do
for %Servers.Server{status: :registered, public_ip: ip} <- servers, into: %{} do
Agents.start_connection_or_keep_alive(ip)
{ip, Agents.subscribe_to_health(ip)}
end
else
%{}
end
end
{:ok, assign(socket, :servers, servers)}
{:ok,
socket
|> assign(:servers, servers)
|> assign(:healths, healths)}
end
@impl true
@ -35,21 +44,34 @@ defmodule PrymnWeb.ServerLive.Index do
|> update(:servers, fn servers -> [server | servers] end)}
end
@impl true
def handle_info({:healthcheck, ip, _message}, socket) do
servers =
update_in(
socket.assigns.servers,
[Access.filter(&match?(%{public_ip: ^ip}, &1))],
&Map.merge(&1, %{connection_status: "Connected"})
)
{:noreply, assign(socket, :servers, servers)}
def handle_info({host, %Prymn.Agents.Health{} = health}, socket) do
healths = Map.put(socket.assigns.healths, host, health)
{:noreply, assign(socket, :healths, healths)}
end
@impl true
def handle_info(msg, state) do
Logger.debug("received unexpected message #{inspect(msg)}")
{:noreply, state}
end
defp server_status(assigns) do
case {assigns.server, assigns.health} do
{%{status: :registered}, nil} ->
~H"""
Unknown
"""
{%{status: :registered}, %Health{message: message}} ->
assigns = assign(assigns, :status, message)
~H"""
<%= @status %>
"""
{_, _} ->
~H"""
Not registered
"""
end
end
end

View file

@ -19,11 +19,7 @@
<div class="flex flex-row flex-wrap justify-between">
<h2 class="text-xl"><%= server.name %></h2>
<span class="self-center text-sm">
<%= if server.status == :registered do %>
<%= server.connection_status || "..." %>
<% else %>
Not registered
<% end %>
<.server_status server={server} health={@healths[server.public_ip]} />
</span>
</div>
<div class="lg:text-sm">

View file

@ -1,7 +1,7 @@
defmodule PrymnWeb.ServerLive.Show do
use PrymnWeb, :live_view
alias Prymn.Servers
alias Prymn.{Agents, Servers}
@impl true
def mount(_params, _session, socket) do
@ -11,11 +11,46 @@ defmodule PrymnWeb.ServerLive.Show do
@impl true
def handle_params(%{"id" => id}, _, socket) do
server = Servers.get_server!(id)
pid = self()
if connected?(socket) do
Agents.start_connection_or_keep_alive(server.public_ip)
Task.start_link(fn -> get_sys_info(pid, server.public_ip) end)
end
{:noreply,
socket
|> assign(:page_title, server.name)
|> assign(:server, server)
|> assign(:uptime, 0)
|> assign(:cpus, [])
|> assign(:total_memory, 0)
|> assign(:used_memory, 0)
|> assign(:registration_command, Servers.create_setup_command(server))}
end
@impl true
def handle_info(%PrymnProto.Prymn.SysInfoResponse{} = response, socket) do
{:noreply,
socket
|> assign(:uptime, response.uptime)
|> assign(
:used_memory,
bytes_to_gigabytes(response.mem_total_bytes - response.mem_avail_bytes)
)
|> assign(:total_memory, bytes_to_gigabytes(response.mem_total_bytes))
|> assign(:cpus, response.cpus)}
end
defp bytes_to_gigabytes(bytes) do
Float.round(bytes / Integer.pow(1024, 3), 2)
end
defp get_sys_info(from, host_address) do
channel = Agents.get_channel(host_address)
{:ok, reply} = PrymnProto.Prymn.Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{})
send(from, reply)
Process.sleep(:timer.seconds(2))
get_sys_info(from, host_address)
end
end

View file

@ -22,4 +22,22 @@
</div>
</section>
<section :if={@server.status == :registered} class="my-10 flex rounded bg-gray-800 p-5 text-white">
<div>
<p class="text-xl"><%= @uptime || "" %>s</p>
<p class="text-sm">Uptime</p>
</div>
<div class="ml-4">
<p class="text-xl"><%= Enum.count(@cpus || []) %></p>
<p class="text-sm">CPUs</p>
</div>
<div class="ml-4">
<p class="text-xl">
<%= @used_memory || 0 %> / <%= @total_memory || 0 %>
<span>GiB</span>
</p>
<p class="text-sm">Memory</p>
</div>
</section>
<.back navigate={~p"/servers"}>Back to servers</.back>

View file

@ -20,7 +20,7 @@ defmodule Prymn.MixProject do
def application do
[
mod: {Prymn.Application, []},
extra_applications: [:logger, :runtime_tools]
extra_applications: [:logger, :runtime_tools, :os_mon]
]
end