From 59945bd2deff0a37409fa10cbb3d1937e111825f Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Fri, 2 Feb 2024 20:53:52 +0200 Subject: [PATCH] terminal --- agent/src/messaging/v1/terminal.rs | 2 +- app/lib/prymn/agents.ex | 17 +++++ app/lib/prymn/messaging.ex | 21 +++++- app/lib/prymn/messaging/connection.ex | 35 ++++++++-- .../prymn/messaging/connection_supervisor.ex | 2 + app/lib/prymn/messaging/messages.ex | 33 ++++++--- app/lib/prymn_web/components/terminal.ex | 46 +++---------- app/lib/prymn_web/live/server_live/show.ex | 68 +++++++++---------- 8 files changed, 135 insertions(+), 89 deletions(-) diff --git a/agent/src/messaging/v1/terminal.rs b/agent/src/messaging/v1/terminal.rs index 2f35487..c09138e 100644 --- a/agent/src/messaging/v1/terminal.rs +++ b/agent/src/messaging/v1/terminal.rs @@ -81,8 +81,8 @@ impl OpenTerminalMessage { } } _close = close_stream.next() => { - tracing::info!("closing terminal"); child.kill().await.expect("the child to exit normally"); + tracing::info!("closing terminal"); break; } io_result = child.wait() => { diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index 7aa9d58..ae51ded 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -1,4 +1,5 @@ defmodule Prymn.Agents do + alias Prymn.Messaging alias Prymn.Agents def from_server(%Prymn.Servers.Server{} = server) do @@ -18,4 +19,20 @@ defmodule Prymn.Agents do agent = Agents.Store.update_health(agent_id, health) Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent) end + + def open_terminal(%Agents.Agent{id: id}) do + Messaging.open_terminal(id) + end + + def close_terminal(%Agents.Agent{id: id}) do + Messaging.close_terminal(id) + end + + def resize_terminal(%Agents.Agent{id: id}, rows, cols) do + Messaging.resize_terminal(id, rows, cols) + end + + def send_terminal_input(%Agents.Agent{id: id}, input) do + Messaging.send_terminal_input(id, input) + end end diff --git a/app/lib/prymn/messaging.ex b/app/lib/prymn/messaging.ex index 6fc6e1f..a4c8dc1 100644 --- a/app/lib/prymn/messaging.ex +++ b/app/lib/prymn/messaging.ex @@ -3,9 +3,24 @@ defmodule Prymn.Messaging do The Prymn messaging system """ - def agent_health_topic(agent_id), do: "health.#{agent_id}" + alias Prymn.Messaging - def subscribe_to_agent_health(agent_id) do - Phoenix.PubSub.subscribe(Prymn.PubSub, agent_health_topic(agent_id)) + def open_terminal(agent_id) do + msg = Jason.encode!(%{"id" => "foo"}) + :ok = Messaging.Connection.subscribe_to_agent(:nats1, agent_id, "terminal.foo.output") + :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "open_terminal", msg) + end + + def close_terminal(agent_id) do + :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.close") + end + + def resize_terminal(agent_id, rows, cols) do + msg = Jason.encode!(%{"rows" => rows, "cols" => cols}) + :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.resize", msg) + end + + def send_terminal_input(agent_id, input) do + :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.input", input) end end diff --git a/app/lib/prymn/messaging/connection.ex b/app/lib/prymn/messaging/connection.ex index f22e886..92d3471 100644 --- a/app/lib/prymn/messaging/connection.ex +++ b/app/lib/prymn/messaging/connection.ex @@ -1,18 +1,28 @@ defmodule Prymn.Messaging.Connection do use GenServer - defstruct [:conn_pid] + defstruct [:conn_pid, subscriptions: %{}] + require Logger alias Prymn.Agents alias Prymn.Messaging @dialyzer {:nowarn_function, init: 1} + @subscription_supervisor Prymn.Messaging.SubscriptionSupervisor @v1_prefix "agents.v1." def start_link(name) do GenServer.start_link(__MODULE__, nil, name: name) end + def publish_to_agent(conn, agent_id, subject, payload \\ "") do + GenServer.call(conn, {:publish, agent_id, subject, payload}) + end + + def subscribe_to_agent(conn, agent_id, subject) do + GenServer.call(conn, {:subscribe, agent_id, subject}) + end + @impl true def init(_init_arg) do connection_properties = %{ @@ -46,15 +56,32 @@ defmodule Prymn.Messaging.Connection do {:noreply, state} end + @impl true + def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do + :ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload) + {:reply, :ok, state} + end + + def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do + args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid} + + case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do + {:ok, _pid} -> {:reply, :ok, state} + {:error, {:already_started, _pid}} -> {:reply, :ok, state} + {:error, error} -> {:reply, {:error, error}, state} + end + end + @impl true def handle_info({:EXIT, _pid, _reason}, state) do Logger.info("Lost connection to NATS. Attempting reconnect.") {:noreply, state, {:continue, :attempt_reconnect}} end - def handle_info({:msg, %{body: body, topic: @v1_prefix <> topic}}, state) do - [agent_id, topic] = String.split(topic, ".") - health = Messaging.Messages.handle_message(topic, body) + def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do + [agent_id, "health"] = String.split(topic, ".") + + health = Messaging.Messages.handle_message(subject, payload) Agents.update_health(agent_id, health) {:noreply, state} diff --git a/app/lib/prymn/messaging/connection_supervisor.ex b/app/lib/prymn/messaging/connection_supervisor.ex index b2daccb..6cfa0e6 100644 --- a/app/lib/prymn/messaging/connection_supervisor.ex +++ b/app/lib/prymn/messaging/connection_supervisor.ex @@ -8,6 +8,8 @@ defmodule Prymn.Messaging.ConnectionSupervisor do @impl true def init(_init_arg) do children = [ + {DynamicSupervisor, name: Prymn.Messaging.SubscriptionSupervisor}, + {Registry, name: Prymn.Messaging.SubscriptionRegistry, keys: :unique}, {Prymn.Messaging.Connection, :nats1} ] diff --git a/app/lib/prymn/messaging/messages.ex b/app/lib/prymn/messaging/messages.ex index 31ed980..502b094 100644 --- a/app/lib/prymn/messaging/messages.ex +++ b/app/lib/prymn/messaging/messages.ex @@ -1,15 +1,28 @@ -defmodule Prymn.Messaging.Messages do - alias Prymn.Messaging.Messages.Health - require Logger +defmodule Prymn.Messaging.Messages.TerminalOutput do + defstruct [:id, :output] +end - def handle_message(topic, body) do - case topic do - "health" -> Jason.decode!(body) - _ -> Logger.warning("Received unknown topic inside the Connection server: #{topic}") +# defmodule Prymn.Messaging.Messages.Health do +# defstruct [:cpu_status, :memory_status, :disk_status] +# end + +defmodule Prymn.Messaging.Messages do + require Logger + alias Prymn.Messaging.Messages.TerminalOutput + + @v1_prefix "agents.v1." + + def handle_message(subject, payload) do + {_agent_id, subjects} = extract_subject(subject) + + case subjects do + ["health"] -> Jason.decode!(payload) + ["terminal", id, "output"] -> %TerminalOutput{id: id, output: payload} end end -end -defmodule Prymn.Messaging.Messages.Health do - defstruct [:cpu_status, :memory_status, :disk_status] + defp extract_subject(@v1_prefix <> rest) do + [agent_id | subjects] = String.split(rest, ".") + {agent_id, subjects} + end end diff --git a/app/lib/prymn_web/components/terminal.ex b/app/lib/prymn_web/components/terminal.ex index 2b48f99..f5c14bc 100644 --- a/app/lib/prymn_web/components/terminal.ex +++ b/app/lib/prymn_web/components/terminal.ex @@ -1,6 +1,8 @@ defmodule PrymnWeb.Terminal do use PrymnWeb, :live_component + alias Prymn.Agents + @impl true def mount(socket) do {:ok, assign(socket, :open, false)} @@ -44,51 +46,25 @@ defmodule PrymnWeb.Terminal do end @impl true - def handle_event("open_terminal", _params, socket) do - # agent = Prymn.Agents.from_server(socket.assigns.server) - # pid = self() + def handle_event("open_terminal", _params, %{assigns: assigns} = socket) do + # TODO: make up a terminal id + Agents.open_terminal(assigns.agent) - # Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> - # # FIXME: Have to wrap this in a Task because gun sends unsolicited messages - # # to calling process - # stream = Prymn.Agents.terminal(agent) - - # {:ok, mux_pid} = - # Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end) - - # send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true) - - # case GRPC.Stub.recv(stream, timeout: :infinity) do - # {:ok, stream} -> - # Enum.map(stream, fn - # {:ok, %{output: data}} -> - # send(mux_pid, :data) - # send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data) - - # {:error, _err} -> - # send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false) - # end) - - # {:error, error} -> - # dbg(error) - # end - # end) - - {:noreply, socket} + {:noreply, assign(socket, :open, true)} end def handle_event("close_terminal", _params, socket) do - send(socket.assigns.mux_pid, :close) - {:noreply, assign(socket, :open, false)} + Agents.close_terminal(socket.assigns.agent) + {:noreply, assign(socket, open: false, data: "")} end - def handle_event("data_event", data, socket) when is_binary(data) do - send(socket.assigns.mux_pid, {:data_event, data}) + def handle_event("data_event", data, %{assigns: assigns} = socket) when is_binary(data) do + Agents.send_terminal_input(assigns.agent, data) {:noreply, socket} end def handle_event("resize_event", %{"cols" => cols, "rows" => rows}, socket) do - send(socket.assigns.mux_pid, {:resize_event, rows, cols}) + Agents.resize_terminal(socket.assigns.agent, rows, cols) {:noreply, socket} end diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex index 6280da1..8b2ea4e 100644 --- a/app/lib/prymn_web/live/server_live/show.ex +++ b/app/lib/prymn_web/live/server_live/show.ex @@ -2,7 +2,7 @@ defmodule PrymnWeb.ServerLive.Show do use PrymnWeb, :live_view require Logger - alias Prymn.{Agents, Servers} + alias Prymn.{Agents, Servers, Messaging} @impl true def mount(_params, _session, socket) do @@ -20,7 +20,7 @@ defmodule PrymnWeb.ServerLive.Show do <.dropdown title="Select a different server"> <:button variant="tertiary">Server <%= @server.name %> <:item - :for={server <- Enum.filter(@servers, fn s -> s.id != @server.id end)} + :for={server <- Enum.filter(@servers, &(&1.id != @server.id))} patch={~p"/servers/#{server}"} > <%= server.name %> @@ -29,7 +29,7 @@ defmodule PrymnWeb.ServerLive.Show do <.icon class="h-4 w-4" name="hero-pencil-solid" /> - <.indicator message={@health.message} /> + <.indicator message="test" />