This commit is contained in:
Nikos Papadakis 2024-02-02 20:53:52 +02:00
parent e0850c1d2b
commit 59945bd2de
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
8 changed files with 135 additions and 89 deletions

View file

@ -81,8 +81,8 @@ impl OpenTerminalMessage {
}
}
_close = close_stream.next() => {
tracing::info!("closing terminal");
child.kill().await.expect("the child to exit normally");
tracing::info!("closing terminal");
break;
}
io_result = child.wait() => {

View file

@ -1,4 +1,5 @@
defmodule Prymn.Agents do
alias Prymn.Messaging
alias Prymn.Agents
def from_server(%Prymn.Servers.Server{} = server) do
@ -18,4 +19,20 @@ defmodule Prymn.Agents do
agent = Agents.Store.update_health(agent_id, health)
Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent)
end
def open_terminal(%Agents.Agent{id: id}) do
Messaging.open_terminal(id)
end
def close_terminal(%Agents.Agent{id: id}) do
Messaging.close_terminal(id)
end
def resize_terminal(%Agents.Agent{id: id}, rows, cols) do
Messaging.resize_terminal(id, rows, cols)
end
def send_terminal_input(%Agents.Agent{id: id}, input) do
Messaging.send_terminal_input(id, input)
end
end

View file

@ -3,9 +3,24 @@ defmodule Prymn.Messaging do
The Prymn messaging system
"""
def agent_health_topic(agent_id), do: "health.#{agent_id}"
alias Prymn.Messaging
def subscribe_to_agent_health(agent_id) do
Phoenix.PubSub.subscribe(Prymn.PubSub, agent_health_topic(agent_id))
def open_terminal(agent_id) do
msg = Jason.encode!(%{"id" => "foo"})
:ok = Messaging.Connection.subscribe_to_agent(:nats1, agent_id, "terminal.foo.output")
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "open_terminal", msg)
end
def close_terminal(agent_id) do
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.close")
end
def resize_terminal(agent_id, rows, cols) do
msg = Jason.encode!(%{"rows" => rows, "cols" => cols})
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.resize", msg)
end
def send_terminal_input(agent_id, input) do
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.input", input)
end
end

View file

@ -1,18 +1,28 @@
defmodule Prymn.Messaging.Connection do
use GenServer
defstruct [:conn_pid]
defstruct [:conn_pid, subscriptions: %{}]
require Logger
alias Prymn.Agents
alias Prymn.Messaging
@dialyzer {:nowarn_function, init: 1}
@subscription_supervisor Prymn.Messaging.SubscriptionSupervisor
@v1_prefix "agents.v1."
def start_link(name) do
GenServer.start_link(__MODULE__, nil, name: name)
end
def publish_to_agent(conn, agent_id, subject, payload \\ "") do
GenServer.call(conn, {:publish, agent_id, subject, payload})
end
def subscribe_to_agent(conn, agent_id, subject) do
GenServer.call(conn, {:subscribe, agent_id, subject})
end
@impl true
def init(_init_arg) do
connection_properties = %{
@ -46,15 +56,32 @@ defmodule Prymn.Messaging.Connection do
{:noreply, state}
end
@impl true
def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do
:ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload)
{:reply, :ok, state}
end
def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do
args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid}
case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do
{:ok, _pid} -> {:reply, :ok, state}
{:error, {:already_started, _pid}} -> {:reply, :ok, state}
{:error, error} -> {:reply, {:error, error}, state}
end
end
@impl true
def handle_info({:EXIT, _pid, _reason}, state) do
Logger.info("Lost connection to NATS. Attempting reconnect.")
{:noreply, state, {:continue, :attempt_reconnect}}
end
def handle_info({:msg, %{body: body, topic: @v1_prefix <> topic}}, state) do
[agent_id, topic] = String.split(topic, ".")
health = Messaging.Messages.handle_message(topic, body)
def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do
[agent_id, "health"] = String.split(topic, ".")
health = Messaging.Messages.handle_message(subject, payload)
Agents.update_health(agent_id, health)
{:noreply, state}

View file

@ -8,6 +8,8 @@ defmodule Prymn.Messaging.ConnectionSupervisor do
@impl true
def init(_init_arg) do
children = [
{DynamicSupervisor, name: Prymn.Messaging.SubscriptionSupervisor},
{Registry, name: Prymn.Messaging.SubscriptionRegistry, keys: :unique},
{Prymn.Messaging.Connection, :nats1}
]

View file

@ -1,15 +1,28 @@
defmodule Prymn.Messaging.Messages.TerminalOutput do
defstruct [:id, :output]
end
# defmodule Prymn.Messaging.Messages.Health do
# defstruct [:cpu_status, :memory_status, :disk_status]
# end
defmodule Prymn.Messaging.Messages do
alias Prymn.Messaging.Messages.Health
require Logger
alias Prymn.Messaging.Messages.TerminalOutput
def handle_message(topic, body) do
case topic do
"health" -> Jason.decode!(body)
_ -> Logger.warning("Received unknown topic inside the Connection server: #{topic}")
@v1_prefix "agents.v1."
def handle_message(subject, payload) do
{_agent_id, subjects} = extract_subject(subject)
case subjects do
["health"] -> Jason.decode!(payload)
["terminal", id, "output"] -> %TerminalOutput{id: id, output: payload}
end
end
end
defmodule Prymn.Messaging.Messages.Health do
defstruct [:cpu_status, :memory_status, :disk_status]
defp extract_subject(@v1_prefix <> rest) do
[agent_id | subjects] = String.split(rest, ".")
{agent_id, subjects}
end
end

View file

@ -1,6 +1,8 @@
defmodule PrymnWeb.Terminal do
use PrymnWeb, :live_component
alias Prymn.Agents
@impl true
def mount(socket) do
{:ok, assign(socket, :open, false)}
@ -44,51 +46,25 @@ defmodule PrymnWeb.Terminal do
end
@impl true
def handle_event("open_terminal", _params, socket) do
# agent = Prymn.Agents.from_server(socket.assigns.server)
# pid = self()
def handle_event("open_terminal", _params, %{assigns: assigns} = socket) do
# TODO: make up a terminal id
Agents.open_terminal(assigns.agent)
# Task.Supervisor.start_child(Prymn.TaskSupervisor, fn ->
# # FIXME: Have to wrap this in a Task because gun sends unsolicited messages
# # to calling process
# stream = Prymn.Agents.terminal(agent)
# {:ok, mux_pid} =
# Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end)
# send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true)
# case GRPC.Stub.recv(stream, timeout: :infinity) do
# {:ok, stream} ->
# Enum.map(stream, fn
# {:ok, %{output: data}} ->
# send(mux_pid, :data)
# send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data)
# {:error, _err} ->
# send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false)
# end)
# {:error, error} ->
# dbg(error)
# end
# end)
{:noreply, socket}
{:noreply, assign(socket, :open, true)}
end
def handle_event("close_terminal", _params, socket) do
send(socket.assigns.mux_pid, :close)
{:noreply, assign(socket, :open, false)}
Agents.close_terminal(socket.assigns.agent)
{:noreply, assign(socket, open: false, data: "")}
end
def handle_event("data_event", data, socket) when is_binary(data) do
send(socket.assigns.mux_pid, {:data_event, data})
def handle_event("data_event", data, %{assigns: assigns} = socket) when is_binary(data) do
Agents.send_terminal_input(assigns.agent, data)
{:noreply, socket}
end
def handle_event("resize_event", %{"cols" => cols, "rows" => rows}, socket) do
send(socket.assigns.mux_pid, {:resize_event, rows, cols})
Agents.resize_terminal(socket.assigns.agent, rows, cols)
{:noreply, socket}
end

View file

@ -2,7 +2,7 @@ defmodule PrymnWeb.ServerLive.Show do
use PrymnWeb, :live_view
require Logger
alias Prymn.{Agents, Servers}
alias Prymn.{Agents, Servers, Messaging}
@impl true
def mount(_params, _session, socket) do
@ -20,7 +20,7 @@ defmodule PrymnWeb.ServerLive.Show do
<.dropdown title="Select a different server">
<:button variant="tertiary">Server <%= @server.name %></:button>
<:item
:for={server <- Enum.filter(@servers, fn s -> s.id != @server.id end)}
:for={server <- Enum.filter(@servers, &(&1.id != @server.id))}
patch={~p"/servers/#{server}"}
>
<%= server.name %>
@ -29,7 +29,7 @@ defmodule PrymnWeb.ServerLive.Show do
<Button.tertiary title="Edit server name" phx-click={show_edit_server_name()}>
<.icon class="h-4 w-4" name="hero-pencil-solid" />
</Button.tertiary>
<.indicator message={@health.message} />
<.indicator message="test" />
</div>
<form class="hidden items-center" id="server-name-edit" phx-submit={submit_edit_server_name()}>
<input class="outline-none" type="text" name="name" value={@server.name} required />
@ -48,10 +48,6 @@ defmodule PrymnWeb.ServerLive.Show do
</div>
</div>
<span class="text-sm opacity-75"><%= @server.public_ip %></span>
<div :for={{name, task} <- @health.tasks} class="my-3 text-sm text-slate-700">
<p>Background task in progress: <%= name %></p>
<p><%= task.progress %> complete</p>
</div>
<div :if={@server.status == :unregistered} class="my-10">
<p class="mb-9">
Connect to your server using root credentials and execute the following command:
@ -96,7 +92,7 @@ defmodule PrymnWeb.ServerLive.Show do
<h2 class="my-5 text-xl">
Terminal
</h2>
<.live_component id="terminal" module={PrymnWeb.Terminal} server={@server} />
<.live_component id="terminal" module={PrymnWeb.Terminal} agent={@agent} />
</section>
</div>
<.back navigate={~p"/servers"}>Back to servers</.back>
@ -107,29 +103,29 @@ defmodule PrymnWeb.ServerLive.Show do
@impl true
def handle_params(%{"id" => id}, _, socket) do
server = Servers.get_server!(id)
socket =
if connected?(socket) and server.status == :registered do
agent = Agents.from_server(server)
Agents.subscribe_to_health(agent)
assign(socket, :agent, agent)
else
socket
end
# health = Agents.get_health(server.public_ip)
{:noreply,
socket
|> assign(:page_title, server.name)
# |> assign(:health, health || %{message: "Connecting...", tasks: []})
|> assign(:server, server)
|> assign(:agent, agent)
|> assign(:dry_run, false)
|> assign(:update_output, [])
# TODO: Do not assign this to the socket - instead generate it in the HTML
|> assign(:registration_command, Servers.create_setup_command(server))}
end
@impl true
def handle_info(%Agents.Agent{} = agent, socket) do
{:noreply, assign(socket, :agent, agent)}
end
def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do
send_update(PrymnWeb.Terminal, id: "terminal", data: output)
{:noreply, socket}
end
# @impl true
# def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do
# output = String.split(response.output, "\n")
@ -148,24 +144,24 @@ defmodule PrymnWeb.ServerLive.Show do
@impl true
def handle_event("system_update", _params, socket) do
server_name = get_in(socket.assigns, [:server, Access.key(:name)])
pid = self()
# server_name = get_in(socket.assigns, [:server, Access.key(:name)])
# pid = self()
if agent = socket.assigns[:agent] do
# TODO: This is ugly
Task.start_link(fn ->
Agents.sys_update(agent, %{dry_run: socket.assigns.dry_run})
|> Stream.each(fn
{:ok, msg} -> send(pid, msg)
{:error, error} -> Logger.error("error during system update call: #{inspect(error)}")
end)
|> Enum.to_list()
end)
# if agent = socket.assigns[:agent] do
# # TODO: This is ugly
# Task.start_link(fn ->
# Agents.sys_update(agent, %{dry_run: socket.assigns.dry_run})
# |> Stream.each(fn
# {:ok, msg} -> send(pid, msg)
# {:error, error} -> Logger.error("error during system update call: #{inspect(error)}")
# end)
# |> Enum.to_list()
# end)
put_flash(socket, :info, "Started a system update on server #{server_name}.")
else
put_flash(socket, :error, "Could not perform the update.")
end
# put_flash(socket, :info, "Started a system update on server #{server_name}.")
# else
# put_flash(socket, :error, "Could not perform the update.")
# end
{:noreply, socket}
end