defmodule Prymn.Agents.Connection do
  @moduledoc false

  # A GenServer that owns the GRPC connection to a single agent host.
  #
  # State is a `{host, channel}` tuple; `channel` is `nil` until the
  # connection task started in `init/1` reports back. Once a channel is
  # available, a health stream is opened and every health message is turned
  # into a `Health` update that is broadcast.
  #
  # The process is `restart: :transient`, so it is only restarted when it
  # exits abnormally; `:normal` and `{:shutdown, _}` exits are final.

  alias Prymn.Agents.Health
  alias PrymnProto.Prymn.Agent.Stub

  require Logger

  use GenServer, restart: :transient

  # Idle timeout attached to most replies: if no message arrives within this
  # window the server receives `:timeout` and shuts itself down.
  @timeout :timer.minutes(2)

  @doc """
  Starts a connection process for `host_address`, registered under that
  address in `Prymn.Agents.Registry`.
  """
  def start_link(host_address) do
    GenServer.start_link(__MODULE__, host_address, name: via(host_address))
  end

  @doc """
  Returns the GRPC channel held by `server`, or `nil` if the connection has
  not been established yet.
  """
  def get_channel(server) do
    GenServer.call(server, :get_channel)
  end

  @doc """
  Performs the `get_sys_info` RPC through `server` and returns the stub's
  result unchanged.
  """
  def get_sys_info(server) when is_pid(server) do
    GenServer.call(server, :get_sys_info)
  end

  @doc """
  Starts the `sys_update` RPC through `server`.

  Returns `:ok` as soon as the streaming task is started. Each successfully
  streamed item is then sent as a plain message to the calling process.
  """
  def sys_update(server, dry_run) when is_pid(server) and is_boolean(dry_run) do
    GenServer.call(server, {:sys_update, dry_run})
  end

  ##
  ## Server callbacks
  ##

  @impl true
  def init(host) do
    # Process.flag(:trap_exit, true)
    pid = self()

    # Start a connection without blocking the GenServer
    Task.start_link(fn ->
      case GRPC.Stub.connect(host, 50_012, []) do
        {:ok, channel} -> send(pid, channel)
        {:error, error} -> send(pid, {:connect_error, error})
      end

      # Keep receiving and sending back any messages to the GenServer forever
      receive_loop(pid)
    end)

    {:ok, {host, nil}}
  end

  @impl true
  def handle_continue(:health, {_, channel} = state) do
    pid = self()

    Task.start_link(fn ->
      case Stub.health(channel, %Google.Protobuf.Empty{}) do
        {:ok, stream} ->
          # Read from the stream for as long as it lasts and send the data back
          # to the parent. `Stream.run/1` drains the stream without collecting
          # its elements, so a long-lived stream does not accumulate memory.
          stream
          |> Stream.each(fn {_, data} -> send(pid, data) end)
          |> Stream.run()

        {:error, _rpcerror} ->
          send(pid, {:connect_error, :rpc_error})
      end
    end)

    {:noreply, state}
  end

  @impl true
  def handle_call(:get_channel, _from, {_, channel} = state) do
    {:reply, channel, state, @timeout}
  end

  def handle_call(:get_sys_info, _from, {_, channel} = state) do
    reply = Stub.get_sys_info(channel, %Google.Protobuf.Empty{})
    {:reply, reply, state, @timeout}
  end

  def handle_call({:sys_update, dry_run}, {from, _}, {_, channel} = state) do
    request = %PrymnProto.Prymn.SysUpdateRequest{dry_run: dry_run}
    # `from` is the caller's pid: streamed results are delivered straight to it.
    streaming_call(fn -> Stub.sys_update(channel, request) end, from)
    {:reply, :ok, state, @timeout}
  end

  @impl true
  def handle_info(%GRPC.Channel{} = channel, {host, _}) do
    # The connection task succeeded: store the channel and start health checks.
    {:noreply, {host, channel}, {:continue, :health}}
  end

  def handle_info({:connect_error, reason}, {host, _} = state) do
    health = Health.lookup(host, default: true)

    updated_health =
      case reason do
        :timeout ->
          Health.make_timed_out(health)

        :rpc_error ->
          Health.make_disconnected(health)

        other ->
          # Any other connection failure is treated as "disconnected" instead
          # of raising a CaseClauseError, which would skip the health update
          # and turn this normal stop into an abnormal exit (and a restart).
          Logger.debug("unrecognized connect error: #{inspect(other)}")
          Health.make_disconnected(health)
      end

    Health.update_and_broadcast(updated_health)

    # NOTE: Here we terminate normally, which means we won't be retrying. Maybe we want to?
    {:stop, :normal, state}
  end

  def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
    response
    |> Health.make_from_proto(host)
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(%GRPC.RPCError{} = response, state) do
    Logger.debug("received a GRPC error: #{inspect(response)}")
    {:noreply, state}
  end

  def handle_info({:gun_up, _pid, _protocol}, state) do
    # NOTE: If it's possible for the GRPC connection to be down when we receive
    # this message, maybe we should restart the connection
    {:noreply, state, {:continue, :health}}
  end

  def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
    host
    |> Health.lookup(default: true)
    |> Health.make_disconnected()
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(:timeout, state) do
    # No activity for @timeout: shut down. A `{:shutdown, _}` reason does not
    # trigger a restart under `restart: :transient`.
    {:stop, {:shutdown, :timeout}, state}
  end

  def handle_info(msg, state) do
    Logger.debug("received unhandled message #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(reason, {host, channel}) do
    Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
    Health.delete(host)
    if channel, do: GRPC.Stub.disconnect(channel)
  end

  # Builds the `:via` tuple that registers this process under `name`.
  defp via(name) do
    {:via, Registry, {Prymn.Agents.Registry, name}}
  end

  # Forwards every message received by the current process to `pid`, forever.
  defp receive_loop(pid) do
    receive do
      msg -> send(pid, msg)
    end

    receive_loop(pid)
  end

  # Runs the streaming RPC `fun` in a task linked to the current process and
  # sends every successfully received item to `from`.
  #
  # Failures (the call itself failing, or an error item inside the stream)
  # are logged rather than raised: the task is linked to this GenServer, so
  # an unmatched error item would otherwise take the whole connection down.
  defp streaming_call(fun, from) do
    Task.start_link(fn ->
      case fun.() do
        {:ok, stream} ->
          stream
          |> Stream.each(fn
            {:ok, data} ->
              send(from, data)

            {:error, error} ->
              Logger.error("streaming call returned an error item: #{inspect(error)}")
          end)
          |> Stream.run()

        {:error, error} ->
          # TODO: decide how the caller should be told about a failed call.
          Logger.error("streaming call could not be started: #{inspect(error)}")
      end
    end)
  end
end