dotfiles/app/lib/prymn/agents/connection.ex
Nikos Papadakis 5c64f02579 Feature: Agent Tasks (#8)
Reviewed-on: https://git.nikos.gg/prymn/prymn/pulls/8
Co-authored-by: Nikos Papadakis <nikos@papadakis.xyz>
Co-committed-by: Nikos Papadakis <nikos@papadakis.xyz>
2023-11-14 15:23:50 +00:00

173 lines
4.4 KiB
Elixir

defmodule Prymn.Agents.Connection do
@moduledoc false
alias Prymn.Agents.Health
alias PrymnProto.Prymn.Agent.Stub
require Logger
use GenServer, restart: :transient
@timeout :timer.minutes(2)
def start_link(host_address) do
GenServer.start_link(__MODULE__, host_address, name: via(host_address))
end
def get_channel(server) do
GenServer.call(server, :get_channel)
end
def get_sys_info(server) when is_pid(server) do
GenServer.call(server, :get_sys_info)
end
def sys_update(server, dry_run) when is_pid(server) and is_boolean(dry_run) do
GenServer.call(server, {:sys_update, dry_run})
end
##
## Server callbacks
##
@impl true
def init(host) do
# Process.flag(:trap_exit, true)
pid = self()
# Start a connection without blocking the GenServer
Task.start_link(fn ->
case GRPC.Stub.connect(host, 50_012, []) do
{:ok, channel} -> send(pid, channel)
{:error, error} -> send(pid, {:connect_error, error})
end
# Keep receiving and sending back any messages to the GenServer forever
receive_loop(pid)
end)
{:ok, {host, nil}}
end
@impl true
def handle_continue(:health, {_, channel} = state) do
pid = self()
Task.start_link(fn ->
case Stub.health(channel, %Google.Protobuf.Empty{}) do
{:ok, stream} ->
# Read from the stream forever and send data back to parent
stream
|> Stream.each(fn {_, data} -> send(pid, data) end)
|> Enum.take_while(fn _ -> true end)
{:error, _rpcerror} ->
send(pid, {:connect_error, :rpc_error})
end
end)
{:noreply, state}
end
@impl true
def handle_call(:get_channel, _from, {_, channel} = state) do
{:reply, channel, state, @timeout}
end
def handle_call(:get_sys_info, _from, {_, channel} = state) do
reply = Stub.get_sys_info(channel, %Google.Protobuf.Empty{})
{:reply, reply, state, @timeout}
end
def handle_call({:sys_update, dry_run}, {from, _}, {_, channel} = state) do
request = %PrymnProto.Prymn.SysUpdateRequest{dry_run: dry_run}
streaming_call(fn -> Stub.sys_update(channel, request) end, from)
{:reply, :ok, state, @timeout}
end
@impl true
def handle_info(%GRPC.Channel{} = channel, {host, _}) do
{:noreply, {host, channel}, {:continue, :health}}
end
def handle_info({:connect_error, reason}, {host, _} = state) do
health = Health.lookup(host, default: true)
case reason do
:timeout -> Health.make_timed_out(health)
:rpc_error -> Health.make_disconnected(health)
end
|> Health.update_and_broadcast()
# NOTE: Here we terminate normally, which means we won't be retrying. Maybe we want to?
{:stop, :normal, state}
end
def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
response
|> Health.make_from_proto(host)
|> Health.update_and_broadcast()
{:noreply, state, @timeout}
end
def handle_info(%GRPC.RPCError{} = response, state) do
Logger.debug("received a GRPC error: #{inspect(response)}")
{:noreply, state}
end
def handle_info({:gun_up, _pid, _protocol}, state) do
# NOTE: If it's possible for the GRPC connection to be down when we receive
# this message, maybe we should restart the connection
{:noreply, state, {:continue, :health}}
end
def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
Health.lookup(host, default: true)
|> Health.make_disconnected()
|> Health.update_and_broadcast()
{:noreply, state, @timeout}
end
def handle_info(:timeout, state) do
{:stop, {:shutdown, :timeout}, state}
end
def handle_info(msg, state) do
Logger.debug("received unhandled message #{inspect(msg)}")
{:noreply, state}
end
@impl true
def terminate(reason, {host, channel}) do
Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
Health.delete(host)
if channel, do: GRPC.Stub.disconnect(channel)
end
defp via(name) do
{:via, Registry, {Prymn.Agents.Registry, name}}
end
defp receive_loop(pid) do
receive do
msg -> send(pid, msg)
end
receive_loop(pid)
end
defp streaming_call(fun, from) do
Task.start_link(fn ->
case fun.() do
{:ok, stream} ->
stream
|> Stream.each(fn {:ok, data} -> send(from, data) end)
|> Enum.to_list()
{:error, _error} ->
:todo
end
end)
end
end