diff --git a/agent/server.conf b/agent/server.conf index cf24e03..6c95148 100644 --- a/agent/server.conf +++ b/agent/server.conf @@ -13,10 +13,10 @@ authorization: { password: demo_agent_password permissions: { publish: [ - "agents.v1.demo_agent.>" + "agents.v1.1.>" ] subscribe: [ - "agents.v1.demo_agent.>" + "agents.v1.1.>" "_INBOX_demo_agent.>" ] } diff --git a/agent/src/main.rs b/agent/src/main.rs index bd43804..ee93cb7 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -20,7 +20,8 @@ async fn main() -> anyhow::Result<()> { } async fn run() -> anyhow::Result<()> { - let client = messaging::Client::connect("demo_agent").await?; + // TODO: Configurable client + let client = messaging::Client::connect("1").await?; init_health_subsystem(client.clone()); tracing::info!("initialized health subsystem"); diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index 66183b3..7aa9d58 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -1,145 +1,21 @@ defmodule Prymn.Agents do - @moduledoc ~S""" - Prymn Agents are programs that manage a remote client machine. Prymn backend - communicates with them using GRPC calls. GRPC connections are started using - the Prymn.Agents.ConnectionSupervisor and are book-kept using the - Prymn.Agents.Registry. + alias Prymn.Agents - Agents are only valid when a `Prymn.Servers.Server` is considered registered. - - """ - - require Logger - alias Prymn.Agents.{Connection, Health, Agent} - alias PrymnProto.Prymn.Agent.Stub - alias PrymnProto.Prymn.{ExecRequest, SysUpdateRequest} - - @doc """ - Establish a connection with a Server if one does not already exist, and - return an Agent that interfaces with the rest of the system. - - ## Examples - iex> Prymn.Servers.get_server_by_ip!("127.0.0.1") |> Prymn.Agents.from_server() - %Prymn.Agents.Agent{} - """ - def from_server(%Prymn.Servers.Server{status: :registered} = server) do - case start_connection(server.public_ip) do - {:ok, _pid} -> Agent.new(server.public_ip) - {:error, error} -> {:error, error} - end + def from_server(%Prymn.Servers.Server{} = server) do + agent_id = to_string(server.id) + Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{agent_id}") + Agents.Store.get_or_default(agent_id) end - def from_server(%Prymn.Servers.Server{}) do - Logger.error("Tried to establish a connection with an unregistered server.") - {:error, :unauthorized_action} + def from_servers(servers) when is_list(servers) do + Enum.reduce(servers, [], fn server, acc -> + [{server.id, from_server(server)} | acc] + end) + |> Map.new() end - @doc """ - Establish a connection with a Server if one does not already exist for a - given App. Returns an [Agent] that interfaces with the rest of the system. - """ - def from_app(%Prymn.Apps.App{} = app) do - app = Prymn.Repo.preload(app, :server) - from_server(app.server) - end - - @doc """ - Starts a new connection with `host_address` if one does not exist. - - ## Examples - iex> Prymn.Agents.start_connection("127.0.0.1") - {:ok, } - iex> Prymn.Agents.start_connection("127.0.0.1") - {:ok, } - """ - def start_connection(host_address) do - spec = {Connection, host_address} - - case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do - {:ok, pid} -> {:ok, pid} - {:error, {:already_started, pid}} -> {:ok, pid} - {:error, error} -> {:error, error} - end - end - - @doc """ - Subscribe to the host's Health using Phoenix.PubSub. - Broadcasted messages are the Health struct: - - %Prymn.Agents.Health{} - """ - def subscribe_to_health(%Agent{} = agent) do - :ok = Health.subscribe(agent.host_address) - agent - end - - def subscribe_to_health(host_address) do - :ok = Health.subscribe(host_address) - end - - # TODO - # def alive?(host_address) do - # end - - @doc """ - Return the last known health status of the Agent, or `nil` if it doesn't - exist. - """ - def get_health(host_address) do - Health.lookup(host_address) - end - - @doc """ - Get the system's information (CPU, Memory usage, etc.). - """ - def get_sys_info(%Agent{} = agent) do - {:error, :unimplemented} - end - - @doc """ - Run a command. - """ - def exec(%Agent{} = agent, %ExecRequest{} = request) do - with {:ok, channel} <- get_channel(agent), - {:ok, result} <- Stub.exec(channel, request) do - result - else - {:error, error} -> {:error, error} - end - end - - def exec(%Agent{} = agent, request) when is_map(request), - do: exec(agent, struct(ExecRequest, request)) - - @doc """ - Perform a system update. - """ - def sys_update(%Agent{} = agent, %SysUpdateRequest{} = request) do - with {:ok, channel} <- get_channel(agent), - {:ok, result} <- Stub.sys_update(channel, request) do - result - else - {:error, error} -> {:error, error} - end - end - - def sys_update(%Agent{} = agent, request) when is_map(request), - do: sys_update(agent, struct(SysUpdateRequest, request)) - - def terminal(%Agent{} = agent) do - # TODO: Find a better solve for bi-directional GRPC stream - with {:ok, channel} <- get_channel(agent), - stream <- Stub.terminal(channel) do - stream - else - {:error, error} -> {:error, error} - end - end - - defp get_channel(%Agent{} = agent) do - case start_connection(agent.host_address) do - {:ok, pid} -> {:ok, Connection.get_channel(pid)} - {:error, error} -> {:error, error} - end + def update_health(agent_id, health) when is_binary(agent_id) do + agent = Agents.Store.update_health(agent_id, health) + Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent) end end diff --git a/app/lib/prymn/agents/agent.ex b/app/lib/prymn/agents/agent.ex index a2594e5..15619db 100644 --- a/app/lib/prymn/agents/agent.ex +++ b/app/lib/prymn/agents/agent.ex @@ -1,13 +1,7 @@ defmodule Prymn.Agents.Agent do - @moduledoc false + defstruct [:id, :health, status: :disconnected] - defstruct [:host_address] - - @type t :: %__MODULE__{ - host_address: String.t() - } - - def new(host_address) when is_binary(host_address) do - %__MODULE__{host_address: host_address} + def new(id) do + %__MODULE__{id: id} end end diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex deleted file mode 100644 index ac76ddf..0000000 --- a/app/lib/prymn/agents/connection.ex +++ /dev/null @@ -1,25 +0,0 @@ -defmodule Prymn.Agents.Connection do - @moduledoc false - use GenServer, restart: :transient - - def start_link(host_address) do - GenServer.start_link(__MODULE__, host_address, name: via(host_address)) - end - - def get_channel(server) when is_pid(server) do - GenServer.call(server, :get_channel) - end - - ## - ## Server callbacks - ## - - @impl true - def init(host) do - {:ok, {host, nil}} - end - - defp via(name) do - {:via, Registry, {Prymn.Agents.Registry, name}} - end -end diff --git a/app/lib/prymn/agents/health.ex b/app/lib/prymn/agents/health.ex index 95d0ffd..654955b 100644 --- a/app/lib/prymn/agents/health.ex +++ b/app/lib/prymn/agents/health.ex @@ -5,104 +5,5 @@ defmodule Prymn.Agents.Health do getting depleted, or if it's unable be reached. """ - defstruct [:host, :version, :status, tasks: [], message: "Unknown"] - - alias PrymnProto.Prymn.HealthResponse - - @type t :: %{ - host: String.t(), - version: String.t(), - status: atom(), - message: String.t() - } - - def start() do - :ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true]) - end - - def subscribe(host) do - Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host}") - end - - def broadcast!(%__MODULE__{host: host} = health) do - Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host}", health) - end - - def update_and_broadcast(nil) do - nil - end - - def update_and_broadcast(%__MODULE__{host: host} = health) do - :ets.insert(__MODULE__, {host, health}) - broadcast!(health) - end - - def delete(host_address) do - :ets.delete(__MODULE__, host_address) - end - - def lookup(host_address, opts \\ []) do - default = Keyword.get(opts, :default, false) - - case :ets.lookup(__MODULE__, host_address) do - [{^host_address, value}] -> value - [] when default -> %__MODULE__{host: host_address} - [] -> nil - end - end - - def new(agent_id, %{"cpu_status" => cpu, "memory_status" => memory, "disk_status" => disk}) do - %__MODULE__{host: agent_id, version: "0.1.0"} - |> do_cpu(cpu) - |> do_memory(memory) - |> do_disks(disk) - end - - defp do_cpu(health, cpu) do - %__MODULE__{health | message: "Connected", status: :connected} - end - - defp do_memory(health, memory) do - health - end - - defp do_disks(health, disks) do - health - end - - def make_timed_out(%__MODULE__{} = health) do - %__MODULE__{health | status: :unreachable, message: "Connect timed out"} - end - - def make_disconnected(%__MODULE__{} = health) do - %__MODULE__{health | status: :disconnected, message: "Disconnected"} - end - - def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do - %__MODULE__{host: host, status: :connected} - |> do_version(version) - |> do_system(system) - |> do_tasks(tasks) - end - - defp do_version(health, version) do - %__MODULE__{health | version: version} - end - - defp do_system(health, system) do - case system.status do - "normal" -> %__MODULE__{health | message: "Connected"} - status -> %__MODULE__{health | message: "Alert: #{status}"} - end - end - - defp do_tasks(health, tasks) do - tasks = - Enum.map(tasks, fn {task_key, task_value} -> - progress = Float.round(task_value.progress, 2) - {task_key, %{task_value | progress: "#{progress}%"}} - end) - - %__MODULE__{health | tasks: tasks} - end + # defstruct status: :disconnected end diff --git a/app/lib/prymn/agents/store.ex b/app/lib/prymn/agents/store.ex new file mode 100644 index 0000000..6e0f9e8 --- /dev/null +++ b/app/lib/prymn/agents/store.ex @@ -0,0 +1,44 @@ +defmodule Prymn.Agents.Store do + @moduledoc false + use Agent + + alias Prymn.Agents + + # Stores and serves locally saved "Prymn Agents" to the application + # Not to be confused with the "Prymn Server" construct + + @doc false + def start_link([]) do + Agent.start_link(fn -> %{} end, name: __MODULE__) + end + + def get(id) do + Agent.get(__MODULE__, fn agents -> agents[id] end) + end + + def get_or_default(id, new_agent \\ nil) do + case Agent.get(__MODULE__, fn agents -> agents[id] end) do + nil -> + agent = new_agent || Agents.Agent.new(id) + Agent.update(__MODULE__, &Map.put(&1, id, agent)) + agent + + agent -> + agent + end + end + + def update_health(id, health) do + case get(id) do + nil -> + agent = %Agents.Agent{Agents.Agent.new(id) | status: :connected, health: health} + Agent.update(__MODULE__, &Map.put(&1, id, agent)) + agent + + agent -> + agent = %Agents.Agent{agent | status: :connected, health: health} + Agent.update(__MODULE__, &Map.put(&1, id, agent)) + agent + end + end +end diff --git a/app/lib/prymn/agents/supervisor.ex b/app/lib/prymn/agents/supervisor.ex deleted file mode 100644 index 51055c9..0000000 --- a/app/lib/prymn/agents/supervisor.ex +++ /dev/null @@ -1,27 +0,0 @@ -defmodule Prymn.Agents.Supervisor do - @moduledoc false - - use Supervisor - - @dynamic_supervisor Prymn.Agents.ConnectionSupervisor - - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @impl true - def init(_init_arg) do - children = [ - # The registry will be used to register `Connection` processes with their - # name as their address - {Registry, keys: :unique, name: Prymn.Agents.Registry}, - # Dynamically start `Connection` processes - {DynamicSupervisor, name: @dynamic_supervisor, strategy: :one_for_one, max_seconds: 60} - ] - - # Register a "health" table that stores in-memory any agent health data - Prymn.Agents.Health.start() - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/app/lib/prymn/application.ex b/app/lib/prymn/application.ex index 8c41187..b9914e7 100644 --- a/app/lib/prymn/application.ex +++ b/app/lib/prymn/application.ex @@ -14,7 +14,7 @@ defmodule Prymn.Application do {Phoenix.PubSub, name: Prymn.PubSub}, {Finch, name: Prymn.Finch}, {Oban, Application.fetch_env!(:prymn, Oban)}, - Prymn.Agents.Supervisor, + Prymn.Agents.Store, Prymn.Messaging.ConnectionSupervisor, {Task.Supervisor, name: Prymn.TaskSupervisor}, PrymnWeb.Endpoint diff --git a/app/lib/prymn/apps/wordpress.ex b/app/lib/prymn/apps/wordpress.ex index 30cd2e7..9cb9a6b 100644 --- a/app/lib/prymn/apps/wordpress.ex +++ b/app/lib/prymn/apps/wordpress.ex @@ -4,7 +4,6 @@ defmodule Prymn.Apps.Wordpress do import Ecto.Changeset alias Prymn.Apps.App alias Prymn.Agents - alias PrymnProto.Prymn.{ExecRequest, ExecResponse} @primary_key false embedded_schema do @@ -50,150 +49,150 @@ defmodule Prymn.Apps.Wordpress do # }) # |> Health.update_and_broadcast() - def deploy(%App{type: "wordpress"} = app, agent, notify_fun) do + def deploy(%App{type: "wordpress"} = _app, _agent, _notify_fun) do # TODO: Run sanity checks # e.g Database does not exist, domain does not exist, etc. - deploy(app, agent, notify_fun, 1) + # deploy(app, agent, notify_fun, 1) end - defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 1 = step) do - # TODO: We need a mechanism to wait for the agent to connect before proceeding, - # this is executed faster than the connection (which happens on deploy_app in Worker) + # defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 1 = step) do + # # TODO: We need a mechanism to wait for the agent to connect before proceeding, + # # this is executed faster than the connection (which happens on deploy_app in Worker) - Agents.exec(agent, %ExecRequest{ - user: "root", - program: "mysql", - args: [ - "-e", - # TODO: Sanitize the string to protect from injection - "create user '#{wp.db_user}'@'#{wp.db_host}' identified by '#{wp.db_pass}';" - ] - }) - |> then(&get_results/1) - |> case do - {_, _, exit_code} = data when exit_code != 0 -> - notify_fun.(:error, data, 1 / 5 * 100) + # Agents.exec(agent, %ExecRequest{ + # user: "root", + # program: "mysql", + # args: [ + # "-e", + # # TODO: Sanitize the string to protect from injection + # "create user '#{wp.db_user}'@'#{wp.db_host}' identified by '#{wp.db_pass}';" + # ] + # }) + # |> then(&get_results/1) + # |> case do + # {_, _, exit_code} = data when exit_code != 0 -> + # notify_fun.(:error, data, 1 / 5 * 100) - data -> - notify_fun.(:progress, data, step / @max_steps * 100) - deploy(app, agent, notify_fun, step + 1) - end - end + # data -> + # notify_fun.(:progress, data, step / @max_steps * 100) + # deploy(app, agent, notify_fun, step + 1) + # end + # end - defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 2 = step) do - Agents.exec(agent, %ExecRequest{ - user: "root", - program: "mysql", - args: [ - "-e", - # TODO: Sanitize the string to protect from injection - "grant all privileges on #{wp.db_name}.* to '#{wp.db_user}'@'#{wp.db_host}';" - ] - }) - |> then(&get_results/1) - |> case do - {_, _, exit_code} = data when exit_code != 0 -> - notify_fun.(:error, data, step / @max_steps * 100) + # defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 2 = step) do + # Agents.exec(agent, %ExecRequest{ + # user: "root", + # program: "mysql", + # args: [ + # "-e", + # # TODO: Sanitize the string to protect from injection + # "grant all privileges on #{wp.db_name}.* to '#{wp.db_user}'@'#{wp.db_host}';" + # ] + # }) + # |> then(&get_results/1) + # |> case do + # {_, _, exit_code} = data when exit_code != 0 -> + # notify_fun.(:error, data, step / @max_steps * 100) - data -> - notify_fun.(:progress, data, step / @max_steps * 100) - deploy(app, agent, notify_fun, step + 1) - end - end + # data -> + # notify_fun.(:progress, data, step / @max_steps * 100) + # deploy(app, agent, notify_fun, step + 1) + # end + # end - defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 3 = step) do - Agents.exec(agent, %ExecRequest{ - user: "vagrant", - program: "wp", - args: ["core", "download", "--path=#{wp.path}"] - }) - |> then(&get_results/1) - |> case do - {_, _, exit_code} = data when exit_code != 0 -> - notify_fun.(:error, data, step / @max_steps * 100) + # defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 3 = step) do + # Agents.exec(agent, %ExecRequest{ + # user: "vagrant", + # program: "wp", + # args: ["core", "download", "--path=#{wp.path}"] + # }) + # |> then(&get_results/1) + # |> case do + # {_, _, exit_code} = data when exit_code != 0 -> + # notify_fun.(:error, data, step / @max_steps * 100) - data -> - notify_fun.(:progress, data, step / @max_steps * 100) - deploy(app, agent, notify_fun, step + 1) - end - end + # data -> + # notify_fun.(:progress, data, step / @max_steps * 100) + # deploy(app, agent, notify_fun, step + 1) + # end + # end - defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 4 = step) do - Agents.exec(agent, %ExecRequest{ - user: "vagrant", - program: "wp", - args: [ - "config", - "create", - "--path=#{wp.path}", - "--dbhost=#{wp.db_host}", - "--dbname=#{wp.db_name}", - "--dbuser=#{wp.db_user}", - "--dbpass=#{wp.db_pass}" - ] - }) - |> then(&get_results/1) - |> case do - {_, _, exit_code} = data when exit_code != 0 -> - notify_fun.(:error, data, step / @max_steps * 100) + # defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 4 = step) do + # Agents.exec(agent, %ExecRequest{ + # user: "vagrant", + # program: "wp", + # args: [ + # "config", + # "create", + # "--path=#{wp.path}", + # "--dbhost=#{wp.db_host}", + # "--dbname=#{wp.db_name}", + # "--dbuser=#{wp.db_user}", + # "--dbpass=#{wp.db_pass}" + # ] + # }) + # |> then(&get_results/1) + # |> case do + # {_, _, exit_code} = data when exit_code != 0 -> + # notify_fun.(:error, data, step / @max_steps * 100) - data -> - notify_fun.(:progress, data, step / @max_steps * 100) - deploy(app, agent, notify_fun, step + 1) - end - end + # data -> + # notify_fun.(:progress, data, step / @max_steps * 100) + # deploy(app, agent, notify_fun, step + 1) + # end + # end - defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 5 = step) do - Agents.exec(agent, %ExecRequest{ - user: "vagrant", - program: "wp", - args: ["db", "create", "--path=#{wp.path}"] - }) - |> then(&get_results/1) - |> case do - {_, _, exit_code} = data when exit_code != 0 -> - notify_fun.(:error, data, step / @max_steps * 100) + # defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 5 = step) do + # Agents.exec(agent, %ExecRequest{ + # user: "vagrant", + # program: "wp", + # args: ["db", "create", "--path=#{wp.path}"] + # }) + # |> then(&get_results/1) + # |> case do + # {_, _, exit_code} = data when exit_code != 0 -> + # notify_fun.(:error, data, step / @max_steps * 100) - data -> - notify_fun.(:progress, data, step / @max_steps * 100) - deploy(app, agent, notify_fun, step + 1) - end - end + # data -> + # notify_fun.(:progress, data, step / @max_steps * 100) + # deploy(app, agent, notify_fun, step + 1) + # end + # end - defp deploy(%App{name: name, wordpress: %__MODULE__{} = wp}, agent, notify_fun, 6 = step) do - Agents.exec(agent, %ExecRequest{ - user: "vagrant", - program: "wp", - args: [ - "core", - "install", - "--path=#{wp.path}", - "--url=http://site.test", - "--title=#{name}", - "--admin_user=#{wp.admin_username}", - "--admin_email=#{wp.admin_email}" - ] - }) - |> then(&get_results/1) - |> case do - {_, _, exit_code} = data when exit_code != 0 -> - notify_fun.(:error, data, step / @max_steps * 100) + # defp deploy(%App{name: name, wordpress: %__MODULE__{} = wp}, agent, notify_fun, 6 = step) do + # Agents.exec(agent, %ExecRequest{ + # user: "vagrant", + # program: "wp", + # args: [ + # "core", + # "install", + # "--path=#{wp.path}", + # "--url=http://site.test", + # "--title=#{name}", + # "--admin_user=#{wp.admin_username}", + # "--admin_email=#{wp.admin_email}" + # ] + # }) + # |> then(&get_results/1) + # |> case do + # {_, _, exit_code} = data when exit_code != 0 -> + # notify_fun.(:error, data, step / @max_steps * 100) - data -> - notify_fun.(:complete, data, step / @max_steps * 100) - end - end + # data -> + # notify_fun.(:complete, data, step / @max_steps * 100) + # end + # end - defp get_results(stream) do - Enum.reduce_while(stream, {"", "", nil}, fn - {:ok, %ExecResponse{out: {:exit_code, exit_code}}}, {stdout, stderr, _} -> - {:halt, {stdout, stderr, exit_code}} + # defp get_results(stream) do + # Enum.reduce_while(stream, {"", "", nil}, fn + # {:ok, %ExecResponse{out: {:exit_code, exit_code}}}, {stdout, stderr, _} -> + # {:halt, {stdout, stderr, exit_code}} - {:ok, %ExecResponse{out: {:stdout, stdout}}}, {acc_stdout, stderr, exit_code} -> - {:cont, {acc_stdout <> stdout, stderr, exit_code}} + # {:ok, %ExecResponse{out: {:stdout, stdout}}}, {acc_stdout, stderr, exit_code} -> + # {:cont, {acc_stdout <> stdout, stderr, exit_code}} - {:ok, %ExecResponse{out: {:stderr, stderr}}}, {stdout, acc_stderr, exit_code} -> - {:cont, {stdout, acc_stderr <> stderr, exit_code}} - end) - end + # {:ok, %ExecResponse{out: {:stderr, stderr}}}, {stdout, acc_stderr, exit_code} -> + # {:cont, {stdout, acc_stderr <> stderr, exit_code}} + # end) + # end end diff --git a/app/lib/prymn/messaging/connection.ex b/app/lib/prymn/messaging/connection.ex index ea6b954..f22e886 100644 --- a/app/lib/prymn/messaging/connection.ex +++ b/app/lib/prymn/messaging/connection.ex @@ -1,9 +1,10 @@ defmodule Prymn.Messaging.Connection do - alias Prymn.Agents.Health use GenServer defstruct [:conn_pid] require Logger + alias Prymn.Agents + alias Prymn.Messaging @dialyzer {:nowarn_function, init: 1} @v1_prefix "agents.v1." @@ -18,7 +19,6 @@ defmodule Prymn.Messaging.Connection do host: "localhost", username: "prymn_admin", password: "prymn_admin", - name: "Prymn Control", auth_required: true } @@ -53,11 +53,9 @@ defmodule Prymn.Messaging.Connection do end def handle_info({:msg, %{body: body, topic: @v1_prefix <> topic}}, state) do - [agent_id, "health"] = String.split(topic, ".") - - agent_id - |> Health.new(Jason.decode!(body)) - |> Health.update_and_broadcast() + [agent_id, topic] = String.split(topic, ".") + health = Messaging.Messages.handle_message(topic, body) + Agents.update_health(agent_id, health) {:noreply, state} end diff --git a/app/lib/prymn/messaging/messages.ex b/app/lib/prymn/messaging/messages.ex new file mode 100644 index 0000000..31ed980 --- /dev/null +++ b/app/lib/prymn/messaging/messages.ex @@ -0,0 +1,15 @@ +defmodule Prymn.Messaging.Messages do + alias Prymn.Messaging.Messages.Health + require Logger + + def handle_message(topic, body) do + case topic do + "health" -> Jason.decode!(body) + _ -> Logger.warning("Received unknown topic inside the Connection server: #{topic}") + end + end +end + +defmodule Prymn.Messaging.Messages.Health do + defstruct [:cpu_status, :memory_status, :disk_status] +end diff --git a/app/lib/prymn/worker.ex b/app/lib/prymn/worker.ex index a30c5ef..a67215a 100644 --- a/app/lib/prymn/worker.ex +++ b/app/lib/prymn/worker.ex @@ -13,7 +13,7 @@ defmodule Prymn.Worker do defp deploy_app(app_id) do pid = self() app = Apps.get_app!(app_id) - agent = Agents.from_app(app) + # agent = Agents.from_app(app) Task.start_link(fn -> notify_fun = fn @@ -22,7 +22,7 @@ defmodule Prymn.Worker do :error, data, _progress -> send(pid, {:error, data}) end - Apps.Wordpress.deploy(app, agent, notify_fun) + # Apps.Wordpress.deploy(app, agent, notify_fun) end) end diff --git a/app/lib/prymn_proto/.gitkeep b/app/lib/prymn_proto/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/app/lib/prymn_web/components/system_info.ex b/app/lib/prymn_web/components/system_info.ex index 390e232..f87a84e 100644 --- a/app/lib/prymn_web/components/system_info.ex +++ b/app/lib/prymn_web/components/system_info.ex @@ -3,17 +3,17 @@ defmodule PrymnWeb.SystemInfo do require Logger alias Phoenix.LiveView.AsyncResult - alias PrymnProto.Prymn.SysInfoResponse @impl true def update(assigns, socket) do {:ok, socket |> assign(:agent, assigns.agent) - |> assign(:sys_info, AsyncResult.loading()) - |> start_async(:get_sys_info, fn -> - Prymn.Agents.get_sys_info(assigns.agent) - end)} + |> assign(:sys_info, AsyncResult.loading())} + + # |> start_async(:get_sys_info, fn -> + # Prymn.Agents.get_sys_info(assigns.agent) + # end)} end @impl true @@ -60,31 +60,32 @@ defmodule PrymnWeb.SystemInfo do """ end - def handle_async(:get_sys_info, {:ok, %SysInfoResponse{} = sys_info}, socket) do - %{sys_info: sys_info_result, agent: agent} = socket.assigns + # @impl true + # def handle_async(:get_sys_info, {:ok, %SysInfoResponse{} = sys_info}, socket) do + # %{sys_info: sys_info_result, agent: agent} = socket.assigns - {:noreply, - socket - |> assign(:sys_info, AsyncResult.ok(sys_info_result, sys_info)) - |> start_async(:get_sys_info, fn -> - # 10 seconds is >5 which is gun's timeout duration (which might have a race - # condition if they are equal) - Process.sleep(:timer.seconds(10)) - Prymn.Agents.get_sys_info(agent) - end)} - end + # {:noreply, + # socket + # |> assign(:sys_info, AsyncResult.ok(sys_info_result, sys_info)) + # |> start_async(:get_sys_info, fn -> + # # 10 seconds is >5 which is gun's timeout duration (which might have a race + # # condition if they are equal) + # Process.sleep(:timer.seconds(10)) + # Prymn.Agents.get_sys_info(agent) + # end)} + # end - def handle_async(:get_sys_info, {:ok, {:error, grpc_error}}, socket) do - %{sys_info: sys_info_result} = socket.assigns + # def handle_async(:get_sys_info, {:ok, {:error, grpc_error}}, socket) do + # %{sys_info: sys_info_result} = socket.assigns - {:noreply, - socket - |> assign(:sys_info, AsyncResult.failed(sys_info_result, grpc_error))} - end + # {:noreply, + # socket + # |> assign(:sys_info, AsyncResult.failed(sys_info_result, grpc_error))} + # end - def handle_async(:get_sys_info, {:exit, _reason}, socket) do - {:noreply, socket} - end + # def handle_async(:get_sys_info, {:exit, _reason}, socket) do + # {:noreply, socket} + # end defp calculate_cpu_usage(cpus) do (Enum.reduce(cpus, 0, fn x, acc -> x.usage + acc end) / Enum.count(cpus)) @@ -96,13 +97,14 @@ defmodule PrymnWeb.SystemInfo do end defp calculate_disk_used_percent(disks) do - alias PrymnProto.Prymn.SysInfoResponse.Disk + 0 + # alias PrymnProto.Prymn.SysInfoResponse.Disk - {used, total} = - Enum.reduce(disks, {0, 0}, fn %Disk{} = disk, {used, total} -> - {used + disk.total_bytes - disk.avail_bytes, total + disk.total_bytes} - end) + # {used, total} = + # Enum.reduce(disks, {0, 0}, fn %Disk{} = disk, {used, total} -> + # {used + disk.total_bytes - disk.avail_bytes, total + disk.total_bytes} + # end) - Float.round(100 * used / total, 2) + # Float.round(100 * used / total, 2) end end diff --git a/app/lib/prymn_web/components/terminal.ex b/app/lib/prymn_web/components/terminal.ex index 8156a0b..2b48f99 100644 --- a/app/lib/prymn_web/components/terminal.ex +++ b/app/lib/prymn_web/components/terminal.ex @@ -1,8 +1,6 @@ defmodule PrymnWeb.Terminal do use PrymnWeb, :live_component - alias PrymnProto.Prymn.TerminalRequest - @impl true def mount(socket) do {:ok, assign(socket, :open, false)} @@ -47,34 +45,34 @@ defmodule PrymnWeb.Terminal do @impl true def handle_event("open_terminal", _params, socket) do - agent = Prymn.Agents.from_server(socket.assigns.server) - pid = self() + # agent = Prymn.Agents.from_server(socket.assigns.server) + # pid = self() - Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> - # FIXME: Have to wrap this in a Task because gun sends unsolicited messages - # to calling process - stream = Prymn.Agents.terminal(agent) + # Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> + # # FIXME: Have to wrap this in a Task because gun sends unsolicited messages + # # to calling process + # stream = Prymn.Agents.terminal(agent) - {:ok, mux_pid} = - Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end) + # {:ok, mux_pid} = + # Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end) - send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true) + # send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true) - case GRPC.Stub.recv(stream, timeout: :infinity) do - {:ok, stream} -> - Enum.map(stream, fn - {:ok, %{output: data}} -> - send(mux_pid, :data) - send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data) + # case GRPC.Stub.recv(stream, timeout: :infinity) do + # {:ok, stream} -> + # Enum.map(stream, fn + # {:ok, %{output: data}} -> + # send(mux_pid, :data) + # send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data) - {:error, _err} -> - send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false) - end) + # {:error, _err} -> + # send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false) + # end) - {:error, error} -> - dbg(error) - end - end) + # {:error, error} -> + # dbg(error) + # end + # end) {:noreply, socket} end @@ -94,27 +92,27 @@ defmodule PrymnWeb.Terminal do {:noreply, socket} end - defp receive_loop(stream) do - receive do - {:data_event, data} -> - GRPC.Stub.send_request(stream, %TerminalRequest{input: data}) - receive_loop(stream) + # defp receive_loop(stream) do + # receive do + # {:data_event, data} -> + # GRPC.Stub.send_request(stream, %TerminalRequest{input: data}) + # receive_loop(stream) - {:resize_event, rows, cols} -> - GRPC.Stub.send_request(stream, %TerminalRequest{ - resize: %TerminalRequest.Resize{rows: rows, cols: cols} - }) + # {:resize_event, rows, cols} -> + # GRPC.Stub.send_request(stream, %TerminalRequest{ + # resize: %TerminalRequest.Resize{rows: rows, cols: cols} + # }) - receive_loop(stream) + # receive_loop(stream) - :data -> - receive_loop(stream) + # :data -> + # receive_loop(stream) - :close -> - GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true) - after - 120_000 -> - GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true) - end - end + # :close -> + # GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true) + # after + # 120_000 -> + # GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true) + # end + # end end diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex index 074be2f..23c7cab 100644 --- a/app/lib/prymn_web/live/server_live/index.ex +++ b/app/lib/prymn_web/live/server_live/index.ex @@ -8,19 +8,12 @@ defmodule PrymnWeb.ServerLive.Index do @impl true def mount(_params, _session, socket) do servers = Servers.list_servers() - - healths = - for %Servers.Server{status: :registered, id: id} = server <- servers, into: %{} do - # Agents.from_server(server) - # |> Agents.subscribe_to_health() - - {id, Agents.get_health(id)} - end + agents = Agents.from_servers(servers) {:ok, socket |> assign(:servers, servers) - |> assign(:healths, healths)} + |> assign(:agents, agents)} end @impl true @@ -36,27 +29,14 @@ defmodule PrymnWeb.ServerLive.Index do Connect a Server -
- <.link - :for={server <- @servers} - navigate={~p"/servers/#{server}"} - class="group block rounded-lg bg-gray-100 p-5 shadow-sm shadow-gray-300 hover:bg-black hover:text-white" - > -
+
+ <.link :for={server <- @servers} navigate={~p"/servers/#{server}"} class="group flex"> + <.status_bar agent={@agents[server.id]} /> +

<%= server.name %>

- <.server_status status={server.status} health={@healths[server.public_ip]} /> -
-
- IP: <%= server.public_ip || "N/A" %> - <%= if @healths[server.public_ip] do %> - -
In progress: <%= name %>
-
<%= task.progress %>
-
- <% end %> +
+ IP: <%= server.public_ip || "N/A" %> +
@@ -85,9 +65,9 @@ defmodule PrymnWeb.ServerLive.Index do |> update(:servers, fn servers -> [server | servers] end)} end - def handle_info(%Agents.Health{} = health, socket) do - healths = Map.put(socket.assigns.healths, health.host, health) - {:noreply, assign(socket, :healths, healths)} + def handle_info(%Agents.Agent{} = agent, socket) do + id = String.to_integer(agent.id) + {:noreply, update(socket, :agents, &Map.put(&1, id, agent))} end def handle_info(msg, state) do @@ -95,34 +75,16 @@ defmodule PrymnWeb.ServerLive.Index do {:noreply, state} end - defp server_status(assigns) do - case {assigns.status, assigns.health} do - {:unregistered, _} -> - ~H""" - Needs registration - """ + defp status_bar(assigns) do + assigns = + assign(assigns, :class, [ + "w-3 rounded-l-lg", + assigns.agent.status == :connected && "bg-teal-500", + assigns.agent.status == :disconnected && "bg-red-500" + ]) - {:registered, nil} -> - ~H""" - <.spinner size="md" /> - """ - - {:registered, %Agents.Health{status: :connected}} -> - ~H""" - Connected - """ - - {:registered, %Agents.Health{status: :disconnected}} -> - ~H""" - Disconnected - """ - - {:registered, %Agents.Health{message: message}} -> - assigns = assign(assigns, :message, message) - - ~H""" - <%= @message %> - """ - end + ~H""" +
+ """ end end diff --git a/app/lib/prymn_web/live/server_live/show.ex b/app/lib/prymn_web/live/server_live/show.ex index 2726d7d..6280da1 100644 --- a/app/lib/prymn_web/live/server_live/show.ex +++ b/app/lib/prymn_web/live/server_live/show.ex @@ -117,12 +117,12 @@ defmodule PrymnWeb.ServerLive.Show do socket end - health = Agents.get_health(server.public_ip) + # health = Agents.get_health(server.public_ip) {:noreply, socket |> assign(:page_title, server.name) - |> assign(:health, health || %{message: "Connecting...", tasks: []}) + # |> assign(:health, health || %{message: "Connecting...", tasks: []}) |> assign(:server, server) |> assign(:dry_run, false) |> assign(:update_output, []) @@ -130,21 +130,21 @@ defmodule PrymnWeb.ServerLive.Show do |> assign(:registration_command, Servers.create_setup_command(server))} end - @impl true - def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do - output = String.split(response.output, "\n") - socket = assign(socket, :update_output, output) - {:noreply, socket} - end + # @impl true + # def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do + # output = String.split(response.output, "\n") + # socket = assign(socket, :update_output, output) + # {:noreply, socket} + # end - def handle_info(%Agents.Health{host: host} = health, socket) do - socket = - if host == socket.assigns.server.public_ip, - do: assign(socket, :health, health), - else: socket + # def handle_info(%Agents.Health{host: host} = health, socket) do + # socket = + # if host == socket.assigns.server.public_ip, + # do: assign(socket, :health, health), + # else: socket - {:noreply, socket} - end + # {:noreply, socket} + # end @impl true def handle_event("system_update", _params, socket) do