todo: agents as genservers, messaging connection manager

This commit is contained in:
Nikos Papadakis 2024-02-04 20:13:35 +02:00
parent 3e066fd23a
commit 6db2478786
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
17 changed files with 115 additions and 275 deletions

View file

@ -16,12 +16,14 @@ pub struct Client {
impl Client { impl Client {
pub async fn connect(id: &str) -> Result<Self, super::MessagingError> { pub async fn connect(id: &str) -> Result<Self, super::MessagingError> {
let prefix = format!("agents.v1.{}", id);
let nats = async_nats::ConnectOptions::with_user_and_password( let nats = async_nats::ConnectOptions::with_user_and_password(
String::from("demo_agent"), String::from("demo_agent"),
String::from("demo_agent_password"), String::from("demo_agent_password"),
) )
.name(format!("Prymn Agent {id}")) .name(format!("Prymn Agent {id}"))
.custom_inbox_prefix(format!("_INBOX_{id}")) .custom_inbox_prefix(format!("{prefix}._INBOX_"))
.connect("localhost") .connect("localhost")
.await .await
.map_err(|err| { .map_err(|err| {
@ -31,7 +33,7 @@ impl Client {
Ok(Self { Ok(Self {
id: Arc::new(String::from(id)), id: Arc::new(String::from(id)),
prefix: Arc::new(format!("agents.v1.{}", id)), prefix: Arc::new(prefix),
nats, nats,
}) })
} }

View file

@ -1,38 +1,2 @@
defmodule Prymn.Agents do defmodule Prymn.Agents do
alias Prymn.Messaging
alias Prymn.Agents
def from_server(%Prymn.Servers.Server{} = server) do
agent_id = to_string(server.id)
Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{agent_id}")
Agents.Store.get_or_default(agent_id)
end
def from_servers(servers) when is_list(servers) do
Enum.reduce(servers, [], fn server, acc ->
[{server.id, from_server(server)} | acc]
end)
|> Map.new()
end
def update_health(agent_id, health) when is_binary(agent_id) do
agent = Agents.Store.update_health(agent_id, health)
Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent)
end
def open_terminal(%Agents.Agent{id: id}) do
Messaging.open_terminal(id)
end
def close_terminal(%Agents.Agent{id: id}) do
Messaging.close_terminal(id)
end
def resize_terminal(%Agents.Agent{id: id}, rows, cols) do
Messaging.resize_terminal(id, rows, cols)
end
def send_terminal_input(%Agents.Agent{id: id}, input) do
Messaging.send_terminal_input(id, input)
end
end end

View file

@ -1,7 +0,0 @@
defmodule Prymn.Agents.Agent do
defstruct [:id, :health, status: :disconnected]
def new(id) do
%__MODULE__{id: id}
end
end

View file

@ -1,9 +0,0 @@
defmodule Prymn.Agents.Health do
@moduledoc """
The Health struct keeps simple health information of whether or not the
target host machine is up to date, has any tasks running, its resources are
getting depleted, or if it's unable be reached.
"""
# defstruct status: :disconnected
end

View file

@ -1,44 +0,0 @@
defmodule Prymn.Agents.Store do
@moduledoc false
use Agent
alias Prymn.Agents
# Stores and serves locally saved "Prymn Agents" to the application
# Not to be confused with the "Prymn Server" construct
@doc false
def start_link([]) do
Agent.start_link(fn -> %{} end, name: __MODULE__)
end
def get(id) do
Agent.get(__MODULE__, fn agents -> agents[id] end)
end
def get_or_default(id, new_agent \\ nil) do
case Agent.get(__MODULE__, fn agents -> agents[id] end) do
nil ->
agent = new_agent || Agents.Agent.new(id)
Agent.update(__MODULE__, &Map.put(&1, id, agent))
agent
agent ->
agent
end
end
def update_health(id, health) do
case get(id) do
nil ->
agent = %Agents.Agent{Agents.Agent.new(id) | status: :connected, health: health}
Agent.update(__MODULE__, &Map.put(&1, id, agent))
agent
agent ->
agent = %Agents.Agent{agent | status: :connected, health: health}
Agent.update(__MODULE__, &Map.put(&1, id, agent))
agent
end
end
end

View file

@ -14,9 +14,8 @@ defmodule Prymn.Application do
{Phoenix.PubSub, name: Prymn.PubSub}, {Phoenix.PubSub, name: Prymn.PubSub},
{Finch, name: Prymn.Finch}, {Finch, name: Prymn.Finch},
{Oban, Application.fetch_env!(:prymn, Oban)}, {Oban, Application.fetch_env!(:prymn, Oban)},
Prymn.Agents.Store, Prymn.Messaging.Supervisor,
Prymn.Messaging.ConnectionSupervisor, # {Task.Supervisor, name: Prymn.TaskSupervisor},
{Task.Supervisor, name: Prymn.TaskSupervisor},
PrymnWeb.Endpoint PrymnWeb.Endpoint
] ]

View file

@ -1,26 +1,2 @@
defmodule Prymn.Messaging do defmodule Prymn.Messaging do
@moduledoc """
The Prymn messaging system
"""
alias Prymn.Messaging
def open_terminal(agent_id) do
msg = Jason.encode!(%{"id" => "foo"})
:ok = Messaging.Connection.subscribe_to_agent(:nats1, agent_id, "terminal.foo.output")
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "open_terminal", msg)
end
def close_terminal(agent_id) do
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.close")
end
def resize_terminal(agent_id, rows, cols) do
msg = Jason.encode!(%{"rows" => rows, "cols" => cols})
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.resize", msg)
end
def send_terminal_input(agent_id, input) do
:ok = Messaging.Connection.publish_to_agent(:nats1, agent_id, "terminal.foo.input", input)
end
end end

View file

@ -1,89 +1,62 @@
defmodule Prymn.Messaging.Connection do defmodule Prymn.Messaging.Connection do
use GenServer use GenServer
defstruct [:conn_pid, subscriptions: %{}]
require Logger require Logger
alias Prymn.Agents
alias Prymn.Messaging
@dialyzer {:nowarn_function, init: 1} @dialyzer {:nowarn_function, init: 1}
@subscription_supervisor Prymn.Messaging.SubscriptionSupervisor
@v1_prefix "agents.v1."
def start_link(name) do def start_link(%{name: name} = init_arg) do
GenServer.start_link(__MODULE__, nil, name: name) GenServer.start_link(__MODULE__, init_arg, name: name)
end end
def publish_to_agent(conn, agent_id, subject, payload \\ "") do def publish(server, subject, payload) do
GenServer.call(conn, {:publish, agent_id, subject, payload}) GenServer.call(server, {:pub, subject, payload})
end end
def subscribe_to_agent(conn, agent_id, subject) do def subscribe(server, subject, reply) do
GenServer.call(conn, {:subscribe, agent_id, subject}) GenServer.call(server, {:sub, subject, reply})
end end
@impl true @impl true
def init(_init_arg) do def init(_init_arg) do
connection_properties = %{ Process.flag(:trap_exit, true)
connect_opts = %{
host: "localhost", host: "localhost",
username: "prymn_admin", username: "prymn_admin",
password: "prymn_admin", password: "prymn_admin",
auth_required: true auth_required: true
} }
Process.flag(:trap_exit, true) case Gnat.start_link(connect_opts) do
case Gnat.start_link(connection_properties) do
{:ok, pid} -> {:ok, pid} ->
Logger.info("Connected to NATS") {:ok, pid}
{:ok, %__MODULE__{conn_pid: pid}, {:continue, :subscribe_to_health}}
{:error, reason} -> {:error, reason} ->
Logger.info("Connection to NATS failed (#{reason}). Attempting reconnect.") # Let the supervisor restart the Connection after a short delay
{:ok, nil, {:continue, :attempt_reconnect}} Logger.info("Initial NATS connection failed. Restarting...")
Process.sleep(1000)
{:stop, reason}
end end
end end
@impl true @impl true
def handle_continue(:attempt_reconnect, state) do def handle_call({:pub, subject, payload}, _from, conn_pid) do
Process.sleep(3000) :ok = Gnat.pub(conn_pid, subject, payload)
{:stop, {:shutdown, :connection_failure}, state}
{:reply, :ok, conn_pid}
end end
def handle_continue(:subscribe_to_health, %__MODULE__{} = state) do def handle_call({:sub, subject, reply}, _from, conn_pid) do
{:ok, _subscription} = Gnat.sub(state.conn_pid, self(), @v1_prefix <> "*.health") {:ok, cons_pid} = GenServer.start_link(Prymn.Messaging.FooConsumer, reply)
{:noreply, state} {:ok, sub} = Gnat.sub(conn_pid, cons_pid, subject)
{:reply, {:ok, sub}, conn_pid}
end end
@impl true @impl true
def handle_call({:publish, agent_id, subject, payload}, _from, %{conn_pid: conn_pid} = state) do def handle_info({:EXIT, pid, reason}, conn_pid) when conn_pid == pid do
:ok = Gnat.pub(conn_pid, @v1_prefix <> "#{agent_id}.#{subject}", payload) Logger.info("NATS connection lost (#{reason})")
{:reply, :ok, state} {:stop, {:shutdown, :connection_closed}, conn_pid}
end
def handle_call({:subscribe, agent_id, subject}, {pid, _}, %{conn_pid: conn_pid} = state) do
args = %{agent_id: agent_id, subject: subject, gnat: conn_pid, reply: pid}
case DynamicSupervisor.start_child(@subscription_supervisor, {Messaging.Subscription, args}) do
{:ok, _pid} -> {:reply, :ok, state}
{:error, {:already_started, _pid}} -> {:reply, :ok, state}
{:error, error} -> {:reply, {:error, error}, state}
end
end
@impl true
def handle_info({:EXIT, _pid, _reason}, state) do
Logger.info("Lost connection to NATS. Attempting reconnect.")
{:noreply, state, {:continue, :attempt_reconnect}}
end
def handle_info({:msg, %{body: payload, topic: @v1_prefix <> topic = subject}}, state) do
[agent_id, "health"] = String.split(topic, ".")
health = Messaging.Messages.handle_message(subject, payload)
Agents.update_health(agent_id, health)
{:noreply, state}
end end
end end

View file

@ -0,0 +1,38 @@
defmodule Prymn.Messaging.ConnectionManager do
use GenServer
defstruct subscriptions: %{}
def publish(subject, payload) do
GenServer.call(__MODULE__, {:pub, subject, payload})
end
def subscribe(subject) do
GenServer.call(__MODULE__, {:pub, subject})
end
@impl true
def init(_init_arg) do
children = [
{Prymn.Messaging.Connection, %{name: :nats}}
]
Process.monitor(:nats)
{:ok, %__MODULE__{}}
end
@impl true
def handle_call({:pub, subject, payload}, _from, state) do
Prymn.Messaging.Connection.publish(:nats, subject, payload)
{:reply, :ok, state}
end
def handle_call({:sub, subject}, {pid, _}, %__MODULE__{subscriptions: subsciptions} = state) do
Prymn.Messaging.Connection.subscribe(:nats, subject, pid)
# Map.put(subsciptions)
{:reply, :ok, state}
end
end

View file

@ -1,18 +0,0 @@
defmodule Prymn.Messaging.ConnectionSupervisor do
use Supervisor, restart: :permanent
def start_link([]) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{DynamicSupervisor, name: Prymn.Messaging.SubscriptionSupervisor},
{Registry, name: Prymn.Messaging.SubscriptionRegistry, keys: :unique},
{Prymn.Messaging.Connection, :nats1}
]
Supervisor.init(children, strategy: :one_for_one)
end
end

View file

@ -0,0 +1,15 @@
defmodule Prymn.Messaging.FooConsumer do
use GenServer
@impl true
def init(reply) do
{:ok, reply}
end
@impl true
def handle_info({:msg, %{topic: subject, body: body}}, reply) do
dbg("received on fooconsumer: #{subject}")
send(reply, body)
{:noreply, reply}
end
end

View file

@ -1,28 +0,0 @@
defmodule Prymn.Messaging.Messages.TerminalOutput do
defstruct [:id, :output]
end
# defmodule Prymn.Messaging.Messages.Health do
# defstruct [:cpu_status, :memory_status, :disk_status]
# end
defmodule Prymn.Messaging.Messages do
require Logger
alias Prymn.Messaging.Messages.TerminalOutput
@v1_prefix "agents.v1."
def handle_message(subject, payload) do
{_agent_id, subjects} = extract_subject(subject)
case subjects do
["health"] -> Jason.decode!(payload)
["terminal", id, "output"] -> %TerminalOutput{id: id, output: payload}
end
end
defp extract_subject(@v1_prefix <> rest) do
[agent_id | subjects] = String.split(rest, ".")
{agent_id, subjects}
end
end

View file

@ -1,36 +0,0 @@
defmodule Prymn.Messaging.Subscription do
use GenServer
alias Prymn.Messaging
def start_link(%{agent_id: agent_id, subject: subject} = init_arg) do
name = {:via, Registry, {Prymn.Messaging.SubscriptionRegistry, agent_id <> subject}}
GenServer.start_link(__MODULE__, init_arg, name: name)
end
@impl true
def init(%{agent_id: agent_id, subject: subject, gnat: gnat, reply: reply}) do
Process.monitor(reply)
subject = "agents.v1.#{agent_id}.#{subject}"
{:ok, sub} = Gnat.sub(gnat, self(), subject)
{:ok, %{reply: reply, sub: sub, gnat: gnat}}
end
@impl true
def handle_info({:msg, %{body: body, topic: subject}}, %{reply: reply} = state) do
msg = Messaging.Messages.handle_message(subject, body)
send(reply, msg)
{:noreply, state}
end
def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do
{:stop, {:shutdown, :gone}, state}
end
@impl true
def terminate({:shutdown, _}, state) do
Gnat.unsub(state.gnat, state.sub)
end
end

View file

@ -0,0 +1,17 @@
defmodule Prymn.Messaging.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children =
[
# Prymn.Messaging.HealthConsumer
]
Supervisor.init(children, strategy: :one_for_one)
end
end

View file

@ -65,10 +65,10 @@ defmodule PrymnWeb.ServerLive.Index do
|> update(:servers, fn servers -> [server | servers] end)} |> update(:servers, fn servers -> [server | servers] end)}
end end
def handle_info(%Agents.Agent{} = agent, socket) do # def handle_info(%Agents.Agent{} = agent, socket) do
id = String.to_integer(agent.id) # id = String.to_integer(agent.id)
{:noreply, update(socket, :agents, &Map.put(&1, id, agent))} # {:noreply, update(socket, :agents, &Map.put(&1, id, agent))}
end # end
def handle_info(msg, state) do def handle_info(msg, state) do
Logger.debug("received unexpected message #{inspect(msg)}") Logger.debug("received unexpected message #{inspect(msg)}")

View file

@ -116,15 +116,15 @@ defmodule PrymnWeb.ServerLive.Show do
|> assign(:registration_command, Servers.create_setup_command(server))} |> assign(:registration_command, Servers.create_setup_command(server))}
end end
@impl true # @impl true
def handle_info(%Agents.Agent{} = agent, socket) do # def handle_info(%Agents.Agent{} = agent, socket) do
{:noreply, assign(socket, :agent, agent)} # {:noreply, assign(socket, :agent, agent)}
end # end
def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do # def handle_info(%Messaging.Messages.TerminalOutput{output: output}, socket) do
send_update(PrymnWeb.Terminal, id: "terminal", data: output) # send_update(PrymnWeb.Terminal, id: "terminal", data: output)
{:noreply, socket} # {:noreply, socket}
end # end
# @impl true # @impl true
# def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do # def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do

View file

@ -18,8 +18,6 @@
crane = { crane = {
url = "github:ipetkov/crane"; url = "github:ipetkov/crane";
inputs.nixpkgs.follows = "nixpkgs"; inputs.nixpkgs.follows = "nixpkgs";
inputs.flake-utils.follows = "flake-utils";
inputs.rust-overlay.follows = "rust-overlay";
}; };
}; };