From 5bd44ef9ac155044c1f89fdf61afc3f0580b0376 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Sat, 19 Aug 2023 21:14:07 +0300 Subject: [PATCH] refactor: agent connections now use the new healthcheck streaming The `Prymn.Agents.Connection` GenServer now performs health checking over the new streaming RPC, asynchronously retrieving the health status of the agent. The Connection also shuts itself down when it is idle (that is, when the keep-alive function is not called frequently enough) --- app/lib/prymn/agents.ex | 46 ++------ app/lib/prymn/agents/connection.ex | 114 ++++++++++---------- app/lib/prymn/application.ex | 2 +- app/lib/prymn_web/live/server_live/index.ex | 40 ++----- 4 files changed, 73 insertions(+), 129 deletions(-) diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index 4e42fd5..c0c6139 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -4,48 +4,16 @@ defmodule Prymn.Agents do communicates with them using GRPC calls. GRPC connections are started using the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the Prymn.Agents.Registry. - - ## Examples - - TODO """ - @doc """ - Ensures a connection with the Prymn Agent exists and is kept in memory. 
+ def start_connection_or_keep_alive(ip) do + case Registry.lookup(Prymn.Agents.Registry, ip) do + [{pid, _}] -> + Prymn.Agents.Connection.keep_alive(pid) - Returns `:ok` when a new connection is successfuly established or is already established - - Returns `{:error, reason}` when the connection could not be established - """ - @spec ensure_connection(String.t()) :: :ok | {:error, term} - def ensure_connection(public_ip) do - child = {Prymn.Agents.Connection, public_ip} - - case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do - {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> :ok - {:error, error} -> {:error, error} + [] -> + child = {Prymn.Agents.Connection, ip} + {:ok, _pid} = DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) end end - - @doc """ - Terminates the process and drops the connection gracefully. - """ - @spec drop_connection(String.t()) :: :ok | {:error, :not_found} - def drop_connection(address) do - :ok = Prymn.Agents.Connection.drop(address) - catch - :exit, _ -> {:error, :not_found} - end - - @doc """ - Get the channel for the given `address`. The channel is used to make GRPC - calls. - """ - @spec get_channel(String.t()) :: GRPC.Channel.t() | {:error, :not_found} - def get_channel(address) do - Prymn.Agents.Connection.get_channel(address) - catch - :exit, _ -> {:error, :not_found} - end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 5da6d29..3ee50d3 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -1,15 +1,11 @@ defmodule Prymn.Agents.Connection do @moduledoc false - # TODO: Disconnect after a while of idling. Disconnect when the healthcheck - # fails too many times. 
+ @timer_interval 120_000 - defstruct [:channel, up?: false] - - @healthcheck_inverval 20000 + defstruct [:channel, :timer_ref] require Logger - alias PrymnProto.Prymn.Agent.Stub, as: Grpc use GenServer, restart: :transient @@ -18,90 +14,88 @@ defmodule Prymn.Agents.Connection do GenServer.start_link(__MODULE__, addr, name: via(addr)) end - @spec get_channel(String.t()) :: GRPC.Channel.t() - def get_channel(addr) do - GenServer.call(via(addr), :get_channel) - end - - @spec drop(String.t()) :: :ok - def drop(addr) do - GenServer.stop(via(addr), :shutdown) + @spec keep_alive(pid) :: :ok + def keep_alive(pid) do + GenServer.cast(pid, :reset_timer) end @impl true - def init(public_ip) do - case GRPC.Stub.connect("#{public_ip}:50012") do - {:ok, channel} -> - send(self(), :do_healthcheck) + def init(host) do + Process.flag(:trap_exit, true) + {:ok, %__MODULE__{}, {:continue, host}} + end - {:ok, %__MODULE__{channel: channel, up?: true}} + @impl true + def handle_continue(host, state) when is_binary(host) do + case GRPC.Stub.connect(host, 50012, []) do + {:ok, channel} -> + GenServer.cast(self(), :reset_timer) + {:noreply, %__MODULE__{channel: channel}, {:continue, :health}} {:error, error} -> - broadcast_healthcheck!(:down, public_ip) - - {:stop, error} + {:stop, {:error, error}, state} end end @impl true - def handle_call(:get_channel, _from, state) do - {:reply, state.channel, state} + def handle_continue(:health, state) do + pid = self() + + Task.start_link(fn -> + {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(state.channel, %Google.Protobuf.Empty{}) + + stream + |> Stream.each(fn health -> send(pid, {:health, health}) end) + |> Enum.take_while(fn _ -> true end) + end) + + {:noreply, state} end @impl true - def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do - broadcast_healthcheck!(:up, channel.host) - {:noreply, %{state | up?: true}} + def handle_cast(:reset_timer, state) do + if state.timer_ref, do: 
Process.cancel_timer(state.timer_ref) + ref = Process.send_after(self(), :drop_connection, @timer_interval) + {:noreply, put_in(state.timer_ref, ref)} + end + + @impl true + def handle_info(:drop_connection, state) do + Logger.debug("shutting down connection with agent host #{inspect(state.channel.host)}") + {:stop, :shutdown, state} + end + + @impl true + def handle_info({:health, health}, state) do + IO.inspect(health) + {:noreply, state} + end + + @impl true + def handle_info({:gun_up, _pid, _protocol}, state) do + # TODO: If it's possible for the GRPC connection to be down when we receive + # this message, we should `{:continue, state.channel.host}` + {:noreply, state, {:continue, :health}} end @impl true def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do - broadcast_healthcheck!(:down, channel.host) - {:noreply, %{state | up?: false}} - end - - @impl true - def handle_info(:do_healthcheck, %{channel: channel, up?: up?} = state) do - request = %PrymnProto.Prymn.EchoRequest{message: "hello"} - - if up? 
do - case Grpc.echo(channel, request) do - {:ok, _reply} -> - broadcast_healthcheck!(:up, channel.host) - - {:error, error} -> - Logger.warning( - "healthcheck error for server #{channel.host}, reason: #{inspect(error)}" - ) - end - else - broadcast_healthcheck!(:down, channel.host) - end - - Process.send_after(self(), :do_healthcheck, @healthcheck_inverval) + Logger.debug("disconnected from #{inspect(channel)}") {:noreply, state} end @impl true def handle_info(msg, state) do - Logger.debug("received unexpected message: #{inspect(msg)}") + Logger.warning("received unexpected message: #{inspect(msg)}") {:noreply, state} end @impl true def terminate(_reason, %{channel: channel}) do - GRPC.Stub.disconnect(channel) + if channel, do: GRPC.Stub.disconnect(channel) end defp via(name) do {:via, Registry, {Prymn.Agents.Registry, name}} end - - defp broadcast_healthcheck!(msg, ip_address) do - Phoenix.PubSub.broadcast!( - Prymn.PubSub, - "agent:#{ip_address}", - {:healthcheck, ip_address, msg} - ) - end end diff --git a/app/lib/prymn/application.ex b/app/lib/prymn/application.ex index c44afd6..feda28d 100644 --- a/app/lib/prymn/application.ex +++ b/app/lib/prymn/application.ex @@ -20,7 +20,7 @@ defmodule Prymn.Application do PrymnWeb.Endpoint, # Start the prymn agent (grpc) registry and the supervisor {Registry, keys: :unique, name: Prymn.Agents.Registry}, - {DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one} + {DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one, max_seconds: 60} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 8d7f9ac..4cbf0f1 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -7,35 +7,25 @@ defmodule PrymnWeb.ServerLive.Index do @impl true def mount(_params, _session, socket) do servers = Servers.list_servers() - # pid = self() - for 
%Servers.Server{status: :registered, public_ip: ip} <- servers do - :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{ip}") - - Task.start_link(fn -> - case Agents.ensure_connection(ip) do - :ok -> IO.puts("Ok") - _ -> IO.puts("not ok") - end - end) + if connected?(socket) do + for %Servers.Server{status: :registered, public_ip: ip} <- servers do + Agents.start_connection_or_keep_alive(ip) + end end {:ok, assign(socket, :servers, servers)} end @impl true - def handle_params(params, _url, socket) do - {:noreply, apply_action(socket, socket.assigns.live_action, params)} - end + def handle_params(_params, _url, socket) do + socket = + case socket.assigns.live_action do + :new -> assign(socket, :page_title, gettext("Connect a Server")) + :index -> assign(socket, :page_title, gettext("Listing Servers")) + end - defp apply_action(socket, :new, _params) do - socket - |> assign(:page_title, "New Server") - end - - defp apply_action(socket, :index, _params) do - socket - |> assign(:page_title, "Listing Servers") + {:noreply, socket} end @impl true @@ -62,12 +52,4 @@ defmodule PrymnWeb.ServerLive.Index do Logger.debug("received unexpected message #{inspect(msg)}") {:noreply, state} end - - # @impl true - # def handle_event("delete", %{"id" => id}, socket) do - # server = Servers.get_server!(id) - # {:ok, _} = Servers.delete_server(server) - - # {:noreply, stream_delete(socket, :servers, server)} - # end end