diff --git a/agent/src/messaging/v1/terminal.rs b/agent/src/messaging/v1/terminal.rs
index 2f35487..c09138e 100644
--- a/agent/src/messaging/v1/terminal.rs
+++ b/agent/src/messaging/v1/terminal.rs
@@ -81,8 +81,8 @@ impl OpenTerminalMessage {
}
}
_close = close_stream.next() => {
- tracing::info!("closing terminal");
child.kill().await.expect("the child to exit normally");
+ tracing::info!("closing terminal");
break;
}
io_result = child.wait() => {
diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex
index 7aa9d58..ae51ded 100644
--- a/app/lib/prymn/agents.ex
+++ b/app/lib/prymn/agents.ex
@@ -1,4 +1,5 @@
defmodule Prymn.Agents do
+ alias Prymn.Messaging
alias Prymn.Agents
def from_server(%Prymn.Servers.Server{} = server) do
@@ -18,4 +19,20 @@ defmodule Prymn.Agents do
agent = Agents.Store.update_health(agent_id, health)
Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent)
end
+
+ def open_terminal(%Agents.Agent{id: id}) do
+ Messaging.open_terminal(id)
+ end
+
+ def close_terminal(%Agents.Agent{id: id}) do
+ Messaging.close_terminal(id)
+ end
+
+ def resize_terminal(%Agents.Agent{id: id}, rows, cols) do
+ Messaging.resize_terminal(id, rows, cols)
+ end
+
+ def send_terminal_input(%Agents.Agent{id: id}, input) do
+ Messaging.send_terminal_input(id, input)
+ end
end
diff --git a/app/lib/prymn/messaging.ex b/app/lib/prymn/messaging.ex
index 6fc6e1f..a4c8dc1 100644
--- a/app/lib/prymn/messaging.ex
+++ b/app/lib/prymn/messaging.ex
@@ -3,9 +3,24 @@ defmodule Prymn.Messaging do
The Prymn messaging system
"""
- def agent_health_topic(agent_id), do: "health.#{agent_id}"
+ alias Prymn.Messaging
- def subscribe_to_agent_health(agent_id) do
- Phoenix.PubSub.subscribe(Prymn.PubSub, agent_health_topic(agent_id))
+ def open_terminal(agent_id) do
+ msg = Jason.encode!(%{"id" => "foo"})
+ :ok = Messaging.Connection.subscribe_to_agent(:nats1, agent_id, "terminal.foo.output")
+ :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "open_terminal", msg)
+ end
+
+ def close_terminal(agent_id) do
+ :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.close")
+ end
+
+ def resize_terminal(agent_id, rows, cols) do
+ msg = Jason.encode!(%{"rows" => rows, "cols" => cols})
+ :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.resize", msg)
+ end
+
+ def send_terminal_input(agent_id, input) do
+ :ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.input", input)
end
end
diff --git a/app/lib/prymn/messaging/connection.ex b/app/lib/prymn/messaging/connection.ex
index f22e886..92d3471 100644
--- a/app/lib/prymn/messaging/connection.ex
+++ b/app/lib/prymn/messaging/connection.ex
@@ -1,18 +1,28 @@
defmodule Prymn.Messaging.Connection do
use GenServer
- defstruct [:conn_pid]
+ defstruct [:conn_pid, subscriptions: %{}]
+
require Logger
alias Prymn.Agents
alias Prymn.Messaging
@dialyzer {:nowarn_function, init: 1}
+ @subscription_supervisor Prymn.Messaging.SubscriptionSupervisor
@v1_prefix "agents.v1."
def start_link(name) do
GenServer.start_link(__MODULE__, nil, name: name)
end
+ def publish_to_agent(conn, agent_id, subject, payload \\ "") do
+ GenServer.call(conn, {:publish, agent_id, subject, payload})
+ end
+
+ def subscribe_to_agent(conn, agent_id, subject) do
+ GenServer.call(conn, {:subscribe, agent_id, subject})
+ end
+
@impl true
def init(_init_arg) do
connection_properties = %{
@@ -46,15 +56,32 @@ defmodule Prymn.Messaging.Connection do
{:noreply, state}
end
+ @impl true
+ def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do
+ :ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload)
+ {:reply, :ok, state}
+ end
+
+ def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do
+ args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid}
+
+ case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do
+ {:ok, _pid} -> {:reply, :ok, state}
+ {:error, {:already_started, _pid}} -> {:reply, :ok, state}
+ {:error, error} -> {:reply, {:error, error}, state}
+ end
+ end
+
@impl true
def handle_info({:EXIT, _pid, _reason}, state) do
Logger.info("Lost connection to NATS. Attempting reconnect.")
{:noreply, state, {:continue, :attempt_reconnect}}
end
- def handle_info({:msg, %{body: body, topic: @v1_prefix <> topic}}, state) do
- [agent_id, topic] = String.split(topic, ".")
- health = Messaging.Messages.handle_message(topic, body)
+ def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do
+ [agent_id, "health"] = String.split(topic, ".")
+
+ health = Messaging.Messages.handle_message(subject, payload)
Agents.update_health(agent_id, health)
{:noreply, state}
diff --git a/app/lib/prymn/messaging/connection_supervisor.ex b/app/lib/prymn/messaging/connection_supervisor.ex
index b2daccb..6cfa0e6 100644
--- a/app/lib/prymn/messaging/connection_supervisor.ex
+++ b/app/lib/prymn/messaging/connection_supervisor.ex
@@ -8,6 +8,8 @@ defmodule Prymn.Messaging.ConnectionSupervisor do
@impl true
def init(_init_arg) do
children = [
+ {DynamicSupervisor, name: Prymn.Messaging.SubscriptionSupervisor},
+ {Registry, name: Prymn.Messaging.SubscriptionRegistry, keys: :unique},
{Prymn.Messaging.Connection, :nats1}
]
diff --git a/app/lib/prymn/messaging/messages.ex b/app/lib/prymn/messaging/messages.ex
index 31ed980..502b094 100644
--- a/app/lib/prymn/messaging/messages.ex
+++ b/app/lib/prymn/messaging/messages.ex
@@ -1,15 +1,28 @@
-defmodule Prymn.Messaging.Messages do
- alias Prymn.Messaging.Messages.Health
- require Logger
+defmodule Prymn.Messaging.Messages.TerminalOutput do
+ defstruct [:id, :output]
+end
- def handle_message(topic, body) do
- case topic do
- "health" -> Jason.decode!(body)
- _ -> Logger.warning("Received unknown topic inside the Connection server: #{topic}")
+# defmodule Prymn.Messaging.Messages.Health do
+# defstruct [:cpu_status, :memory_status, :disk_status]
+# end
+
+defmodule Prymn.Messaging.Messages do
+ require Logger
+ alias Prymn.Messaging.Messages.TerminalOutput
+
+ @v1_prefix "agents.v1."
+
+ def handle_message(subject, payload) do
+ {_agent_id, subjects} = extract_subject(subject)
+
+ case subjects do
+ ["health"] -> Jason.decode!(payload)
+ ["terminal", id, "output"] -> %TerminalOutput{id: id, output: payload}
end
end
-end
-defmodule Prymn.Messaging.Messages.Health do
- defstruct [:cpu_status, :memory_status, :disk_status]
+ defp extract_subject(@v1_prefix <> rest) do
+ [agent_id | subjects] = String.split(rest, ".")
+ {agent_id, subjects}
+ end
end
diff --git a/app/lib/prymn_web/components/terminal.ex b/app/lib/prymn_web/components/terminal.ex
index 2b48f99..f5c14bc 100644
--- a/app/lib/prymn_web/components/terminal.ex
+++ b/app/lib/prymn_web/components/terminal.ex
@@ -1,6 +1,8 @@
defmodule PrymnWeb.Terminal do
use PrymnWeb, :live_component
+ alias Prymn.Agents
+
@impl true
def mount(socket) do
{:ok, assign(socket, :open, false)}
@@ -44,51 +46,25 @@ defmodule PrymnWeb.Terminal do
end
@impl true
- def handle_event("open_terminal", _params, socket) do
- # agent = Prymn.Agents.from_server(socket.assigns.server)
- # pid = self()
+ def handle_event("open_terminal", _params, %{assigns: assigns} = socket) do
+ # TODO: make up a terminal id
+ Agents.open_terminal(assigns.agent)
- # Task.Supervisor.start_child(Prymn.TaskSupervisor, fn ->
- # # FIXME: Have to wrap this in a Task because gun sends unsolicited messages
- # # to calling process
- # stream = Prymn.Agents.terminal(agent)
-
- # {:ok, mux_pid} =
- # Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end)
-
- # send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true)
-
- # case GRPC.Stub.recv(stream, timeout: :infinity) do
- # {:ok, stream} ->
- # Enum.map(stream, fn
- # {:ok, %{output: data}} ->
- # send(mux_pid, :data)
- # send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data)
-
- # {:error, _err} ->
- # send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false)
- # end)
-
- # {:error, error} ->
- # dbg(error)
- # end
- # end)
-
- {:noreply, socket}
+ {:noreply, assign(socket, :open, true)}
end
def handle_event("close_terminal", _params, socket) do
- send(socket.assigns.mux_pid, :close)
- {:noreply, assign(socket, :open, false)}
+ Agents.close_terminal(socket.assigns.agent)
+ {:noreply, assign(socket, open: false, data: "")}
end
- def handle_event("data_event", data, socket) when is_binary(data) do
- send(socket.assigns.mux_pid, {:data_event, data})
+ def handle_event("data_event", data, %{assigns: assigns} = socket) when is_binary(data) do
+ Agents.send_terminal_input(assigns.agent, data)
{:noreply, socket}
end
def handle_event("resize_event", %{"cols" => cols, "rows" => rows}, socket) do
- send(socket.assigns.mux_pid, {:resize_event, rows, cols})
+ Agents.resize_terminal(socket.assigns.agent, rows, cols)
{:noreply, socket}
end
diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex
index 6280da1..8b2ea4e 100644
--- a/app/lib/prymn_web/live/server_live/show.ex
+++ b/app/lib/prymn_web/live/server_live/show.ex
@@ -2,7 +2,7 @@ defmodule PrymnWeb.ServerLive.Show do
use PrymnWeb, :live_view
require Logger
- alias Prymn.{Agents, Servers}
+ alias Prymn.{Agents, Servers, Messaging}
@impl true
def mount(_params, _session, socket) do
@@ -20,7 +20,7 @@ defmodule PrymnWeb.ServerLive.Show do
<.dropdown title="Select a different server">
<:button variant="tertiary">Server <%= @server.name %>
<:item
- :for={server <- Enum.filter(@servers, fn s -> s.id != @server.id end)}
+ :for={server <- Enum.filter(@servers, &(&1.id != @server.id))}
patch={~p"/servers/#{server}"}
>
<%= server.name %>
@@ -29,7 +29,7 @@ defmodule PrymnWeb.ServerLive.Show do