defmodule Prymn.Messaging.Connection do
  @moduledoc """
  GenServer that owns a NATS connection (opened through `Gnat`) used to
  exchange messages with agents.

  After connecting, the server subscribes to the `agents.v1.*.health` subject.
  Every health message received is decoded with
  `Prymn.Messaging.Messages.handle_message/2` and handed to
  `Prymn.Agents.update_health/2`.

  If the connection cannot be established, or a linked process exits, the
  server sleeps for three seconds and then stops with
  `{:shutdown, :connection_failure}`.

  NOTE(review): the reconnect presumably relies on a supervisor restarting this
  process after it stops — confirm against the supervision tree.
  """

  use GenServer

  require Logger

  alias Prymn.Agents
  alias Prymn.Messaging

  @dialyzer {:nowarn_function, init: 1}

  @subscription_supervisor Prymn.Messaging.SubscriptionSupervisor
  @v1_prefix "agents.v1."

  defstruct [:conn_pid, subscriptions: %{}]

  ## Client API

  @doc """
  Starts the connection server and registers it under `name`.
  """
  @spec start_link(GenServer.name()) :: GenServer.on_start()
  def start_link(name) do
    GenServer.start_link(__MODULE__, nil, name: name)
  end

  @doc """
  Publishes `payload` (empty string by default) on the subject
  `agents.v1.<agent_id>.<subject>` through the connection server `conn`.

  Returns `:ok`.
  """
  @spec publish_to_agent(GenServer.server(), term(), term(), term()) :: :ok
  def publish_to_agent(conn, agent_id, subject, payload \\ "") do
    GenServer.call(conn, {:publish, agent_id, subject, payload})
  end

  @doc """
  Starts a `Prymn.Messaging.Subscription` for `agent_id` and `subject` under
  `Prymn.Messaging.SubscriptionSupervisor`, passing the calling process as the
  `:reply` target.

  Returns `:ok` when the subscription was started or was already running, and
  `{:error, reason}` when the child could not be started.
  """
  @spec subscribe_to_agent(GenServer.server(), term(), term()) :: :ok | {:error, term()}
  def subscribe_to_agent(conn, agent_id, subject) do
    GenServer.call(conn, {:subscribe, agent_id, subject})
  end

  ## Server callbacks

  @impl true
  def init(_init_arg) do
    # NOTE(review): host and credentials are hard-coded here; consider reading
    # them from runtime configuration.
    connection_properties = %{
      host: "localhost",
      username: "prymn_admin",
      password: "prymn_admin",
      auth_required: true
    }

    # Trap exits so that the exit of a linked process is delivered as an
    # `{:EXIT, pid, reason}` message instead of terminating this server.
    Process.flag(:trap_exit, true)

    case Gnat.start_link(connection_properties) do
      {:ok, pid} ->
        Logger.info("Connected to NATS")
        {:ok, %__MODULE__{conn_pid: pid}, {:continue, :subscribe_to_health}}

      {:error, reason} ->
        # `reason` can be any term; `inspect/1` keeps the log call from raising
        # `Protocol.UndefinedError` on terms that do not implement
        # `String.Chars`. The embedded line break is part of the original
        # message and is preserved.
        Logger.info("Connection to NATS failed (#{inspect(reason)}). \nAttempting reconnect.")
        {:ok, nil, {:continue, :attempt_reconnect}}
    end
  end

  @impl true
  def handle_continue(:attempt_reconnect, state) do
    # Back off before stopping so a restart does not retry immediately.
    Process.sleep(3000)
    {:stop, {:shutdown, :connection_failure}, state}
  end

  def handle_continue(:subscribe_to_health, %__MODULE__{} = state) do
    # `*` stands in for the agent id, so health messages of all agents are
    # delivered to this process.
    {:ok, _subscription} = Gnat.sub(state.conn_pid, self(), @v1_prefix <> "*.health")
    {:noreply, state}
  end

  @impl true
  def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do
    :ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload)
    {:reply, :ok, state}
  end

  def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do
    # `pid` is the caller; it is handed to the subscription as the `:reply` target.
    args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid}

    case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do
      {:ok, _pid} -> {:reply, :ok, state}
      # An already running subscription is treated as success.
      {:error, {:already_started, _pid}} -> {:reply, :ok, state}
      {:error, error} -> {:reply, {:error, error}, state}
    end
  end

  @impl true
  def handle_info({:EXIT, _pid, _reason}, state) do
    Logger.info("Lost connection to NATS. Attempting reconnect.")
    {:noreply, state, {:continue, :attempt_reconnect}}
  end

  def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do
    # `topic` is the subject without the version prefix, i.e. "<agent_id>.health".
    [agent_id, "health"] = String.split(topic, ".")
    health = Messaging.Messages.handle_message(subject, payload)
    Agents.update_health(agent_id, health)
    {:noreply, state}
  end

  # Defining `handle_info/2` replaces the default implementation injected by
  # `use GenServer`, so without this clause any unexpected message would raise
  # a `FunctionClauseError` and take the connection down.
  def handle_info(message, state) do
    Logger.debug("Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end
end