diff --git a/agent/src/messaging/v1/mod.rs b/agent/src/messaging/v1/mod.rs index a80f3df..b1be56e 100644 --- a/agent/src/messaging/v1/mod.rs +++ b/agent/src/messaging/v1/mod.rs @@ -16,12 +16,14 @@ pub struct Client { impl Client { pub async fn connect(id: &str) -> Result { + let prefix = format!("agents.v1.{}", id); + let nats = async_nats::ConnectOptions::with_user_and_password( String::from("demo_agent"), String::from("demo_agent_password"), ) .name(format!("Prymn Agent {id}")) - .custom_inbox_prefix(format!("_INBOX_{id}")) + .custom_inbox_prefix(format!("{prefix}._INBOX_")) .connect("localhost") .await .map_err(|err| { @@ -31,7 +33,7 @@ impl Client { Ok(Self { id: Arc::new(String::from(id)), - prefix: Arc::new(format!("agents.v1.{}", id)), + prefix: Arc::new(prefix), nats, }) } diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index ae51ded..1f8e075 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -1,38 +1,2 @@ defmodule Prymn.Agents do - alias Prymn.Messaging - alias Prymn.Agents - - def from_server(%Prymn.Servers.Server{} = server) do - agent_id = to_string(server.id) - Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{agent_id}") - Agents.Store.get_or_default(agent_id) - end - - def from_servers(servers) when is_list(servers) do - Enum.reduce(servers, [], fn server, acc -> - [{server.id, from_server(server)} | acc] - end) - |> Map.new() - end - - def update_health(agent_id, health) when is_binary(agent_id) do - agent = Agents.Store.update_health(agent_id, health) - Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent) - end - - def open_terminal(%Agents.Agent{id: id}) do - Messaging.open_terminal(id) - end - - def close_terminal(%Agents.Agent{id: id}) do - Messaging.close_terminal(id) - end - - def resize_terminal(%Agents.Agent{id: id}, rows, cols) do - Messaging.resize_terminal(id, rows, cols) - end - - def send_terminal_input(%Agents.Agent{id: id}, input) do - Messaging.send_terminal_input(id, input) - end end diff --git a/app/lib/prymn/agents/agent.ex b/app/lib/prymn/agents/agent.ex deleted file mode 100644 index 15619db..0000000 --- a/app/lib/prymn/agents/agent.ex +++ /dev/null @@ -1,7 +0,0 @@ -defmodule Prymn.Agents.Agent do - defstruct [:id, :health, status: :disconnected] - - def new(id) do - %__MODULE__{id: id} - end -end diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex deleted file mode 100644 index 654955b..0000000 --- a/app/lib/prymn/agents/health.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule Prymn.Agents.Health do - @moduledoc """ - The Health struct keeps simple health information of whether or not the - target host machine is up to date, has any tasks running, its resources are - getting depleted, or if it's unable be reached. - """ - - # defstruct status: :disconnected -end diff --git a/app/lib/prymn/agents/store.ex b/app/lib/prymn/agents/store.ex deleted file mode 100644 index 6e0f9e8..0000000 --- a/app/lib/prymn/agents/store.ex +++ /dev/null @@ -1,44 +0,0 @@ -defmodule Prymn.Agents.Store do - @moduledoc false - use Agent - - alias Prymn.Agents - - # Stores and serves locally saved "Prymn Agents" to the application - # Not to be confused with the "Prymn Server" construct - - @doc false - def start_link([]) do - Agent.start_link(fn -> %{} end, name: __MODULE__) - end - - def get(id) do - Agent.get(__MODULE__, fn agents -> agents[id] end) - end - - def get_or_default(id, new_agent \\ nil) do - case Agent.get(__MODULE__, fn agents -> agents[id] end) do - nil -> - agent = new_agent || Agents.Agent.new(id) - Agent.update(__MODULE__, &Map.put(&1, id, agent)) - agent - - agent -> - agent - end - end - - def update_health(id, health) do - case get(id) do - nil -> - agent = %Agents.Agent{Agents.Agent.new(id) | status: :connected, health: health} - Agent.update(__MODULE__, &Map.put(&1, id, agent)) - agent - - agent -> - agent = %Agents.Agent{agent | status: :connected, health: health} - Agent.update(__MODULE__, &Map.put(&1, id, agent)) - agent - end - end -end diff --git a/app/lib/prymn/application.ex b/app/lib/prymn/application.ex index b9914e7..bda8fcd 100644 --- a/app/lib/prymn/application.ex +++ b/app/lib/prymn/application.ex @@ -14,9 +14,8 @@ defmodule Prymn.Application do {Phoenix.PubSub, name: Prymn.PubSub}, {Finch, name: Prymn.Finch}, {Oban, Application.fetch_env!(:prymn, Oban)}, - Prymn.Agents.Store, - Prymn.Messaging.ConnectionSupervisor, - {Task.Supervisor, name: Prymn.TaskSupervisor}, + Prymn.Messaging.Supervisor, + # {Task.Supervisor, name: Prymn.TaskSupervisor}, PrymnWeb.Endpoint ] diff --git a/app/lib/prymn/messaging.ex b/app/lib/prymn/messaging.ex index a4c8dc1..00dd270 100644 --- a/app/lib/prymn/messaging.ex +++ b/app/lib/prymn/messaging.ex @@ -1,26 +1,2 @@ defmodule Prymn.Messaging do - @moduledoc """ - The Prymn messaging system - """ - - alias Prymn.Messaging - - def open_terminal(agent_id) do - msg = Jason.encode!(%{"id" => "foo"}) - :ok = Messaging.Connection.subscribe_to_agent(:nats1, agent_id, "terminal.foo.output") - :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "open_terminal", msg) - end - - def close_terminal(agent_id) do - :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.close") - end - - def resize_terminal(agent_id, rows, cols) do - msg = Jason.encode!(%{"rows" => rows, "cols" => cols}) - :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.resize", msg) - end - - def send_terminal_input(agent_id, input) do - :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.input", input) - end end diff --git a/app/lib/prymn/messaging/connection.ex b/app/lib/prymn/messaging/connection.ex index 92d3471..c69a274 100644 --- a/app/lib/prymn/messaging/connection.ex +++ b/app/lib/prymn/messaging/connection.ex @@ -1,89 +1,62 @@ defmodule Prymn.Messaging.Connection do use GenServer - defstruct [:conn_pid, subscriptions: %{}] - require Logger - alias Prymn.Agents - alias Prymn.Messaging @dialyzer {:nowarn_function, init: 1} - @subscription_supervisor Prymn.Messaging.SubscriptionSupervisor - @v1_prefix "agents.v1." - def start_link(name) do - GenServer.start_link(__MODULE__, nil, name: name) + def start_link(%{name: name} = init_arg) do + GenServer.start_link(__MODULE__, init_arg, name: name) end - def publish_to_agent(conn, agent_id, subject, payload \\ "") do - GenServer.call(conn, {:publish, agent_id, subject, payload}) + def publish(server, subject, payload) do + GenServer.call(server, {:pub, subject, payload}) end - def subscribe_to_agent(conn, agent_id, subject) do - GenServer.call(conn, {:subscribe, agent_id, subject}) + def subscribe(server, subject, reply) do + GenServer.call(server, {:sub, subject, reply}) end @impl true def init(_init_arg) do - connection_properties = %{ + Process.flag(:trap_exit, true) + + connect_opts = %{ host: "localhost", username: "prymn_admin", password: "prymn_admin", auth_required: true } - Process.flag(:trap_exit, true) - - case Gnat.start_link(connection_properties) do + case Gnat.start_link(connect_opts) do {:ok, pid} -> - Logger.info("Connected to NATS") - {:ok, %__MODULE__{conn_pid: pid}, {:continue, :subscribe_to_health}} + {:ok, pid} {:error, reason} -> - Logger.info("Connection to NATS failed (#{reason}). Attempting reconnect.") - {:ok, nil, {:continue, :attempt_reconnect}} + # Let the supervisor restart the Connection after a short delay + Logger.info("Initial NATS connection failed. Restarting...") + Process.sleep(1000) + {:stop, reason} end end @impl true - def handle_continue(:attempt_reconnect, state) do - Process.sleep(3000) - {:stop, {:shutdown, :connection_failure}, state} + def handle_call({:pub, subject, payload}, _from, conn_pid) do + :ok = Gnat.pub(conn_pid, subject, payload) + + {:reply, :ok, conn_pid} end - def handle_continue(:subscribe_to_health, %__MODULE__{} = state) do - {:ok, _subscription} = Gnat.sub(state.conn_pid, self(), @v1_prefix <> "*.health") - {:noreply, state} + def handle_call({:sub, subject, reply}, _from, conn_pid) do + {:ok, cons_pid} = GenServer.start_link(Prymn.Messaging.FooConsumer, reply) + {:ok, sub} = Gnat.sub(conn_pid, cons_pid, subject) + + {:reply, {:ok, sub}, conn_pid} end @impl true - def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do - :ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload) - {:reply, :ok, state} - end - - def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do - args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid} - - case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do - {:ok, _pid} -> {:reply, :ok, state} - {:error, {:already_started, _pid}} -> {:reply, :ok, state} - {:error, error} -> {:reply, {:error, error}, state} - end - end - - @impl true - def handle_info({:EXIT, _pid, _reason}, state) do - Logger.info("Lost connection to NATS. Attempting reconnect.") - {:noreply, state, {:continue, :attempt_reconnect}} - end - - def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do - [agent_id, "health"] = String.split(topic, ".") - - health = Messaging.Messages.handle_message(subject, payload) - Agents.update_health(agent_id, health) - - {:noreply, state} + def handle_info({:EXIT, pid, reason}, conn_pid) when conn_pid == pid do + Logger.info("NATS connection lost (#{reason})") + {:stop, {:shutdown, :connection_closed}, conn_pid} end end diff --git a/app/lib/prymn/messaging/connection_manager.ex b/app/lib/prymn/messaging/connection_manager.ex new file mode 100644 index 0000000..2e78c9c --- /dev/null +++ b/app/lib/prymn/messaging/connection_manager.ex @@ -0,0 +1,38 @@ +defmodule Prymn.Messaging.ConnectionManager do + use GenServer + + defstruct subscriptions: %{} + + def publish(subject, payload) do + GenServer.call(__MODULE__, {:pub, subject, payload}) + end + + def subscribe(subject) do + GenServer.call(__MODULE__, {:pub, subject}) + end + + @impl true + def init(_init_arg) do + children = [ + {Prymn.Messaging.Connection, %{name: :nats}} + ] + + Process.monitor(:nats) + + {:ok, %__MODULE__{}} + end + + @impl true + def handle_call({:pub, subject, payload}, _from, state) do + Prymn.Messaging.Connection.publish(:nats, subject, payload) + {:reply, :ok, state} + end + + def handle_call({:sub, subject}, {pid, _}, %__MODULE__{subscriptions: subsciptions} = state) do + Prymn.Messaging.Connection.subscribe(:nats, subject, pid) + + # Map.put(subsciptions) + + {:reply, :ok, state} + end +end diff --git a/app/lib/prymn/messaging/connection_supervisor.ex b/app/lib/prymn/messaging/connection_supervisor.ex deleted file mode 100644 index 6cfa0e6..0000000 --- a/app/lib/prymn/messaging/connection_supervisor.ex +++ /dev/null @@ -1,18 +0,0 @@ -defmodule Prymn.Messaging.ConnectionSupervisor do - use Supervisor, restart: :permanent - - def start_link([]) do - Supervisor.start_link(__MODULE__, [], name: __MODULE__) - end - - @impl true - def init(_init_arg) do - children = [ - {DynamicSupervisor, name: Prymn.Messaging.SubscriptionSupervisor}, - {Registry, name: Prymn.Messaging.SubscriptionRegistry, keys: :unique}, - {Prymn.Messaging.Connection, :nats1} - ] - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/app/lib/prymn/messaging/foo_consumer.ex b/app/lib/prymn/messaging/foo_consumer.ex new file mode 100644 index 0000000..a8d5b25 --- /dev/null +++ b/app/lib/prymn/messaging/foo_consumer.ex @@ -0,0 +1,15 @@ +defmodule Prymn.Messaging.FooConsumer do + use GenServer + + @impl true + def init(reply) do + {:ok, reply} + end + + @impl true + def handle_info({:msg, %{topic: subject, body: body}}, reply) do + dbg("received on fooconsumer: #{subject}") + send(reply, body) + {:noreply, reply} + end +end diff --git a/app/lib/prymn/messaging/messages.ex b/app/lib/prymn/messaging/messages.ex deleted file mode 100644 index 502b094..0000000 --- a/app/lib/prymn/messaging/messages.ex +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Prymn.Messaging.Messages.TerminalOutput do - defstruct [:id, :output] -end - -# defmodule Prymn.Messaging.Messages.Health do -# defstruct [:cpu_status, :memory_status, :disk_status] -# end - -defmodule Prymn.Messaging.Messages do - require Logger - alias Prymn.Messaging.Messages.TerminalOutput - - @v1_prefix "agents.v1." - - def handle_message(subject, payload) do - {_agent_id, subjects} = extract_subject(subject) - - case subjects do - ["health"] -> Jason.decode!(payload) - ["terminal", id, "output"] -> %TerminalOutput{id: id, output: payload} - end - end - - defp extract_subject(@v1_prefix <> rest) do - [agent_id | subjects] = String.split(rest, ".") - {agent_id, subjects} - end -end diff --git a/app/lib/prymn/messaging/subscription.ex b/app/lib/prymn/messaging/subscription.ex deleted file mode 100644 index 04c43fa..0000000 --- a/app/lib/prymn/messaging/subscription.ex +++ /dev/null @@ -1,36 +0,0 @@ -defmodule Prymn.Messaging.Subscription do - use GenServer - - alias Prymn.Messaging - - def start_link(%{agent_id: agent_id, subject: subject} = init_arg) do - name = {:via, Registry, {Prymn.Messaging.SubscriptionRegistry, agent_id <> subject}} - GenServer.start_link(__MODULE__, init_arg, name: name) - end - - @impl true - def init(%{agent_id: agent_id, subject: subject, gnat: gnat, reply: reply}) do - Process.monitor(reply) - subject = "agents.v1.#{agent_id}.#{subject}" - {:ok, sub} = Gnat.sub(gnat, self(), subject) - - {:ok, %{reply: reply, sub: sub, gnat: gnat}} - end - - @impl true - def handle_info({:msg, %{body: body, topic: subject}}, %{reply: reply} = state) do - msg = Messaging.Messages.handle_message(subject, body) - send(reply, msg) - - {:noreply, state} - end - - def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do - {:stop, {:shutdown, :gone}, state} - end - - @impl true - def terminate({:shutdown, _}, state) do - Gnat.unsub(state.gnat, state.sub) - end -end diff --git a/app/lib/prymn/messaging/supervisor.ex b/app/lib/prymn/messaging/supervisor.ex new file mode 100644 index 0000000..46acebe --- /dev/null +++ b/app/lib/prymn/messaging/supervisor.ex @@ -0,0 +1,17 @@ +defmodule Prymn.Messaging.Supervisor do + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_init_arg) do + children = + [ + # Prymn.Messaging.HealthConsumer + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 23c7cab..9b7e4dd 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -65,10 +65,10 @@ defmodule PrymnWeb.ServerLive.Index do |> update(:servers, fn servers -> [server | servers] end)} end - def handle_info(%Agents.Agent{} = agent, socket) do - id = String.to_integer(agent.id) - {:noreply, update(socket, :agents, &Map.put(&1, id, agent))} - end + # def handle_info(%Agents.Agent{} = agent, socket) do + # id = String.to_integer(agent.id) + # {:noreply, update(socket, :agents, &Map.put(&1, id, agent))} + # end def handle_info(msg, state) do Logger.debug("received unexpected message #{inspect(msg)}") diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex index 8b2ea4e..e62beab 100644 --- a/app/lib/prymn_web/live/server_live/show.ex +++ b/app/lib/prymn_web/live/server_live/show.ex @@ -116,15 +116,15 @@ defmodule PrymnWeb.ServerLive.Show do |> assign(:registration_command, Servers.create_setup_command(server))} end - @impl true - def handle_info(%Agents.Agent{} = agent, socket) do - {:noreply, assign(socket, :agent, agent)} - end + # @impl true + # def handle_info(%Agents.Agent{} = agent, socket) do + # {:noreply, assign(socket, :agent, agent)} + # end - def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do - send_update(PrymnWeb.Terminal, id: "terminal", data: output) - {:noreply, socket} - end + # def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do + # send_update(PrymnWeb.Terminal, id: "terminal", data: output) + # {:noreply, socket} + # end # @impl true # def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do diff --git a/flake.nix b/flake.nix index cd17ad5..ff35c7d 100644 --- a/flake.nix +++ b/flake.nix @@ -18,8 +18,6 @@ crane = { url = "github:ipetkov/crane"; inputs.nixpkgs.follows = "nixpkgs"; - inputs.flake-utils.follows = "flake-utils"; - inputs.rust-overlay.follows = "rust-overlay"; }; };