defmodule Prymn.Agents.Connection do
  @moduledoc false

  # GenServer holding one GRPC connection to an agent, keyed by host address.
  #
  # State is the tuple `{host, channel}`; `channel` is `nil` until the
  # connection task started in `init/1` reports a `%GRPC.Channel{}`.
  #
  # The child spec uses `restart: :transient`, so a supervisor restarts this
  # process only when it terminates abnormally. Stopping with
  # `{:shutdown, :timeout}` (see `handle_info(:timeout, ...)`) is therefore not
  # followed by a restart.

  alias Prymn.Agents.Health

  require Logger

  use GenServer, restart: :transient

  # Idle timeout returned from some callbacks. If no message arrives within
  # this window the GenServer receives `:timeout` and stops.
  @timeout :timer.minutes(2)

  # Port passed to `GRPC.Stub.connect/3`.
  @port 50_012

  # Starts a connection process for `host_address`, registered under that
  # address in `Prymn.Agents.Registry`.
  @spec start_link(String.t()) :: GenServer.on_start()
  def start_link(host_address) do
    GenServer.start_link(__MODULE__, host_address, name: via(host_address))
  end

  # Returns the channel currently held by `server`, or `nil` when no channel
  # has been received yet. Also (re)arms the idle timeout.
  @spec get_channel(GenServer.server()) :: GRPC.Channel.t() | nil
  def get_channel(server) do
    GenServer.call(server, :get_channel)
  end

  ##
  ## Server callbacks
  ##

  @impl true
  def init(host) do
    # Process.flag(:trap_exit, true)
    pid = self()

    # Start a connection without blocking the GenServer. The outcome is
    # reported back as a plain message: the channel itself on success, or
    # `{:connect_error, error}` on failure.
    Task.start_link(fn ->
      case GRPC.Stub.connect(host, @port, []) do
        {:ok, channel} -> send(pid, channel)
        {:error, error} -> send(pid, {:connect_error, error})
      end

      # Keep receiving and sending back any messages to the GenServer forever.
      # NOTE(review): presumably the connection process delivers its messages
      # (e.g. `:gun_up` / `:gun_down`) to the process that opened it, i.e. this
      # task - confirm against the GRPC adapter in use.
      receive_loop(pid)
    end)

    {:ok, {host, nil}}
  end

  @impl true
  def handle_continue(:health, {_, channel} = state) do
    pid = self()

    # The task is linked: a failed match on `{:ok, stream}` crashes the task
    # and, since exits are not trapped, this GenServer with it.
    Task.start_link(fn ->
      {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{})

      # Read from the stream until it ends and send each payload back to the
      # parent. `Stream.run/1` is used because only the side effect matters;
      # collecting the elements into a list would grow without bound on a
      # long-lived stream.
      stream
      |> Stream.each(fn {_, data} -> send(pid, data) end)
      |> Stream.run()
    end)

    {:noreply, state}
  end

  @impl true
  def handle_cast(_, state) do
    # No casts are supported; ignore them.
    {:noreply, state}
  end

  @impl true
  def handle_call(:get_channel, _from, {_, channel} = state) do
    {:reply, channel, state, @timeout}
  end

  # NOTE(review): several clauses below return without `@timeout`, which
  # leaves the server with no idle timeout until a later callback sets one
  # again - confirm this is intended.
  @impl true
  def handle_info(%GRPC.Channel{} = channel, {host, _}) do
    # Connection established: keep the channel and start the health stream.
    {:noreply, {host, channel}, {:continue, :health}}
  end

  def handle_info({:connect_error, reason}, {host, _} = state) do
    # Only a connect timeout is published as a health change; every connect
    # error stops the server with the error as the exit reason.
    if reason == :timeout do
      Health.lookup(host, default: true)
      |> Health.make_timed_out()
      |> Health.update_and_broadcast()
    end

    {:stop, reason, state}
  end

  def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
    response
    |> Health.make_from_proto(host)
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(%GRPC.RPCError{} = response, state) do
    Logger.debug("received a GRPC error: #{inspect(response)}")
    {:noreply, state}
  end

  def handle_info({:gun_up, _pid, _protocol}, state) do
    # NOTE: If it's possible for the GRPC connection to be down when we receive
    # this message, maybe we should restart the connection
    {:noreply, state, {:continue, :health}}
  end

  def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
    # NOTE(review): unlike the connect-error path, this lookup passes no
    # `default: true` - confirm `Health.lookup/1` cannot return nil here.
    Health.lookup(host)
    |> Health.make_disconnected()
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(:timeout, state) do
    # Idle timeout elapsed: stop with a `:shutdown` reason.
    {:stop, {:shutdown, :timeout}, state}
  end

  def handle_info(msg, state) do
    Logger.debug("received unhandled message #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(reason, {host, channel}) do
    Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
    Health.delete(host)

    # The channel is nil when the server stops before a connection was made.
    if channel, do: GRPC.Stub.disconnect(channel)
  end

  # Registry-based name under which the process for `name` is registered.
  defp via(name) do
    {:via, Registry, {Prymn.Agents.Registry, name}}
  end

  # Forwards every message received by the calling process to `pid`, forever.
  defp receive_loop(pid) do
    receive do
      msg -> send(pid, msg)
    end

    receive_loop(pid)
  end
end