2023-06-27 19:28:00 +00:00
|
|
|
defmodule Prymn.Agents.Connection do
  @moduledoc false

  # One process per agent host (registered by host address in
  # `Prymn.Agents.Registry`). It opens a gRPC connection to the agent, keeps a
  # health stream running against it and publishes every health change via
  # `Prymn.Agents.Health`.
  #
  # State: `{host, channel}` where `channel` is `nil` until connected.

  alias Prymn.Agents.Health

  require Logger

  use GenServer, restart: :transient

  # Idle timeout returned from some callbacks; when it fires the process stops
  # with `{:shutdown, :timeout}` (see the `:timeout` clause of `handle_info/2`).
  @timeout :timer.minutes(2)

  @doc """
  Starts a connection process for `host_address`, registered under that
  address in `Prymn.Agents.Registry`.
  """
  @spec start_link(String.t()) :: GenServer.on_start()
  def start_link(host_address) do
    GenServer.start_link(__MODULE__, host_address, name: via(host_address))
  end

  @doc """
  Returns the gRPC channel held by `server`, or `nil` while the connection is
  not yet established.
  """
  @spec get_channel(GenServer.server()) :: GRPC.Channel.t() | nil
  def get_channel(server) do
    GenServer.call(server, :get_channel)
  end

  ##
  ## Server callbacks
  ##

  @impl true
  def init(host) do
    # Process.flag(:trap_exit, true)
    pid = self()

    # Start a connection without blocking the GenServer. The task is linked,
    # so if it crashes (e.g. `connect/3` returns something neither clause
    # matches) this process goes down with it.
    Task.start_link(fn ->
      case GRPC.Stub.connect(host, 50_012, []) do
        {:ok, channel} -> send(pid, channel)
        {:error, error} -> send(pid, {:connect_error, error})
      end

      # Keep receiving and sending back any messages to the GenServer forever.
      # NOTE(review): presumably the connection's lifecycle messages
      # (`:gun_up` / `:gun_down`) land in this task because it opened the
      # connection — confirm against the GRPC adapter in use.
      receive_loop(pid)
    end)

    {:ok, {host, nil}}
  end

  @impl true
  def handle_continue(:health, {_, channel} = state) do
    pid = self()

    # Consume the health stream in a linked task so this process stays
    # responsive to calls while updates flow in.
    Task.start_link(fn ->
      {:ok, stream} = PrymnProto.Prymn.Agent.Stub.health(channel, %Google.Protobuf.Empty{})

      # Read from the stream until it ends and send every item back to the
      # parent. `Stream.run/1` drops each item after the side effect; the
      # previous `Enum.take_while(fn _ -> true end)` accumulated every item in
      # a list, growing this task's memory for as long as the stream lived.
      stream
      |> Stream.each(fn {_, data} -> send(pid, data) end)
      |> Stream.run()
    end)

    {:noreply, state}
  end

  @impl true
  def handle_cast(_, state) do
    {:noreply, state}
  end

  @impl true
  def handle_call(:get_channel, _from, {_, channel} = state) do
    {:reply, channel, state, @timeout}
  end

  @impl true
  def handle_info(%GRPC.Channel{} = channel, {host, _}) do
    # Connection established: remember the channel and open the health stream.
    {:noreply, {host, channel}, {:continue, :health}}
  end

  def handle_info({:connect_error, reason}, {host, _} = state) do
    # Only a connect timeout is recorded as a health state; every connect
    # error stops the process with `reason`.
    # NOTE(review): `terminate/2` calls `Health.delete(host)` right after this
    # stop, so the timed-out entry is short-lived — confirm that is intended.
    if reason == :timeout do
      Health.lookup(host, default: true)
      |> Health.make_timed_out()
      |> Health.update_and_broadcast()
    end

    {:stop, reason, state}
  end

  def handle_info(%PrymnProto.Prymn.HealthResponse{} = response, {host, _} = state) do
    response
    |> Health.make_from_proto(host)
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(%GRPC.RPCError{} = response, state) do
    # NOTE(review): returning without `@timeout` leaves no idle timeout armed
    # after an RPC error — confirm that is intended.
    Logger.debug("received a GRPC error: #{inspect(response)}")
    {:noreply, state}
  end

  def handle_info({:gun_up, _pid, _protocol}, state) do
    # NOTE: If it's possible for the GRPC connection to be down when we receive
    # this message, maybe we should restart the connection
    {:noreply, state, {:continue, :health}}
  end

  def handle_info({:gun_down, _pid, _proto, _reason, _}, {host, _} = state) do
    # `default: true`, as in the connect-error clause, so a drop that happens
    # before any health response was stored presumably still yields an entry
    # to mark as disconnected instead of a missing one.
    Health.lookup(host, default: true)
    |> Health.make_disconnected()
    |> Health.update_and_broadcast()

    {:noreply, state, @timeout}
  end

  def handle_info(:timeout, state) do
    # Idle timeout expired: stop with a shutdown reason so the `:transient`
    # restart strategy does not restart this process.
    {:stop, {:shutdown, :timeout}, state}
  end

  def handle_info(msg, state) do
    Logger.debug("received unhandled message #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(reason, {host, channel}) do
    Logger.debug("terminating Agent connection (host: #{host}, reason: #{inspect(reason)})")
    Health.delete(host)
    if channel, do: GRPC.Stub.disconnect(channel)
  end

  # Registry name under which the process for `name` (a host address) lives.
  defp via(name) do
    {:via, Registry, {Prymn.Agents.Registry, name}}
  end

  # Forwards every message this process receives to `pid`, forever.
  defp receive_loop(pid) do
    receive do
      msg -> send(pid, msg)
    end

    receive_loop(pid)
  end
end
|