diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index 4e42fd5..c0c6139 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -4,48 +4,16 @@ defmodule Prymn.Agents do communicates with them using GRPC calls. GRPC connections are started using the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the Prymn.Agents.Registry. - - ## Examples - - TODO """ - @doc """ - Ensures a connection with the Prymn Agent exists and is kept in memory. + def start_connection_or_keep_alive(ip) do + case Registry.lookup(Prymn.Agents.Registry, ip) do + [{pid, _}] -> + Prymn.Agents.Connection.keep_alive(pid) - Returns `:ok` when a new connection is successfuly established or is already established - - Returns `{:error, reason}` when the connection could not be established - """ - @spec ensure_connection(String.t()) :: :ok | {:error, term} - def ensure_connection(public_ip) do - child = {Prymn.Agents.Connection, public_ip} - - case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do - {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> :ok - {:error, error} -> {:error, error} + [] -> + child = {Prymn.Agents.Connection, ip} + {:ok, _pid} = DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) end end - - @doc """ - Terminates the process and drops the connection gracefully. - """ - @spec drop_connection(String.t()) :: :ok | {:error, :not_found} - def drop_connection(address) do - :ok = Prymn.Agents.Connection.drop(address) - catch - :exit, _ -> {:error, :not_found} - end - - @doc """ - Get the channel for the given `address`. The channel is used to make GRPC - calls. - """ - @spec get_channel(String.t()) :: GRPC.Channel.t() | {:error, :not_found} - def get_channel(address) do - Prymn.Agents.Connection.get_channel(address) - catch - :exit, _ -> {:error, :not_found} - end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 5da6d29..3ee50d3 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -1,15 +1,11 @@ defmodule Prymn.Agents.Connection do @moduledoc false - # TODO: Disconnect after a while of idling. Disconnect when the healthcheck - # fails too many times. + @timer_interval 120_000 - defstruct [:channel, up?: false] - - @healthcheck_inverval 20000 + defstruct [:channel, :timer_ref] require Logger - alias PrymnProto.Prymn.Agent.Stub, as: Grpc use GenServer, restart: :transient @@ -18,90 +14,88 @@ defmodule Prymn.Agents.Connection do GenServer.start_link(__MODULE__, addr, name: via(addr)) end - @spec get_channel(String.t()) :: GRPC.Channel.t() - def get_channel(addr) do - GenServer.call(via(addr), :get_channel) - end - - @spec drop(String.t()) :: :ok - def drop(addr) do - GenServer.stop(via(addr), :shutdown) + @spec keep_alive(pid) :: :ok + def keep_alive(pid) do + GenServer.cast(pid, :reset_timer) end @impl true - def init(public_ip) do - case GRPC.Stub.connect("#{public_ip}:50012") do - {:ok, channel} -> - send(self(), :do_healthcheck) + def init(host) do + Process.flag(:trap_exit, true) + {:ok, %__MODULE__{}, {:continue, host}} + end - {:ok, %__MODULE__{channel: channel, up?: true}} + @impl true + def handle_continue(host, state) when is_binary(host) do + case GRPC.Stub.connect(host, 50012, []) do + {:ok, channel} -> + GenServer.cast(self(), :reset_timer) + {:noreply, %__MODULE__{channel: channel}, {:continue, :health}} {:error, error} -> - broadcast_healthcheck!(:down, public_ip) - - {:stop, error} + {:stop, {:error, error}, state} end end @impl true - def handle_call(:get_channel, _from, state) do - {:reply, state.channel, state} + def handle_continue(:health, state) do + pid = self() + + Task.start_link(fn -> + {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(state.channel, %Google.Protobuf.Empty{}) + + stream + |> Stream.each(fn health -> send(pid, {:health, health}) end) + |> Enum.take_while(fn _ -> true end) + end) + + {:noreply, state} end @impl true - def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do - broadcast_healthcheck!(:up, channel.host) - {:noreply, %{state | up?: true}} + def handle_cast(:reset_timer, state) do + if state.timer_ref, do: Process.cancel_timer(state.timer_ref) + ref = Process.send_after(self(), :drop_connection, @timer_interval) + {:noreply, put_in(state.timer_ref, ref)} + end + + @impl true + def handle_info(:drop_connection, state) do + Logger.debug("shutting down connection with agent host #{inspect(state.channel.host)}") + {:stop, :shutdown, state} + end + + @impl true + def handle_info({:health, health}, state) do + IO.inspect(health) + {:noreply, state} + end + + @impl true + def handle_info({:gun_up, _pid, _protocol}, state) do + # TODO: If it's possible for the GRPC connection to be down when we receive + # this message, we should `{:continue, state.channel.host}` + {:noreply, state, {:continue, :health}} end @impl true def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do - broadcast_healthcheck!(:down, channel.host) - {:noreply, %{state | up?: false}} - end - - @impl true - def handle_info(:do_healthcheck, %{channel: channel, up?: up?} = state) do - request = %PrymnProto.Prymn.EchoRequest{message: "hello"} - - if up? do - case Grpc.echo(channel, request) do - {:ok, _reply} -> - broadcast_healthcheck!(:up, channel.host) - - {:error, error} -> - Logger.warning( - "healthcheck error for server #{channel.host}, reason: #{inspect(error)}" - ) - end - else - broadcast_healthcheck!(:down, channel.host) - end - - Process.send_after(self(), :do_healthcheck, @healthcheck_inverval) + Logger.debug("disconnected from #{inspect(channel)}") {:noreply, state} end @impl true def handle_info(msg, state) do - Logger.debug("received unexpected message: #{inspect(msg)}") + Logger.warning("received unexpected message: #{inspect(msg)}") {:noreply, state} end @impl true def terminate(_reason, %{channel: channel}) do - GRPC.Stub.disconnect(channel) + if channel, do: GRPC.Stub.disconnect(channel) end defp via(name) do {:via, Registry, {Prymn.Agents.Registry, name}} end - - defp broadcast_healthcheck!(msg, ip_address) do - Phoenix.PubSub.broadcast!( - Prymn.PubSub, - "agent:#{ip_address}", - {:healthcheck, ip_address, msg} - ) - end end diff --git a/app/lib/prymn/application.ex b/app/lib/prymn/application.ex index c44afd6..feda28d 100644 --- a/app/lib/prymn/application.ex +++ b/app/lib/prymn/application.ex @@ -20,7 +20,7 @@ defmodule Prymn.Application do PrymnWeb.Endpoint, # Start the prymn agent (grpc) registry and the supervisor {Registry, keys: :unique, name: Prymn.Agents.Registry}, - {DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one} + {DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one, max_seconds: 60} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 8d7f9ac..4cbf0f1 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -7,35 +7,25 @@ defmodule PrymnWeb.ServerLive.Index do @impl true def mount(_params, _session, socket) do servers = Servers.list_servers() - # pid = self() - for %Servers.Server{status: :registered, public_ip: ip} <- servers do - :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{ip}") - - Task.start_link(fn -> - case Agents.ensure_connection(ip) do - :ok -> IO.puts("Ok") - _ -> IO.puts("not ok") - end - end) + if connected?(socket) do + for %Servers.Server{status: :registered, public_ip: ip} <- servers do + Agents.start_connection_or_keep_alive(ip) + end end {:ok, assign(socket, :servers, servers)} end @impl true - def handle_params(params, _url, socket) do - {:noreply, apply_action(socket, socket.assigns.live_action, params)} - end + def handle_params(_params, _url, socket) do + socket = + case socket.assigns.live_action do + :new -> assign(socket, :page_title, gettext("Connect a Server")) + :index -> assign(socket, :page_title, gettext("Listing Servers")) + end - defp apply_action(socket, :new, _params) do - socket - |> assign(:page_title, "New Server") - end - - defp apply_action(socket, :index, _params) do - socket - |> assign(:page_title, "Listing Servers") + {:noreply, socket} end @impl true @@ -62,12 +52,4 @@ defmodule PrymnWeb.ServerLive.Index do Logger.debug("received unexpected message #{inspect(msg)}") {:noreply, state} end - - # @impl true - # def handle_event("delete", %{"id" => id}, socket) do - # server = Servers.get_server!(id) - # {:ok, _} = Servers.delete_server(server) - - # {:noreply, stream_delete(socket, :servers, server)} - # end end