app: refactor the Connection to make it asynchronous when connecting

Some work has been done on making the Connection feel nicer, but also
more work is needed to not have the channel be exposed to the upper
layers of the application. We should wrap all the GRPC calls in the
GenServer (which may also allow caching on certain calls such as
get_sys_info)
This commit is contained in:
Nikos Papadakis 2023-08-28 23:32:42 +03:00
parent be7f584010
commit 26ba60b95d
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
9 changed files with 231 additions and 131 deletions

View file

@ -46,7 +46,8 @@ impl agent_server::Agent for AgentService {
sys.refresh_specifics(
sysinfo::RefreshKind::new()
.with_disks()
// .with_disks() // what is this?
.with_disks_list()
.with_memory()
.with_processes(sysinfo::ProcessRefreshKind::everything())
.with_cpu(sysinfo::CpuRefreshKind::everything()),

View file

@ -7,42 +7,51 @@ defmodule Prymn.Agents do
"""
@doc """
Establishes a dynamically supervised, background connection with the target
host agent, keeping it alive if it is already started.
"""
@spec start_connection_or_keep_alive(String.t()) :: :ok
def start_connection_or_keep_alive(host_address) do
spec = {Prymn.Agents.Connection, host_address}
alias Prymn.Agents.Connection
alias Prymn.Agents.Health
def start_connection(host_address) do
spec = {Connection, host_address}
case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do
{:error, {:already_started, pid}} ->
:ok = Prymn.Agents.Connection.keep_alive(pid)
:ok
{:ok, _pid} ->
:ok
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, error} -> {:error, error}
end
end
@doc """
Subscribe to the host's Health using Phoenix.PubSub Broadcasted messages are
in the form of:
the Health struct:
{host_address, %Prymn.Agents.Health{}}
## Returns
This function returns the last health status of the Agent that was saved.
%Prymn.Agents.Health{}
"""
@spec subscribe_to_health(String.t()) :: Prymn.Agents.Health.t()
def subscribe_to_health(host_address) do
Prymn.Agents.Health.subscribe(host_address)
Prymn.Agents.Health.lookup(host_address)
:ok = Health.subscribe(host_address)
end
@spec get_channel(String.t()) :: GRPC.Channel.t()
@doc """
Return the last known health status of the Agent, or `nil` if it doesn't
exist.
"""
def get_health(host_address) do
Health.lookup(host_address)
end
# TODO: We should not expose this api, instead wrap every GRPC call in this
# module GRPC is an "internal implementation detail" (although it probably
# wont ever change)
#
# E.g.
# def get_sys_info(agent) do
# PrymnProto.Prymn.Agent.Stub.get_sys_info(agent.channel, %Google.Protobuf.Empty{})
# end
def get_channel(host_address) do
Prymn.Agents.Connection.get_channel(host_address)
with [{pid, _}] <- Registry.lookup(Prymn.Agents.Registry, host_address),
channel when channel != nil <- Connection.get_channel(pid) do
{:ok, channel}
else
_ -> {:error, :not_found}
end
end
end

View file

@ -1,6 +1,7 @@
defmodule Prymn.Agents.Connection do
@moduledoc false
alias Prymn.Agents.Health
require Logger
use GenServer, restart: :transient
@ -8,39 +9,36 @@ defmodule Prymn.Agents.Connection do
@timeout :timer.minutes(2)
@spec start_link(String.t()) :: GenServer.on_start()
def start_link(addr) when is_binary(addr) do
GenServer.start_link(__MODULE__, addr, name: via(addr))
def start_link(host_address) do
GenServer.start_link(__MODULE__, host_address, name: via(host_address))
end
@spec keep_alive(pid | String.t()) :: :ok
def keep_alive(server) when is_pid(server), do: GenServer.cast(server, :keep_alive)
def keep_alive(server) when is_binary(server), do: GenServer.cast(via(server), :keep_alive)
@spec stop(pid | String.t()) :: :ok
def stop(server) when is_pid(server), do: GenServer.stop(server, :shutdown)
def stop(server) when is_binary(server), do: GenServer.stop(via(server), :shutdown)
@spec get_channel(String.t()) :: GRPC.Channel.t()
@spec get_channel(pid) :: GRPC.Channel.t() | nil
def get_channel(server) do
GenServer.call(via(server), :get_channel)
GenServer.call(server, :get_channel)
end
##
## Server callbacks
##
@impl true
def init(host) do
Process.flag(:trap_exit, true)
{:ok, {host, nil}, {:continue, :connect}}
# Process.flag(:trap_exit, true)
pid = self()
# Start a connection without blocking the GenServer
Task.start_link(fn ->
case GRPC.Stub.connect(host, 50012, []) do
{:ok, channel} -> send(pid, channel)
{:error, error} -> send(pid, {:connect_error, error})
end
@impl true
def handle_continue(:connect, {host_address, _} = state) do
case GRPC.Stub.connect(host_address, 50012, []) do
{:ok, channel} ->
keep_alive(self())
{:noreply, {host_address, channel}, {:continue, :health}}
# Keep receiving and sending back any messages to the GenServer forever
receive_loop(pid)
end)
{:error, error} ->
{:stop, error, state}
end
{:ok, {host, nil}}
end
@impl true
@ -56,7 +54,12 @@ defmodule Prymn.Agents.Connection do
|> Enum.take_while(fn _ -> true end)
end)
{:noreply, state, @timeout}
{:noreply, state}
end
@impl true
def handle_cast(_, state) do
{:noreply, state}
end
@impl true
@ -65,37 +68,44 @@ defmodule Prymn.Agents.Connection do
end
@impl true
def handle_cast(:keep_alive, state) do
{:noreply, state, @timeout}
def handle_info(%GRPC.Channel{} = channel, {host, _}) do
{:noreply, {host, channel}, {:continue, :health}}
end
def handle_info({:connect_error, reason}, {host, _} = state) do
if reason == :timeout do
Health.lookup(host, default: true)
|> Health.make_timed_out()
|> Health.update_and_broadcast()
end
{:stop, reason, state}
end
@impl true
def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
response
|> Prymn.Agents.Health.new_from_proto()
|> Prymn.Agents.Health.update(host)
|> Health.make_from_proto(host)
|> Health.update_and_broadcast()
{:noreply, state, @timeout}
end
def handle_info(%GRPC.RPCError{} = response, {host, _} = state) do
def handle_info(%GRPC.RPCError{} = response, state) do
Logger.debug("received a GRPC error: #{inspect(response)}")
response
|> Prymn.Agents.Health.new_from_proto()
|> Prymn.Agents.Health.update(host)
{:noreply, state, @timeout}
{:noreply, state}
end
def handle_info({:gun_up, _pid, _protocol}, state) do
# TODO: If it's possible for the GRPC connection to be down when we receive
# this message, we should `{:continue, state.channel.host}`
# NOTE: If it's possible for the GRPC connection to be down when we receive
# this message, maybe we should restart the connection
{:noreply, state, {:continue, :health}}
end
def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
Logger.debug("disconnected from #{host}")
Health.lookup(host)
|> Health.make_disconnected()
|> Health.update_and_broadcast()
{:noreply, state, @timeout}
end
@ -103,24 +113,27 @@ defmodule Prymn.Agents.Connection do
{:stop, {:shutdown, :timeout}, state}
end
def handle_info({:EXIT, _from, _reason}, state) do
# TODO: How to handle this... (this happens when a linked process exists
# e.g the one with the health stream)
{:noreply, state, @timeout}
end
def handle_info(msg, state) do
Logger.warning("received unexpected message: #{inspect(msg)}")
{:noreply, state, @timeout}
Logger.debug("received unhandled message #{inspect(msg)}")
{:noreply, state}
end
@impl true
def terminate(reason, {host, channel}) do
Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
Health.delete(host)
if channel, do: GRPC.Stub.disconnect(channel)
end
defp via(name) do
{:via, Registry, {Prymn.Agents.Registry, name}}
end
defp receive_loop(pid) do
receive do
msg -> send(pid, msg)
end
receive_loop(pid)
end
end

View file

@ -1,66 +1,82 @@
defmodule Prymn.Agents.Health do
@doc """
The Health struct keeps simple health information of
whether or not the target host machine is up to date, has any tasks running,
its resources are getting depleted, or if it's unable be reached.
@moduledoc """
The Health struct keeps simple health information of whether or not the
target host machine is up to date, has any tasks running, its resources are
getting depleted, or if it's unable be reached.
"""
defstruct [:version, message: "Unknown"]
defstruct [:host, :version, message: "Unknown"]
alias PrymnProto.Prymn.HealthResponse
@type t :: %{
host: String.t(),
version: String.t(),
message: String.t()
}
@doc false
def start() do
:ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true])
end
@doc false
def subscribe(host_address) do
Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host_address}")
def subscribe(host) do
Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host}")
end
@doc false
def update(health, host_address) do
:ets.insert(__MODULE__, {host_address, health})
Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host_address}", {host_address, health})
def broadcast!(%__MODULE__{host: host} = health) do
Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host}", health)
end
@doc false
def lookup(host_address) do
def update_and_broadcast(nil) do
nil
end
def update_and_broadcast(%__MODULE__{host: host} = health) do
:ets.insert(__MODULE__, {host, health})
broadcast!(health)
end
def delete(host_address) do
:ets.delete(__MODULE__, host_address)
end
def lookup(host_address, opts \\ []) do
default = Keyword.get(opts, :default, false)
case :ets.lookup(__MODULE__, host_address) do
[{^host_address, value}] -> value
[] when default -> %__MODULE__{host: host_address}
[] -> nil
end
end
@doc false
def new_from_proto(proto_health) do
case proto_health do
%HealthResponse{} = health ->
from_health(health)
%GRPC.RPCError{message: ":stream_error: :closed"} ->
%__MODULE__{message: "Disconnected"}
%GRPC.RPCError{} = error ->
require Logger
Logger.error("unhandled GRPC error received in Health module: #{inspect(error)}")
%__MODULE__{message: "Error retrieving server status"}
end
def make_timed_out(%__MODULE__{} = health) do
%__MODULE__{health | message: "Connect timed out"}
end
defp from_health(%HealthResponse{system: system, version: version}) do
def make_disconnected(%__MODULE__{} = health) do
%__MODULE__{health | message: "Disconnected"}
end
def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do
%__MODULE__{host: host}
|> do_version(version)
|> do_system(system)
|> do_tasks(tasks)
end
defp do_version(health, version) do
%__MODULE__{health | version: version}
end
defp do_system(health, system) do
case system.status do
"normal" ->
%__MODULE__{message: "Connected", version: version}
status ->
%__MODULE__{message: status, version: version}
"normal" -> %__MODULE__{health | message: "Connected"}
status -> %__MODULE__{health | message: "Alert: #{status}"}
end
end
defp do_tasks(health, _tasks) do
health
end
end

View file

@ -12,8 +12,6 @@ defmodule Prymn.Servers.Server do
values: [:unregistered, :registered],
default: :unregistered
field :connection_status, :string, virtual: true
timestamps()
end

View file

@ -1,7 +1,6 @@
defmodule PrymnWeb.ServerLive.Index do
require Logger
alias Prymn.Agents.Health
alias Prymn.{Servers, Agents}
use PrymnWeb, :live_view
@ -13,8 +12,9 @@ defmodule PrymnWeb.ServerLive.Index do
healths =
if connected?(socket) do
for %Servers.Server{status: :registered, public_ip: ip} <- servers, into: %{} do
Agents.start_connection_or_keep_alive(ip)
{ip, Agents.subscribe_to_health(ip)}
Agents.subscribe_to_health(ip)
Agents.start_connection(ip)
{ip, Agents.get_health(ip)}
end
else
%{}
@ -44,8 +44,8 @@ defmodule PrymnWeb.ServerLive.Index do
|> update(:servers, fn servers -> [server | servers] end)}
end
def handle_info({host, %Prymn.Agents.Health{} = health}, socket) do
healths = Map.put(socket.assigns.healths, host, health)
def handle_info(%Agents.Health{} = health, socket) do
healths = Map.put(socket.assigns.healths, health.host, health)
{:noreply, assign(socket, :healths, healths)}
end
@ -55,22 +55,32 @@ defmodule PrymnWeb.ServerLive.Index do
end
defp server_status(assigns) do
case {assigns.server, assigns.health} do
{%{status: :registered}, nil} ->
case {assigns.status, assigns.health} do
{:unregistered, _} ->
~H"""
Unknown
<span class="text-gray-500">Needs registration</span>
"""
{%{status: :registered}, %Health{message: message}} ->
assigns = assign(assigns, :status, message)
{:registered, nil} ->
~H"""
<%= @status %>
<span class="text-yellow-600">Connecting...</span>
"""
{_, _} ->
{:registered, %Agents.Health{message: "Connected"}} ->
~H"""
Not registered
<span class="text-green-600">Connected</span>
"""
{:registered, %Agents.Health{message: "Disconnected"}} ->
~H"""
<span class="text-red-600">Disconnected</span>
"""
{:registered, %Agents.Health{message: message}} ->
assigns = assign(assigns, :message, message)
~H"""
<span class="text-yellow-900"><%= @message %></span>
"""
end
end

View file

@ -19,7 +19,7 @@
<div class="flex flex-row flex-wrap justify-between">
<h2 class="text-xl"><%= server.name %></h2>
<span class="self-center text-sm">
<.server_status server={server} health={@healths[server.public_ip]} />
<.server_status status={server.status} health={@healths[server.public_ip]} />
</span>
</div>
<div class="lg:text-sm">

View file

@ -13,17 +13,22 @@ defmodule PrymnWeb.ServerLive.Show do
server = Servers.get_server!(id)
pid = self()
if connected?(socket) do
Agents.start_connection_or_keep_alive(server.public_ip)
if connected?(socket) and server.status == :registered do
Agents.subscribe_to_health(server.public_ip)
Agents.start_connection(server.public_ip)
Task.start_link(fn -> get_sys_info(pid, server.public_ip) end)
end
health = Agents.get_health(server.public_ip)
{:noreply,
socket
|> assign(:health, health || %{message: "Connecting..."})
|> assign(:page_title, server.name)
|> assign(:server, server)
|> assign(:uptime, 0)
|> assign(:cpus, [])
|> assign(:used_disk, 0)
|> assign(:total_memory, 0)
|> assign(:used_memory, 0)
|> assign(:registration_command, Servers.create_setup_command(server))}
@ -39,18 +44,39 @@ defmodule PrymnWeb.ServerLive.Show do
bytes_to_gigabytes(response.mem_total_bytes - response.mem_avail_bytes)
)
|> assign(:total_memory, bytes_to_gigabytes(response.mem_total_bytes))
|> assign(:used_disk, calculate_disk_used_percent(response.disks))
|> assign(:cpus, response.cpus)}
end
def handle_info(%Agents.Health{} = health, socket) do
{:noreply, assign(socket, :health, health)}
end
defp bytes_to_gigabytes(bytes) do
Float.round(bytes / Integer.pow(1024, 3), 2)
end
defp calculate_disk_used_percent(disks) do
alias PrymnProto.Prymn.SysInfoResponse.Disk
{used, total} =
Enum.reduce(disks, {0, 0}, fn %Disk{} = disk, {used, total} ->
{used + disk.total_bytes - disk.avail_bytes, total + disk.total_bytes}
end)
Float.round(100 * used / total, 2)
end
defp get_sys_info(from, host_address) do
channel = Agents.get_channel(host_address)
{:ok, reply} = PrymnProto.Prymn.Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{})
alias PrymnProto.Prymn.Agent
with {:ok, channel} <- Agents.get_channel(host_address),
{:ok, reply} <- Agent.Stub.get_sys_info(channel, %Google.Protobuf.Empty{}) do
send(from, reply)
Process.sleep(:timer.seconds(2))
end
Process.sleep(:timer.seconds(5))
get_sys_info(from, host_address)
end
end

View file

@ -1,5 +1,25 @@
<.header>
Server <%= @server.name %>
<span
role="tooltip"
class="relative inline-flex h-3 w-3
before:-translate-x-1/2 before:-translate-y-full before:-top-2
before:left-1/2 before:absolute before:text-sm before:text-white
before:font-normal before:content-[attr(data-tip)] before:opacity-0
hover:before:opacity-100 before:py-1 before:px-2 before:bg-black
before:rounded before:pointer-events-none before:transition-opacity"
data-tip={@health.message}
>
<%= case @health.message do %>
<% "Connected" -> %>
<span class="absolute top-0 left-0 h-full w-full animate-ping rounded-full bg-green-400 opacity-75" />
<span class="h-3 w-3 rounded-full bg-green-500" />
<% "Disconnected" -> %>
<span class="h-3 w-3 rounded-full bg-red-500" />
<% _ -> %>
<span class="h-3 w-3 rounded-full bg-yellow-500" />
<% end %>
</span>
<span class="ml-3">Server <%= @server.name %></span>
</.header>
<section :if={@server.status == :unregistered} class="my-10">
@ -38,6 +58,13 @@
</p>
<p class="text-sm">Memory</p>
</div>
<div class="ml-4">
<p class="text-xl">
<%= @used_disk %>
<span>%</span>
</p>
<p class="text-sm">Used Disk</p>
</div>
</section>
<.back navigate={~p"/servers"}>Back to servers</.back>