refactor: agent connections now use the new healthcheck streaming

`Prymn.Agent.Connection` GenServer uses healthchecking with the new
streaming RPC, retrieving asynchronously the health status of the agent.

The Connection will also shut itself down when there's no usage (when
the keepalive function is not called frequently)
This commit is contained in:
Nikos Papadakis 2023-08-19 21:14:07 +03:00
parent 20896ab5a3
commit 5bd44ef9ac
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
4 changed files with 73 additions and 129 deletions

View file

@ -4,48 +4,16 @@ defmodule Prymn.Agents do
communicates with them using GRPC calls. GRPC connections are started using
the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the
Prymn.Agents.Registry.
## Examples
TODO
"""
@doc """
Ensures a connection with the Prymn Agent exists and is kept in memory.
def start_connection_or_keep_alive(ip) do
case Registry.lookup(Prymn.Agents.Registry, ip) do
[{pid, _}] ->
Prymn.Agents.Connection.keep_alive(pid)
Returns `:ok` when a new connection is successfuly established or is already established
Returns `{:error, reason}` when the connection could not be established
"""
@spec ensure_connection(String.t()) :: :ok | {:error, term}
def ensure_connection(public_ip) do
child = {Prymn.Agents.Connection, public_ip}
case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, error} -> {:error, error}
[] ->
child = {Prymn.Agents.Connection, ip}
{:ok, _pid} = DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child)
end
end
@doc """
Terminates the process and drops the connection gracefully.
"""
@spec drop_connection(String.t()) :: :ok | {:error, :not_found}
def drop_connection(address) do
:ok = Prymn.Agents.Connection.drop(address)
catch
:exit, _ -> {:error, :not_found}
end
@doc """
Get the channel for the given `address`. The channel is used to make GRPC
calls.
"""
@spec get_channel(String.t()) :: GRPC.Channel.t() | {:error, :not_found}
def get_channel(address) do
Prymn.Agents.Connection.get_channel(address)
catch
:exit, _ -> {:error, :not_found}
end
end

View file

@ -1,15 +1,11 @@
defmodule Prymn.Agents.Connection do
@moduledoc false
# TODO: Disconnect after a while of idling. Disconnect when the healthcheck
# fails too many times.
@timer_interval 120_000
defstruct [:channel, up?: false]
@healthcheck_inverval 20000
defstruct [:channel, :timer_ref]
require Logger
alias PrymnProto.Prymn.Agent.Stub, as: Grpc
use GenServer, restart: :transient
@ -18,90 +14,88 @@ defmodule Prymn.Agents.Connection do
GenServer.start_link(__MODULE__, addr, name: via(addr))
end
@spec get_channel(String.t()) :: GRPC.Channel.t()
def get_channel(addr) do
GenServer.call(via(addr), :get_channel)
end
@spec drop(String.t()) :: :ok
def drop(addr) do
GenServer.stop(via(addr), :shutdown)
@spec keep_alive(pid) :: :ok
def keep_alive(pid) do
GenServer.cast(pid, :reset_timer)
end
@impl true
def init(public_ip) do
case GRPC.Stub.connect("#{public_ip}:50012") do
{:ok, channel} ->
send(self(), :do_healthcheck)
def init(host) do
Process.flag(:trap_exit, true)
{:ok, %__MODULE__{}, {:continue, host}}
end
{:ok, %__MODULE__{channel: channel, up?: true}}
@impl true
def handle_continue(host, state) when is_binary(host) do
case GRPC.Stub.connect(host, 50012, []) do
{:ok, channel} ->
GenServer.cast(self(), :reset_timer)
{:noreply, %__MODULE__{channel: channel}, {:continue, :health}}
{:error, error} ->
broadcast_healthcheck!(:down, public_ip)
{:stop, error}
{:stop, {:error, error}, state}
end
end
@impl true
def handle_call(:get_channel, _from, state) do
{:reply, state.channel, state}
def handle_continue(:health, state) do
pid = self()
Task.start_link(fn ->
{:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(state.channel, %Google.Protobuf.Empty{})
stream
|> Stream.each(fn health -> send(pid, {:health, health}) end)
|> Enum.take_while(fn _ -> true end)
end)
{:noreply, state}
end
@impl true
def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do
broadcast_healthcheck!(:up, channel.host)
{:noreply, %{state | up?: true}}
def handle_cast(:reset_timer, state) do
if state.timer_ref, do: Process.cancel_timer(state.timer_ref)
ref = Process.send_after(self(), :drop_connection, @timer_interval)
{:noreply, put_in(state.timer_ref, ref)}
end
@impl true
def handle_info(:drop_connection, state) do
Logger.debug("shutting down connection with agent host #{inspect(state.channel.host)}")
{:stop, :shutdown, state}
end
@impl true
def handle_info({:health, health}, state) do
IO.inspect(health)
{:noreply, state}
end
@impl true
def handle_info({:gun_up, _pid, _protocol}, state) do
# TODO: If it's possible for the GRPC connection to be down when we receive
# this message, we should `{:continue, state.channel.host}`
{:noreply, state, {:continue, :health}}
end
@impl true
def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do
broadcast_healthcheck!(:down, channel.host)
{:noreply, %{state | up?: false}}
end
@impl true
def handle_info(:do_healthcheck, %{channel: channel, up?: up?} = state) do
request = %PrymnProto.Prymn.EchoRequest{message: "hello"}
if up? do
case Grpc.echo(channel, request) do
{:ok, _reply} ->
broadcast_healthcheck!(:up, channel.host)
{:error, error} ->
Logger.warning(
"healthcheck error for server #{channel.host}, reason: #{inspect(error)}"
)
end
else
broadcast_healthcheck!(:down, channel.host)
end
Process.send_after(self(), :do_healthcheck, @healthcheck_inverval)
Logger.debug("disconnected from #{inspect(channel)}")
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
Logger.debug("received unexpected message: #{inspect(msg)}")
Logger.warning("received unexpected message: #{inspect(msg)}")
{:noreply, state}
end
@impl true
def terminate(_reason, %{channel: channel}) do
GRPC.Stub.disconnect(channel)
if channel, do: GRPC.Stub.disconnect(channel)
end
defp via(name) do
{:via, Registry, {Prymn.Agents.Registry, name}}
end
defp broadcast_healthcheck!(msg, ip_address) do
Phoenix.PubSub.broadcast!(
Prymn.PubSub,
"agent:#{ip_address}",
{:healthcheck, ip_address, msg}
)
end
end

View file

@ -20,7 +20,7 @@ defmodule Prymn.Application do
PrymnWeb.Endpoint,
# Start the prymn agent (grpc) registry and the supervisor
{Registry, keys: :unique, name: Prymn.Agents.Registry},
{DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one}
{DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one, max_seconds: 60}
]
# See https://hexdocs.pm/elixir/Supervisor.html

View file

@ -7,35 +7,25 @@ defmodule PrymnWeb.ServerLive.Index do
@impl true
def mount(_params, _session, socket) do
servers = Servers.list_servers()
# pid = self()
if connected?(socket) do
for %Servers.Server{status: :registered, public_ip: ip} <- servers do
:ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{ip}")
Task.start_link(fn ->
case Agents.ensure_connection(ip) do
:ok -> IO.puts("Ok")
_ -> IO.puts("not ok")
Agents.start_connection_or_keep_alive(ip)
end
end)
end
{:ok, assign(socket, :servers, servers)}
end
@impl true
def handle_params(params, _url, socket) do
{:noreply, apply_action(socket, socket.assigns.live_action, params)}
def handle_params(_params, _url, socket) do
socket =
case socket.assigns.live_action do
:new -> assign(socket, :page_title, gettext("Connect a Server"))
:index -> assign(socket, :page_title, gettext("Listing Servers"))
end
defp apply_action(socket, :new, _params) do
socket
|> assign(:page_title, "New Server")
end
defp apply_action(socket, :index, _params) do
socket
|> assign(:page_title, "Listing Servers")
{:noreply, socket}
end
@impl true
@ -62,12 +52,4 @@ defmodule PrymnWeb.ServerLive.Index do
Logger.debug("received unexpected message #{inspect(msg)}")
{:noreply, state}
end
# @impl true
# def handle_event("delete", %{"id" => id}, socket) do
# server = Servers.get_server!(id)
# {:ok, _} = Servers.delete_server(server)
# {:noreply, stream_delete(socket, :servers, server)}
# end
end