Some work has been done to make the Connection nicer to use, but more is needed so that the channel is no longer exposed to the upper layers of the application. We should wrap all gRPC calls in the GenServer, which may also allow caching of certain calls such as `get_sys_info`.
139 lines
3.4 KiB
Elixir
139 lines
3.4 KiB
Elixir
defmodule Prymn.Agents.Connection do
  @moduledoc false

  # Holds a GRPC connection to a single Prymn agent, identified by its host
  # address, and publishes the agent's health (via Prymn.Agents.Health) from
  # the agent's health stream.
  #
  # State is a `{host, channel}` tuple; `channel` is `nil` until the
  # background connect task reports success.
  #
  # Every handle_* callback re-arms the `@timeout` idle timeout: a GenServer
  # timeout is cancelled by any return that omits it, so leaving it out in a
  # single clause would let a connection whose health stream went quiet live
  # forever.

  alias Prymn.Agents.Health

  require Logger

  use GenServer, restart: :transient

  # Stop the server (with {:shutdown, :timeout}) after this long without
  # receiving any message.
  @timeout :timer.minutes(2)

  # Port the agent's GRPC server is connected to.
  @agent_port 50012

  @doc """
  Starts a connection process for `host_address`, registered under that
  address in `Prymn.Agents.Registry`.
  """
  @spec start_link(String.t()) :: GenServer.on_start()
  def start_link(host_address) do
    GenServer.start_link(__MODULE__, host_address, name: via(host_address))
  end

  @doc """
  Returns the GRPC channel of the connection, or `nil` while the connection
  has not been established yet. Calling it resets the idle timeout.
  """
  @spec get_channel(GenServer.server()) :: GRPC.Channel.t() | nil
  def get_channel(server) do
    GenServer.call(server, :get_channel)
  end

  ##
  ## Server callbacks
  ##

  @impl true
  def init(host) do
    # NOTE: without trapping exits, terminate/2 only runs when this server
    # stops itself (a :stop return), not when its supervisor shuts it down.
    # Process.flag(:trap_exit, true)
    parent = self()

    # Connect in a linked task so init/1 does not block on the network. The
    # task then relays every message it receives back to this server forever;
    # presumably the connection reports :gun_up / :gun_down to the process
    # that opened it (this task) — confirm against the GRPC adapter in use.
    Task.start_link(fn ->
      case GRPC.Stub.connect(host, @agent_port, []) do
        {:ok, channel} -> send(parent, channel)
        {:error, error} -> send(parent, {:connect_error, error})
      end

      receive_loop(parent)
    end)

    {:ok, {host, nil}}
  end

  @impl true
  def handle_continue(:health, {_host, nil} = state) do
    # No channel yet (e.g. a :gun_up relayed before the connect task reported
    # back). The health stream is started once the channel message arrives.
    {:noreply, state, @timeout}
  end

  def handle_continue(:health, {_host, channel} = state) do
    parent = self()

    # Stream health responses in a linked task; a failed health call crashes
    # the task and, through the link, this server.
    # NOTE(review): each :health continue starts a new stream task; the
    # previous one is assumed to have ended with the old connection — confirm.
    Task.start_link(fn ->
      {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{})

      # Forward the payload of every {tag, data} element to the server, which
      # handles HealthResponse and GRPC.RPCError messages. Enum.each/2 does not
      # retain the elements, so an endless stream runs in constant memory.
      Enum.each(stream, fn {_tag, data} -> send(parent, data) end)
    end)

    {:noreply, state, @timeout}
  end

  @impl true
  def handle_cast(_msg, state) do
    {:noreply, state, @timeout}
  end

  @impl true
  def handle_call(:get_channel, _from, {_, channel} = state) do
    {:reply, channel, state, @timeout}
  end

  @impl true
  def handle_info(%GRPC.Channel{} = channel, {host, _}) do
    # Sent by the connect task started in init/1.
    {:noreply, {host, channel}, {:continue, :health}}
  end

  def handle_info({:connect_error, reason}, {host, _} = state) do
    if reason == :timeout do
      Health.lookup(host, default: true)
      |> Health.make_timed_out()
      |> Health.update_and_broadcast()
    end

    # NOTE(review): stopping runs terminate/2, which calls Health.delete(host)
    # right after the timed-out status is broadcast — confirm that is intended.
    {:stop, reason, state}
  end

  def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
    response
    |> Health.make_from_proto(host)
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(%GRPC.RPCError{} = response, state) do
    Logger.debug("received a GRPC error: #{inspect(response)}")
    {:noreply, state, @timeout}
  end

  def handle_info({:gun_up, _pid, _protocol}, state) do
    # NOTE: If it's possible for the GRPC connection to be down when we receive
    # this message, maybe we should restart the connection
    {:noreply, state, {:continue, :health}}
  end

  def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
    Health.lookup(host)
    |> Health.make_disconnected()
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(:timeout, state) do
    {:stop, {:shutdown, :timeout}, state}
  end

  def handle_info(msg, state) do
    Logger.debug("received unhandled message #{inspect(msg)}")
    {:noreply, state, @timeout}
  end

  @impl true
  def terminate(reason, {host, channel}) do
    Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
    Health.delete(host)
    if channel, do: GRPC.Stub.disconnect(channel)
  end

  # Registry name under which the connection for `name` (a host address) lives.
  defp via(name) do
    {:via, Registry, {Prymn.Agents.Registry, name}}
  end

  # Forwards every message received by the calling process to `pid`, forever.
  defp receive_loop(pid) do
    receive do
      msg -> send(pid, msg)
    end

    receive_loop(pid)
  end
end
|