defmodule Prymn.Agents.Connection do @moduledoc false # TODO: Disconnect after a while of idling. Disconnect when the healthcheck # fails too many times. defstruct [:channel, up?: false] @healthcheck_inverval 20000 require Logger alias PrymnProto.Prymn.Agent.Stub, as: Grpc use GenServer, restart: :transient @spec start_link(String.t()) :: GenServer.on_start() def start_link(addr) do GenServer.start_link(__MODULE__, addr, name: via(addr)) end @spec get_channel(String.t()) :: GRPC.Channel.t() def get_channel(addr) do GenServer.call(via(addr), :get_channel) end @spec drop(String.t()) :: :ok def drop(addr) do GenServer.stop(via(addr), :shutdown) end @impl true def init(public_ip) do case GRPC.Stub.connect("#{public_ip}:50012") do {:ok, channel} -> send(self(), :do_healthcheck) {:ok, %__MODULE__{channel: channel, up?: true}} {:error, error} -> broadcast_healthcheck!(:down, public_ip) {:stop, error} end end @impl true def handle_call(:get_channel, _from, state) do {:reply, state.channel, state} end @impl true def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do broadcast_healthcheck!(:up, channel.host) {:noreply, %{state | up?: true}} end @impl true def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do broadcast_healthcheck!(:down, channel.host) {:noreply, %{state | up?: false}} end @impl true def handle_info(:do_healthcheck, %{channel: channel, up?: up?} = state) do request = %PrymnProto.Prymn.EchoRequest{message: "hello"} if up? do case Grpc.echo(channel, request) do {:ok, _reply} -> broadcast_healthcheck!(:up, channel.host) {:error, error} -> Logger.warning( "healthcheck error for server #{channel.host}, reason: #{inspect(error)}" ) end else broadcast_healthcheck!(:down, channel.host) end Process.send_after(self(), :do_healthcheck, @healthcheck_inverval) {:noreply, state} end @impl true def handle_info(msg, state) do Logger.debug("received unexpected message: #{inspect(msg)}") {:noreply, state} end @impl true def terminate(_reason, %{channel: channel}) do GRPC.Stub.disconnect(channel) end defp via(name) do {:via, Registry, {Prymn.Agents.Registry, name}} end defp broadcast_healthcheck!(msg, ip_address) do Phoenix.PubSub.broadcast!( Prymn.PubSub, "agent:#{ip_address}", {:healthcheck, ip_address, msg} ) end end