dotfiles/app/lib/prymn/agents/connection.ex

109 lines
2.6 KiB
Elixir
Raw Normal View History

defmodule Prymn.Agents.Connection do
@moduledoc false

# TODO: Disconnect after a while of idling. Disconnect when the healthcheck
# fails too many times.

# Process state:
#   * `channel` - the value returned by `GRPC.Stub.connect/1` in `init/1`.
#   * `up?`     - set to true on connect and on `:gun_up`, false on `:gun_down`.
defstruct [:channel, up?: false]

@type t :: %__MODULE__{channel: GRPC.Channel.t() | nil, up?: boolean()}

# Delay, in milliseconds, between two consecutive `:do_healthcheck` messages.
@ping_interval 20000

# `:transient` - a supervisor restarts this child only after an abnormal exit.
use GenServer, restart: :transient

require Logger
# Starts a connection process for the agent at `addr`. The address is both
# the init argument and the key the process is registered under (see via/1).
@spec start_link(String.t()) :: GenServer.on_start()
def start_link(addr) do
  registered_name = via(addr)

  GenServer.start_link(__MODULE__, addr, name: registered_name)
end
# Synchronously asks the connection process registered for `addr` for the
# channel it currently holds.
@spec get_channel(String.t()) :: GRPC.Channel.t()
def get_channel(addr) do
  addr
  |> via()
  |> GenServer.call(:get_channel)
end
# Stops the connection process registered for `addr` with reason `:shutdown`.
@spec drop(String.t()) :: :ok
def drop(addr) do
  addr
  |> via()
  |> GenServer.stop(:shutdown)
end
# Connects to the agent at `addr`. On success the connection is logged, the
# first healthcheck is scheduled and the process starts in the "up" state;
# on failure the process stops with the connection error as its reason.
@impl true
def init(addr) do
  case GRPC.Stub.connect(addr) do
    {:error, reason} ->
      {:stop, reason}

    {:ok, conn} ->
      Logger.info("Starting new connection at address #{addr}")
      Process.send_after(self(), :do_healthcheck, @ping_interval)

      {:ok, %__MODULE__{channel: conn, up?: true}}
  end
end
# Replies with the channel held in the state; the state itself is unchanged.
@impl true
def handle_call(:get_channel, _caller, state) do
  channel = state.channel

  {:reply, channel, state}
end
# Handles a `:gun_up` message: logs that the agent host regained its
# connection, broadcasts `{:healthcheck, :up, host}` on the "agent:<host>"
# PubSub topic and marks the connection as up.
@impl true
def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do
  Logger.info("[Agent] #{channel.host} regained connection")

  Phoenix.PubSub.broadcast!(
    Prymn.PubSub,
    "agent:#{channel.host}",
    {:healthcheck, :up, channel.host}
  )

  {:noreply, %{state | up?: true}}
end
# Handles a `:gun_down` message: logs that the agent host lost its
# connection, broadcasts `{:healthcheck, :down, host}` on the "agent:<host>"
# PubSub topic and marks the connection as down.
@impl true
def handle_info({:gun_down, _pid, _proto, reason, _}, %{channel: channel} = state) do
  # `reason` is an arbitrary term (it may be a tuple such as `{:error, _}`),
  # so it must go through inspect/1: plain interpolation raises
  # Protocol.UndefinedError for terms without a String.Chars implementation.
  Logger.info("[Agent] #{channel.host} lost connection, reason: #{inspect(reason)}")

  Phoenix.PubSub.broadcast!(
    Prymn.PubSub,
    "agent:#{channel.host}",
    {:healthcheck, :down, channel.host}
  )

  {:noreply, %{state | up?: false}}
end
# Periodic healthcheck: sends an echo request over the channel, logs a
# warning when the call returns an error, and always schedules the next
# check. The state is left untouched regardless of the outcome.
@impl true
def handle_info(:do_healthcheck, %{channel: channel} = state) do
  ping = %PrymnProto.Prymn.EchoRequest{message: "hello"}
  result = PrymnProto.Prymn.Agent.Stub.echo(channel, ping)

  case result do
    {:error, reason} ->
      Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(reason)}")

    {:ok, _pong} ->
      :noop
  end

  Process.send_after(self(), :do_healthcheck, @ping_interval)

  {:noreply, state}
end
# Catch-all clause: any message not matched by the clauses above is logged
# and otherwise ignored.
@impl true
def handle_info(unexpected, state) do
  Logger.info("received unexpected message: #{inspect(unexpected)}")

  {:noreply, state}
end
# Disconnects the channel held in the state when the process terminates.
@impl true
def terminate(_reason, %{channel: conn}) do
  GRPC.Stub.disconnect(conn)
end
# Builds the `:via` tuple that registers or looks up a process under `name`
# in `Prymn.Agents.Registry`.
defp via(name) do
  registry_key = {Prymn.Agents.Registry, name}

  {:via, Registry, registry_key}
end
end