dotfiles/app/lib/prymn/agents/connection.ex
Nikos Papadakis 5bd44ef9ac
refactor: agent connections now use the new healthcheck streaming
`Prymn.Agent.Connection` GenServer uses healthchecking with the new
streaming RPC, retrieving asynchronously the health status of the agent.

The Connection will also shut itself down when there's no usage (when
the keepalive function is not called frequently)
2023-08-19 21:14:07 +03:00

101 lines
2.5 KiB
Elixir

defmodule Prymn.Agents.Connection do
@moduledoc false
@timer_interval 120_000
defstruct [:channel, :timer_ref]
require Logger
use GenServer, restart: :transient
@spec start_link(String.t()) :: GenServer.on_start()
def start_link(addr) do
GenServer.start_link(__MODULE__, addr, name: via(addr))
end
@spec keep_alive(pid) :: :ok
def keep_alive(pid) do
GenServer.cast(pid, :reset_timer)
end
@impl true
def init(host) do
Process.flag(:trap_exit, true)
{:ok, %__MODULE__{}, {:continue, host}}
end
@impl true
def handle_continue(host, state) when is_binary(host) do
case GRPC.Stub.connect(host, 50012, []) do
{:ok, channel} ->
GenServer.cast(self(), :reset_timer)
{:noreply, %__MODULE__{channel: channel}, {:continue, :health}}
{:error, error} ->
{:stop, {:error, error}, state}
end
end
@impl true
def handle_continue(:health, state) do
pid = self()
Task.start_link(fn ->
{:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(state.channel, %Google.Protobuf.Empty{})
stream
|> Stream.each(fn health -> send(pid, {:health, health}) end)
|> Enum.take_while(fn _ -> true end)
end)
{:noreply, state}
end
@impl true
def handle_cast(:reset_timer, state) do
if state.timer_ref, do: Process.cancel_timer(state.timer_ref)
ref = Process.send_after(self(), :drop_connection, @timer_interval)
{:noreply, put_in(state.timer_ref, ref)}
end
@impl true
def handle_info(:drop_connection, state) do
Logger.debug("shutting down connection with agent host #{inspect(state.channel.host)}")
{:stop, :shutdown, state}
end
@impl true
def handle_info({:health, health}, state) do
IO.inspect(health)
{:noreply, state}
end
@impl true
def handle_info({:gun_up, _pid, _protocol}, state) do
# TODO: If it's possible for the GRPC connection to be down when we receive
# this message, we should `{:continue, state.channel.host}`
{:noreply, state, {:continue, :health}}
end
@impl true
def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do
Logger.debug("disconnected from #{inspect(channel)}")
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
Logger.warning("received unexpected message: #{inspect(msg)}")
{:noreply, state}
end
@impl true
def terminate(_reason, %{channel: channel}) do
if channel, do: GRPC.Stub.disconnect(channel)
end
defp via(name) do
{:via, Registry, {Prymn.Agents.Registry, name}}
end
end