From 6db247878609f336654796580d3a11285f79b807 Mon Sep 17 00:00:00 2001
From: Nikos Papadakis <nikos@papadakis.xyz>
Date: Sun, 4 Feb 2024 20:13:35 +0200
Subject: [PATCH] todo: agents as genservers, messaging connection manager

---
 agent/src/messaging/v1/mod.rs                 |  6 +-
 app/lib/prymn/agents.ex                       | 36 ---------
 app/lib/prymn/agents/agent.ex                 |  7 --
 app/lib/prymn/agents/health.ex                |  9 ---
 app/lib/prymn/agents/store.ex                 | 44 ----------
 app/lib/prymn/application.ex                  |  5 +-
 app/lib/prymn/messaging.ex                    | 24 ------
 app/lib/prymn/messaging/connection.ex         | 81 +++++++------------
 app/lib/prymn/messaging/connection_manager.ex | 38 +++++++++
 .../prymn/messaging/connection_supervisor.ex  | 18 -----
 app/lib/prymn/messaging/foo_consumer.ex       | 15 ++++
 app/lib/prymn/messaging/messages.ex           | 28 -------
 app/lib/prymn/messaging/subscription.ex       | 36 ---------
 app/lib/prymn/messaging/supervisor.ex         | 17 ++++
 app/lib/prymn_web/live/server_live/index.ex   |  8 +-
 app/lib/prymn_web/live/server_live/show.ex    | 16 ++--
 flake.nix                                     |  2 -
 17 files changed, 115 insertions(+), 275 deletions(-)
 delete mode 100644 app/lib/prymn/agents/agent.ex
 delete mode 100644 app/lib/prymn/agents/health.ex
 delete mode 100644 app/lib/prymn/agents/store.ex
 create mode 100644 app/lib/prymn/messaging/connection_manager.ex
 delete mode 100644 app/lib/prymn/messaging/connection_supervisor.ex
 create mode 100644 app/lib/prymn/messaging/foo_consumer.ex
 delete mode 100644 app/lib/prymn/messaging/messages.ex
 delete mode 100644 app/lib/prymn/messaging/subscription.ex
 create mode 100644 app/lib/prymn/messaging/supervisor.ex

diff --git a/agent/src/messaging/v1/mod.rs b/agent/src/messaging/v1/mod.rs
index a80f3df..b1be56e 100644
--- a/agent/src/messaging/v1/mod.rs
+++ b/agent/src/messaging/v1/mod.rs
@@ -16,12 +16,14 @@ pub struct Client {
 
 impl Client {
     pub async fn connect(id: &str) -> Result<Self, super::MessagingError> {
+        let prefix = format!("agents.v1.{}", id);
+
         let nats = async_nats::ConnectOptions::with_user_and_password(
             String::from("demo_agent"),
             String::from("demo_agent_password"),
         )
         .name(format!("Prymn Agent {id}"))
-        .custom_inbox_prefix(format!("_INBOX_{id}"))
+        .custom_inbox_prefix(format!("{prefix}._INBOX_"))
         .connect("localhost")
         .await
         .map_err(|err| {
@@ -31,7 +33,7 @@ impl Client {
 
         Ok(Self {
             id: Arc::new(String::from(id)),
-            prefix: Arc::new(format!("agents.v1.{}", id)),
+            prefix: Arc::new(prefix),
             nats,
         })
     }
diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex
index ae51ded..1f8e075 100644
--- a/app/lib/prymn/agents.ex
+++ b/app/lib/prymn/agents.ex
@@ -1,38 +1,2 @@
 defmodule Prymn.Agents do
-  alias Prymn.Messaging
-  alias Prymn.Agents
-
-  def from_server(%Prymn.Servers.Server{} = server) do
-    agent_id = to_string(server.id)
-    Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{agent_id}")
-    Agents.Store.get_or_default(agent_id)
-  end
-
-  def from_servers(servers) when is_list(servers) do
-    Enum.reduce(servers, [], fn server, acc ->
-      [{server.id, from_server(server)} | acc]
-    end)
-    |> Map.new()
-  end
-
-  def update_health(agent_id, health) when is_binary(agent_id) do
-    agent = Agents.Store.update_health(agent_id, health)
-    Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent)
-  end
-
-  def open_terminal(%Agents.Agent{id: id}) do
-    Messaging.open_terminal(id)
-  end
-
-  def close_terminal(%Agents.Agent{id: id}) do
-    Messaging.close_terminal(id)
-  end
-
-  def resize_terminal(%Agents.Agent{id: id}, rows, cols) do
-    Messaging.resize_terminal(id, rows, cols)
-  end
-
-  def send_terminal_input(%Agents.Agent{id: id}, input) do
-    Messaging.send_terminal_input(id, input)
-  end
 end
diff --git a/app/lib/prymn/agents/agent.ex b/app/lib/prymn/agents/agent.ex
deleted file mode 100644
index 15619db..0000000
--- a/app/lib/prymn/agents/agent.ex
+++ /dev/null
@@ -1,7 +0,0 @@
-defmodule Prymn.Agents.Agent do
-  defstruct [:id, :health, status: :disconnected]
-
-  def new(id) do
-    %__MODULE__{id: id}
-  end
-end
diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex
deleted file mode 100644
index 654955b..0000000
--- a/app/lib/prymn/agents/health.ex
+++ /dev/null
@@ -1,9 +0,0 @@
-defmodule Prymn.Agents.Health do
-  @moduledoc """
-  The Health struct keeps simple health information of whether or not the
-  target host machine is up to date, has any tasks running, its resources are
-  getting depleted, or if it's unable be reached.
-  """
-
-  # defstruct status: :disconnected
-end
diff --git a/app/lib/prymn/agents/store.ex b/app/lib/prymn/agents/store.ex
deleted file mode 100644
index 6e0f9e8..0000000
--- a/app/lib/prymn/agents/store.ex
+++ /dev/null
@@ -1,44 +0,0 @@
-defmodule Prymn.Agents.Store do
-  @moduledoc false
-  use Agent
-
-  alias Prymn.Agents
-
-  # Stores and serves locally saved "Prymn Agents" to the application
-  # Not to be confused with the "Prymn Server" construct
-
-  @doc false
-  def start_link([]) do
-    Agent.start_link(fn -> %{} end, name: __MODULE__)
-  end
-
-  def get(id) do
-    Agent.get(__MODULE__, fn agents -> agents[id] end)
-  end
-
-  def get_or_default(id, new_agent \\ nil) do
-    case Agent.get(__MODULE__, fn agents -> agents[id] end) do
-      nil ->
-        agent = new_agent || Agents.Agent.new(id)
-        Agent.update(__MODULE__, &Map.put(&1, id, agent))
-        agent
-
-      agent ->
-        agent
-    end
-  end
-
-  def update_health(id, health) do
-    case get(id) do
-      nil ->
-        agent = %Agents.Agent{Agents.Agent.new(id) | status: :connected, health: health}
-        Agent.update(__MODULE__, &Map.put(&1, id, agent))
-        agent
-
-      agent ->
-        agent = %Agents.Agent{agent | status: :connected, health: health}
-        Agent.update(__MODULE__, &Map.put(&1, id, agent))
-        agent
-    end
-  end
-end
diff --git a/app/lib/prymn/application.ex b/app/lib/prymn/application.ex
index b9914e7..bda8fcd 100644
--- a/app/lib/prymn/application.ex
+++ b/app/lib/prymn/application.ex
@@ -14,9 +14,8 @@ defmodule Prymn.Application do
       {Phoenix.PubSub, name: Prymn.PubSub},
       {Finch, name: Prymn.Finch},
       {Oban, Application.fetch_env!(:prymn, Oban)},
-      Prymn.Agents.Store,
-      Prymn.Messaging.ConnectionSupervisor,
-      {Task.Supervisor, name: Prymn.TaskSupervisor},
+      Prymn.Messaging.Supervisor,
+      # {Task.Supervisor, name: Prymn.TaskSupervisor},
       PrymnWeb.Endpoint
     ]
 
diff --git a/app/lib/prymn/messaging.ex b/app/lib/prymn/messaging.ex
index a4c8dc1..00dd270 100644
--- a/app/lib/prymn/messaging.ex
+++ b/app/lib/prymn/messaging.ex
@@ -1,26 +1,2 @@
 defmodule Prymn.Messaging do
-  @moduledoc """
-  The Prymn messaging system
-  """
-
-  alias Prymn.Messaging
-
-  def open_terminal(agent_id) do
-    msg = Jason.encode!(%{"id" => "foo"})
-    :ok = Messaging.Connection.subscribe_to_agent(:nats1, agent_id, "terminal.foo.output")
-    :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "open_terminal", msg)
-  end
-
-  def close_terminal(agent_id) do
-    :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.close")
-  end
-
-  def resize_terminal(agent_id, rows, cols) do
-    msg = Jason.encode!(%{"rows" => rows, "cols" => cols})
-    :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.resize", msg)
-  end
-
-  def send_terminal_input(agent_id, input) do
-    :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.input", input)
-  end
 end
diff --git a/app/lib/prymn/messaging/connection.ex b/app/lib/prymn/messaging/connection.ex
index 92d3471..c69a274 100644
--- a/app/lib/prymn/messaging/connection.ex
+++ b/app/lib/prymn/messaging/connection.ex
@@ -1,89 +1,62 @@
 defmodule Prymn.Messaging.Connection do
   use GenServer
 
-  defstruct [:conn_pid, subscriptions: %{}]
-
   require Logger
-  alias Prymn.Agents
-  alias Prymn.Messaging
 
   @dialyzer {:nowarn_function, init: 1}
-  @subscription_supervisor Prymn.Messaging.SubscriptionSupervisor
-  @v1_prefix "agents.v1."
 
-  def start_link(name) do
-    GenServer.start_link(__MODULE__, nil, name: name)
+  def start_link(%{name: name} = init_arg) do
+    GenServer.start_link(__MODULE__, init_arg, name: name)
   end
 
-  def publish_to_agent(conn, agent_id, subject, payload \\ "") do
-    GenServer.call(conn, {:publish, agent_id, subject, payload})
+  def publish(server, subject, payload) do
+    GenServer.call(server, {:pub, subject, payload})
   end
 
-  def subscribe_to_agent(conn, agent_id, subject) do
-    GenServer.call(conn, {:subscribe, agent_id, subject})
+  def subscribe(server, subject, reply) do
+    GenServer.call(server, {:sub, subject, reply})
   end
 
   @impl true
   def init(_init_arg) do
-    connection_properties = %{
+    Process.flag(:trap_exit, true)
+
+    connect_opts = %{
       host: "localhost",
       username: "prymn_admin",
       password: "prymn_admin",
       auth_required: true
     }
 
-    Process.flag(:trap_exit, true)
-
-    case Gnat.start_link(connection_properties) do
+    case Gnat.start_link(connect_opts) do
       {:ok, pid} ->
-        Logger.info("Connected to NATS")
-        {:ok, %__MODULE__{conn_pid: pid}, {:continue, :subscribe_to_health}}
+        {:ok, pid}
 
       {:error, reason} ->
-        Logger.info("Connection to NATS failed (#{reason}). Attempting reconnect.")
-        {:ok, nil, {:continue, :attempt_reconnect}}
+        # Let the supervisor restart the Connection after a short delay
+        Logger.info("Initial NATS connection failed. Restarting...")
+        Process.sleep(1000)
+        {:stop, reason}
     end
   end
 
   @impl true
-  def handle_continue(:attempt_reconnect, state) do
-    Process.sleep(3000)
-    {:stop, {:shutdown, :connection_failure}, state}
+  def handle_call({:pub, subject, payload}, _from, conn_pid) do
+    :ok = Gnat.pub(conn_pid, subject, payload)
+
+    {:reply, :ok, conn_pid}
   end
 
-  def handle_continue(:subscribe_to_health, %__MODULE__{} = state) do
-    {:ok, _subscription} = Gnat.sub(state.conn_pid, self(), @v1_prefix <> "*.health")
-    {:noreply, state}
+  def handle_call({:sub, subject, reply}, _from, conn_pid) do
+    {:ok, cons_pid} = GenServer.start_link(Prymn.Messaging.FooConsumer, reply)
+    {:ok, sub} = Gnat.sub(conn_pid, cons_pid, subject)
+
+    {:reply, {:ok, sub}, conn_pid}
   end
 
   @impl true
-  def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do
-    :ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload)
-    {:reply, :ok, state}
-  end
-
-  def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do
-    args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid}
-
-    case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do
-      {:ok, _pid} -> {:reply, :ok, state}
-      {:error, {:already_started, _pid}} -> {:reply, :ok, state}
-      {:error, error} -> {:reply, {:error, error}, state}
-    end
-  end
-
-  @impl true
-  def handle_info({:EXIT, _pid, _reason}, state) do
-    Logger.info("Lost connection to NATS. Attempting reconnect.")
-    {:noreply, state, {:continue, :attempt_reconnect}}
-  end
-
-  def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do
-    [agent_id, "health"] = String.split(topic, ".")
-
-    health = Messaging.Messages.handle_message(subject, payload)
-    Agents.update_health(agent_id, health)
-
-    {:noreply, state}
+  def handle_info({:EXIT, pid, reason}, conn_pid) when conn_pid == pid do
+    Logger.info("NATS connection lost (#{reason})")
+    {:stop, {:shutdown, :connection_closed}, conn_pid}
   end
 end
diff --git a/app/lib/prymn/messaging/connection_manager.ex b/app/lib/prymn/messaging/connection_manager.ex
new file mode 100644
index 0000000..2e78c9c
--- /dev/null
+++ b/app/lib/prymn/messaging/connection_manager.ex
@@ -0,0 +1,38 @@
+defmodule Prymn.Messaging.ConnectionManager do
+  use GenServer
+
+  defstruct subscriptions: %{}
+
+  def publish(subject, payload) do
+    GenServer.call(__MODULE__, {:pub, subject, payload})
+  end
+
+  def subscribe(subject) do
+    GenServer.call(__MODULE__, {:pub, subject})
+  end
+
+  @impl true
+  def init(_init_arg) do
+    children = [
+      {Prymn.Messaging.Connection, %{name: :nats}}
+    ]
+
+    Process.monitor(:nats)
+
+    {:ok, %__MODULE__{}}
+  end
+
+  @impl true
+  def handle_call({:pub, subject, payload}, _from, state) do
+    Prymn.Messaging.Connection.publish(:nats, subject, payload)
+    {:reply, :ok, state}
+  end
+
+  def handle_call({:sub, subject}, {pid, _}, %__MODULE__{subscriptions: subsciptions} = state) do
+    Prymn.Messaging.Connection.subscribe(:nats, subject, pid)
+
+    # Map.put(subsciptions)
+
+    {:reply, :ok, state}
+  end
+end
diff --git a/app/lib/prymn/messaging/connection_supervisor.ex b/app/lib/prymn/messaging/connection_supervisor.ex
deleted file mode 100644
index 6cfa0e6..0000000
--- a/app/lib/prymn/messaging/connection_supervisor.ex
+++ /dev/null
@@ -1,18 +0,0 @@
-defmodule Prymn.Messaging.ConnectionSupervisor do
-  use Supervisor, restart: :permanent
-
-  def start_link([]) do
-    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
-  end
-
-  @impl true
-  def init(_init_arg) do
-    children = [
-      {DynamicSupervisor, name: Prymn.Messaging.SubscriptionSupervisor},
-      {Registry, name: Prymn.Messaging.SubscriptionRegistry, keys: :unique},
-      {Prymn.Messaging.Connection, :nats1}
-    ]
-
-    Supervisor.init(children, strategy: :one_for_one)
-  end
-end
diff --git a/app/lib/prymn/messaging/foo_consumer.ex b/app/lib/prymn/messaging/foo_consumer.ex
new file mode 100644
index 0000000..a8d5b25
--- /dev/null
+++ b/app/lib/prymn/messaging/foo_consumer.ex
@@ -0,0 +1,15 @@
+defmodule Prymn.Messaging.FooConsumer do
+  use GenServer
+
+  @impl true
+  def init(reply) do
+    {:ok, reply}
+  end
+
+  @impl true
+  def handle_info({:msg, %{topic: subject, body: body}}, reply) do
+    dbg("received on fooconsumer: #{subject}")
+    send(reply, body)
+    {:noreply, reply}
+  end
+end
diff --git a/app/lib/prymn/messaging/messages.ex b/app/lib/prymn/messaging/messages.ex
deleted file mode 100644
index 502b094..0000000
--- a/app/lib/prymn/messaging/messages.ex
+++ /dev/null
@@ -1,28 +0,0 @@
-defmodule Prymn.Messaging.Messages.TerminalOutput do
-  defstruct [:id, :output]
-end
-
-# defmodule Prymn.Messaging.Messages.Health do
-#   defstruct [:cpu_status, :memory_status, :disk_status]
-# end
-
-defmodule Prymn.Messaging.Messages do
-  require Logger
-  alias Prymn.Messaging.Messages.TerminalOutput
-
-  @v1_prefix "agents.v1."
-
-  def handle_message(subject, payload) do
-    {_agent_id, subjects} = extract_subject(subject)
-
-    case subjects do
-      ["health"] -> Jason.decode!(payload)
-      ["terminal", id, "output"] -> %TerminalOutput{id: id, output: payload}
-    end
-  end
-
-  defp extract_subject(@v1_prefix <> rest) do
-    [agent_id | subjects] = String.split(rest, ".")
-    {agent_id, subjects}
-  end
-end
diff --git a/app/lib/prymn/messaging/subscription.ex b/app/lib/prymn/messaging/subscription.ex
deleted file mode 100644
index 04c43fa..0000000
--- a/app/lib/prymn/messaging/subscription.ex
+++ /dev/null
@@ -1,36 +0,0 @@
-defmodule Prymn.Messaging.Subscription do
-  use GenServer
-
-  alias Prymn.Messaging
-
-  def start_link(%{agent_id: agent_id, subject: subject} = init_arg) do
-    name = {:via, Registry, {Prymn.Messaging.SubscriptionRegistry, agent_id <> subject}}
-    GenServer.start_link(__MODULE__, init_arg, name: name)
-  end
-
-  @impl true
-  def init(%{agent_id: agent_id, subject: subject, gnat: gnat, reply: reply}) do
-    Process.monitor(reply)
-    subject = "agents.v1.#{agent_id}.#{subject}"
-    {:ok, sub} = Gnat.sub(gnat, self(), subject)
-
-    {:ok, %{reply: reply, sub: sub, gnat: gnat}}
-  end
-
-  @impl true
-  def handle_info({:msg, %{body: body, topic: subject}}, %{reply: reply} = state) do
-    msg = Messaging.Messages.handle_message(subject, body)
-    send(reply, msg)
-
-    {:noreply, state}
-  end
-
-  def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do
-    {:stop, {:shutdown, :gone}, state}
-  end
-
-  @impl true
-  def terminate({:shutdown, _}, state) do
-    Gnat.unsub(state.gnat, state.sub)
-  end
-end
diff --git a/app/lib/prymn/messaging/supervisor.ex b/app/lib/prymn/messaging/supervisor.ex
new file mode 100644
index 0000000..46acebe
--- /dev/null
+++ b/app/lib/prymn/messaging/supervisor.ex
@@ -0,0 +1,17 @@
+defmodule Prymn.Messaging.Supervisor do
+  use Supervisor
+
+  def start_link(opts) do
+    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_init_arg) do
+    children =
+      [
+        # Prymn.Messaging.HealthConsumer
+      ]
+
+    Supervisor.init(children, strategy: :one_for_one)
+  end
+end
diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex
index 23c7cab..9b7e4dd 100644
--- a/app/lib/prymn_web/live/server_live/index.ex
+++ b/app/lib/prymn_web/live/server_live/index.ex
@@ -65,10 +65,10 @@ defmodule PrymnWeb.ServerLive.Index do
      |> update(:servers, fn servers -> [server | servers] end)}
   end
 
-  def handle_info(%Agents.Agent{} = agent, socket) do
-    id = String.to_integer(agent.id)
-    {:noreply, update(socket, :agents, &Map.put(&1, id, agent))}
-  end
+  # def handle_info(%Agents.Agent{} = agent, socket) do
+  #   id = String.to_integer(agent.id)
+  #   {:noreply, update(socket, :agents, &Map.put(&1, id, agent))}
+  # end
 
   def handle_info(msg, state) do
     Logger.debug("received unexpected message #{inspect(msg)}")
diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex
index 8b2ea4e..e62beab 100644
--- a/app/lib/prymn_web/live/server_live/show.ex
+++ b/app/lib/prymn_web/live/server_live/show.ex
@@ -116,15 +116,15 @@ defmodule PrymnWeb.ServerLive.Show do
      |> assign(:registration_command, Servers.create_setup_command(server))}
   end
 
-  @impl true
-  def handle_info(%Agents.Agent{} = agent, socket) do
-    {:noreply, assign(socket, :agent, agent)}
-  end
+  # @impl true
+  # def handle_info(%Agents.Agent{} = agent, socket) do
+  #   {:noreply, assign(socket, :agent, agent)}
+  # end
 
-  def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do
-    send_update(PrymnWeb.Terminal, id: "terminal", data: output)
-    {:noreply, socket}
-  end
+  # def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do
+  #   send_update(PrymnWeb.Terminal, id: "terminal", data: output)
+  #   {:noreply, socket}
+  # end
 
   # @impl true
   #  def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do
diff --git a/flake.nix b/flake.nix
index cd17ad5..ff35c7d 100644
--- a/flake.nix
+++ b/flake.nix
@@ -18,8 +18,6 @@
     crane = {
       url = "github:ipetkov/crane";
       inputs.nixpkgs.follows = "nixpkgs";
-      inputs.flake-utils.follows = "flake-utils";
-      inputs.rust-overlay.follows = "rust-overlay";
     };
   };