defmodule Prymn.Agents.Connection do
  @moduledoc false

  # One GenServer per agent address. The process owns the gRPC channel to that
  # agent, is registered under the address in Prymn.Agents.Registry, pings the
  # agent periodically and broadcasts connectivity changes over Prymn.PubSub
  # on the "agent:<host>" topic.
  #
  # TODO: Disconnect after a while of idling. Disconnect when the healthcheck
  # fails too many times.

  use GenServer, restart: :transient

  require Logger

  defstruct [:channel, up?: false]

  # Milliseconds between two consecutive healthcheck pings.
  @ping_interval 20000

  ## Client API

  # Starts a connection process for `addr`, registered under that address.
  @spec start_link(String.t()) :: GenServer.on_start()
  def start_link(addr) do
    GenServer.start_link(__MODULE__, addr, name: via(addr))
  end

  # Returns the gRPC channel held by the connection process registered for `addr`.
  @spec get_channel(String.t()) :: GRPC.Channel.t()
  def get_channel(addr) do
    GenServer.call(via(addr), :get_channel)
  end

  # Stops the connection process registered for `addr` with reason `:shutdown`;
  # `terminate/2` then disconnects the channel.
  @spec drop(String.t()) :: :ok
  def drop(addr) do
    GenServer.stop(via(addr), :shutdown)
  end

  ## Server callbacks

  # Connects to the agent and schedules the first healthcheck. A failed
  # connection attempt stops the process with the returned error as reason.
  @impl true
  def init(addr) do
    case GRPC.Stub.connect(addr) do
      {:ok, channel} ->
        Logger.info("Starting new connection at address #{addr}")
        state = %__MODULE__{channel: channel, up?: true}
        schedule_healthcheck()
        {:ok, state}

      {:error, error} ->
        {:stop, error}
    end
  end

  @impl true
  def handle_call(:get_channel, _from, state) do
    {:reply, state.channel, state}
  end

  # NOTE(review): the :gun_up / :gun_down messages presumably come from the Gun
  # HTTP client backing the gRPC channel - confirm against the configured adapter.
  @impl true
  def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do
    Logger.info("[Agent] #{channel.host} regained connection")
    broadcast_health(channel, :up)
    {:noreply, %{state | up?: true}}
  end

  @impl true
  def handle_info({:gun_down, _pid, _proto, reason, _}, %{channel: channel} = state) do
    # `reason` can be any term (for example `{:error, :econnrefused}`), and
    # tuples do not implement String.Chars, so it has to go through inspect/1;
    # plain interpolation would raise and take the process down.
    Logger.info("[Agent] #{channel.host} lost connection, reason: #{inspect(reason)}")
    broadcast_health(channel, :down)
    {:noreply, %{state | up?: false}}
  end

  # Pings the agent with an echo request. Failures are only logged; the next
  # ping is scheduled regardless of the outcome.
  @impl true
  def handle_info(:do_healthcheck, %{channel: channel} = state) do
    request = %PrymnProto.Prymn.EchoRequest{message: "hello"}

    case PrymnProto.Prymn.Agent.Stub.echo(channel, request) do
      {:ok, _reply} ->
        :noop

      {:error, error} ->
        Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(error)}")
    end

    schedule_healthcheck()
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    Logger.info("received unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Closes the gRPC channel when the process terminates.
  @impl true
  def terminate(_reason, %{channel: channel}) do
    GRPC.Stub.disconnect(channel)
  end

  ## Helpers

  # Queues the next :do_healthcheck message for this process.
  defp schedule_healthcheck do
    Process.send_after(self(), :do_healthcheck, @ping_interval)
  end

  # Publishes `{:healthcheck, status, host}` on the agent's PubSub topic.
  defp broadcast_health(channel, status) do
    Phoenix.PubSub.broadcast!(
      Prymn.PubSub,
      "agent:#{channel.host}",
      {:healthcheck, status, channel.host}
    )
  end

  # Registry-based process name for the connection identified by `name`.
  defp via(name) do
    {:via, Registry, {Prymn.Agents.Registry, name}}
  end
end