make health somewhat work

This commit is contained in:
Nikos Papadakis 2024-02-01 23:54:23 +02:00
parent b4cd5642ed
commit e0850c1d2b
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
18 changed files with 327 additions and 589 deletions

View file

@ -13,10 +13,10 @@ authorization: {
password: demo_agent_password
permissions: {
publish: [
"agents.v1.demo_agent.>"
"agents.v1.1.>"
]
subscribe: [
"agents.v1.demo_agent.>"
"agents.v1.1.>"
"_INBOX_demo_agent.>"
]
}

View file

@ -20,7 +20,8 @@ async fn main() -> anyhow::Result<()> {
}
async fn run() -> anyhow::Result<()> {
let client = messaging::Client::connect("demo_agent").await?;
// TODO: Configurable client
let client = messaging::Client::connect("1").await?;
init_health_subsystem(client.clone());
tracing::info!("initialized health subsystem");

View file

@ -1,145 +1,21 @@
defmodule Prymn.Agents do
@moduledoc ~S"""
Prymn Agents are programs that manage a remote client machine. Prymn backend
communicates with them using GRPC calls. GRPC connections are started using
the Prymn.Agents.ConnectionSupervisor and are book-kept using the
Prymn.Agents.Registry.
alias Prymn.Agents
Agents are only valid when a `Prymn.Servers.Server` is considered registered.
"""
require Logger
alias Prymn.Agents.{Connection, Health, Agent}
alias PrymnProto.Prymn.Agent.Stub
alias PrymnProto.Prymn.{ExecRequest, SysUpdateRequest}
@doc """
Establish a connection with a Server if one does not already exist, and
return an Agent that interfaces with the rest of the system.
## Examples
iex> Prymn.Servers.get_server_by_ip!("127.0.0.1") |> Prymn.Agents.from_server()
%Prymn.Agents.Agent{}
"""
def from_server(%Prymn.Servers.Server{status: :registered} = server) do
case start_connection(server.public_ip) do
{:ok, _pid} -> Agent.new(server.public_ip)
{:error, error} -> {:error, error}
end
def from_server(%Prymn.Servers.Server{} = server) do
agent_id = to_string(server.id)
Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{agent_id}")
Agents.Store.get_or_default(agent_id)
end
def from_server(%Prymn.Servers.Server{}) do
Logger.error("Tried to establish a connection with an unregistered server.")
{:error, :unauthorized_action}
def from_servers(servers) when is_list(servers) do
Enum.reduce(servers, [], fn server, acc ->
[{server.id, from_server(server)} | acc]
end)
|> Map.new()
end
@doc """
Establish a connection with a Server if one does not already exist for a
given App. Returns an [Agent] that interfaces with the rest of the system.
"""
def from_app(%Prymn.Apps.App{} = app) do
app = Prymn.Repo.preload(app, :server)
from_server(app.server)
end
@doc """
Starts a new connection with `host_address` if one does not exist.
## Examples
iex> Prymn.Agents.start_connection("127.0.0.1")
{:ok, <PID:1234>}
iex> Prymn.Agents.start_connection("127.0.0.1")
{:ok, <PID:1234>}
"""
def start_connection(host_address) do
spec = {Connection, host_address}
case DynamicSupervisor.start_child(Prymn.Agents.ConnectionSupervisor, spec) do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
{:error, error} -> {:error, error}
end
end
@doc """
Subscribe to the host's Health using Phoenix.PubSub.
Broadcasted messages are the Health struct:
%Prymn.Agents.Health{}
"""
def subscribe_to_health(%Agent{} = agent) do
:ok = Health.subscribe(agent.host_address)
agent
end
def subscribe_to_health(host_address) do
:ok = Health.subscribe(host_address)
end
# TODO
# def alive?(host_address) do
# end
@doc """
Return the last known health status of the Agent, or `nil` if it doesn't
exist.
"""
def get_health(host_address) do
Health.lookup(host_address)
end
@doc """
Get the system's information (CPU, Memory usage, etc.).
"""
def get_sys_info(%Agent{} = agent) do
{:error, :unimplemented}
end
@doc """
Run a command.
"""
def exec(%Agent{} = agent, %ExecRequest{} = request) do
with {:ok, channel} <- get_channel(agent),
{:ok, result} <- Stub.exec(channel, request) do
result
else
{:error, error} -> {:error, error}
end
end
def exec(%Agent{} = agent, request) when is_map(request),
do: exec(agent, struct(ExecRequest, request))
@doc """
Perform a system update.
"""
def sys_update(%Agent{} = agent, %SysUpdateRequest{} = request) do
with {:ok, channel} <- get_channel(agent),
{:ok, result} <- Stub.sys_update(channel, request) do
result
else
{:error, error} -> {:error, error}
end
end
def sys_update(%Agent{} = agent, request) when is_map(request),
do: sys_update(agent, struct(SysUpdateRequest, request))
def terminal(%Agent{} = agent) do
# TODO: Find a better solve for bi-directional GRPC stream
with {:ok, channel} <- get_channel(agent),
stream <- Stub.terminal(channel) do
stream
else
{:error, error} -> {:error, error}
end
end
defp get_channel(%Agent{} = agent) do
case start_connection(agent.host_address) do
{:ok, pid} -> {:ok, Connection.get_channel(pid)}
{:error, error} -> {:error, error}
end
def update_health(agent_id, health) when is_binary(agent_id) do
agent = Agents.Store.update_health(agent_id, health)
Phoenix.PubSub.broadcast!(Prymn.PubSub, "agent:#{agent_id}", agent)
end
end

View file

@ -1,13 +1,7 @@
defmodule Prymn.Agents.Agent do
@moduledoc false
defstruct [:id, :health, status: :disconnected]
defstruct [:host_address]
@type t :: %__MODULE__{
host_address: String.t()
}
def new(host_address) when is_binary(host_address) do
%__MODULE__{host_address: host_address}
def new(id) do
%__MODULE__{id: id}
end
end

View file

@ -1,25 +0,0 @@
defmodule Prymn.Agents.Connection do
@moduledoc false
use GenServer, restart: :transient
def start_link(host_address) do
GenServer.start_link(__MODULE__, host_address, name: via(host_address))
end
def get_channel(server) when is_pid(server) do
GenServer.call(server, :get_channel)
end
##
## Server callbacks
##
@impl true
def init(host) do
{:ok, {host, nil}}
end
defp via(name) do
{:via, Registry, {Prymn.Agents.Registry, name}}
end
end

View file

@ -5,104 +5,5 @@ defmodule Prymn.Agents.Health do
getting depleted, or if it's unable be reached.
"""
defstruct [:host, :version, :status, tasks: [], message: "Unknown"]
alias PrymnProto.Prymn.HealthResponse
@type t :: %{
host: String.t(),
version: String.t(),
status: atom(),
message: String.t()
}
def start() do
:ets.new(__MODULE__, [:set, :public, :named_table, read_concurrency: true])
end
def subscribe(host) do
Phoenix.PubSub.subscribe(Prymn.PubSub, "health:#{host}")
end
def broadcast!(%__MODULE__{host: host} = health) do
Phoenix.PubSub.broadcast!(Prymn.PubSub, "health:#{host}", health)
end
def update_and_broadcast(nil) do
nil
end
def update_and_broadcast(%__MODULE__{host: host} = health) do
:ets.insert(__MODULE__, {host, health})
broadcast!(health)
end
def delete(host_address) do
:ets.delete(__MODULE__, host_address)
end
def lookup(host_address, opts \\ []) do
default = Keyword.get(opts, :default, false)
case :ets.lookup(__MODULE__, host_address) do
[{^host_address, value}] -> value
[] when default -> %__MODULE__{host: host_address}
[] -> nil
end
end
def new(agent_id, %{"cpu_status" => cpu, "memory_status" => memory, "disk_status" => disk}) do
%__MODULE__{host: agent_id, version: "0.1.0"}
|> do_cpu(cpu)
|> do_memory(memory)
|> do_disks(disk)
end
defp do_cpu(health, cpu) do
%__MODULE__{health | message: "Connected", status: :connected}
end
defp do_memory(health, memory) do
health
end
defp do_disks(health, disks) do
health
end
def make_timed_out(%__MODULE__{} = health) do
%__MODULE__{health | status: :unreachable, message: "Connect timed out"}
end
def make_disconnected(%__MODULE__{} = health) do
%__MODULE__{health | status: :disconnected, message: "Disconnected"}
end
def make_from_proto(%HealthResponse{system: system, version: version, tasks: tasks}, host) do
%__MODULE__{host: host, status: :connected}
|> do_version(version)
|> do_system(system)
|> do_tasks(tasks)
end
defp do_version(health, version) do
%__MODULE__{health | version: version}
end
defp do_system(health, system) do
case system.status do
"normal" -> %__MODULE__{health | message: "Connected"}
status -> %__MODULE__{health | message: "Alert: #{status}"}
end
end
defp do_tasks(health, tasks) do
tasks =
Enum.map(tasks, fn {task_key, task_value} ->
progress = Float.round(task_value.progress, 2)
{task_key, %{task_value | progress: "#{progress}%"}}
end)
%__MODULE__{health | tasks: tasks}
end
# defstruct status: :disconnected
end

View file

@ -0,0 +1,44 @@
defmodule Prymn.Agents.Store do
@moduledoc false
use Agent
alias Prymn.Agents
# Stores and serves locally saved "Prymn Agents" to the application
# Not to be confused with the "Prymn Server" construct
@doc false
def start_link([]) do
Agent.start_link(fn -> %{} end, name: __MODULE__)
end
def get(id) do
Agent.get(__MODULE__, fn agents -> agents[id] end)
end
def get_or_default(id, new_agent \\ nil) do
case Agent.get(__MODULE__, fn agents -> agents[id] end) do
nil ->
agent = new_agent || Agents.Agent.new(id)
Agent.update(__MODULE__, &Map.put(&1, id, agent))
agent
agent ->
agent
end
end
def update_health(id, health) do
case get(id) do
nil ->
agent = %Agents.Agent{Agents.Agent.new(id) | status: :connected, health: health}
Agent.update(__MODULE__, &Map.put(&1, id, agent))
agent
agent ->
agent = %Agents.Agent{agent | status: :connected, health: health}
Agent.update(__MODULE__, &Map.put(&1, id, agent))
agent
end
end
end

View file

@ -1,27 +0,0 @@
defmodule Prymn.Agents.Supervisor do
@moduledoc false
use Supervisor
@dynamic_supervisor Prymn.Agents.ConnectionSupervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
# The registry will be used to register `Connection` processes with their
# name as their address
{Registry, keys: :unique, name: Prymn.Agents.Registry},
# Dynamically start `Connection` processes
{DynamicSupervisor, name: @dynamic_supervisor, strategy: :one_for_one, max_seconds: 60}
]
# Register a "health" table that stores in-memory any agent health data
Prymn.Agents.Health.start()
Supervisor.init(children, strategy: :one_for_one)
end
end

View file

@ -14,7 +14,7 @@ defmodule Prymn.Application do
{Phoenix.PubSub, name: Prymn.PubSub},
{Finch, name: Prymn.Finch},
{Oban, Application.fetch_env!(:prymn, Oban)},
Prymn.Agents.Supervisor,
Prymn.Agents.Store,
Prymn.Messaging.ConnectionSupervisor,
{Task.Supervisor, name: Prymn.TaskSupervisor},
PrymnWeb.Endpoint

View file

@ -4,7 +4,6 @@ defmodule Prymn.Apps.Wordpress do
import Ecto.Changeset
alias Prymn.Apps.App
alias Prymn.Agents
alias PrymnProto.Prymn.{ExecRequest, ExecResponse}
@primary_key false
embedded_schema do
@ -50,150 +49,150 @@ defmodule Prymn.Apps.Wordpress do
# })
# |> Health.update_and_broadcast()
def deploy(%App{type: "wordpress"} = app, agent, notify_fun) do
def deploy(%App{type: "wordpress"} = _app, _agent, _notify_fun) do
# TODO: Run sanity checks
# e.g Database does not exist, domain does not exist, etc.
deploy(app, agent, notify_fun, 1)
# deploy(app, agent, notify_fun, 1)
end
defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 1 = step) do
# TODO: We need a mechanism to wait for the agent to connect before proceeding,
# this is executed faster than the connection (which happens on deploy_app in Worker)
# defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 1 = step) do
# # TODO: We need a mechanism to wait for the agent to connect before proceeding,
# # this is executed faster than the connection (which happens on deploy_app in Worker)
Agents.exec(agent, %ExecRequest{
user: "root",
program: "mysql",
args: [
"-e",
# TODO: Sanitize the string to protect from injection
"create user '#{wp.db_user}'@'#{wp.db_host}' identified by '#{wp.db_pass}';"
]
})
|> then(&get_results/1)
|> case do
{_, _, exit_code} = data when exit_code != 0 ->
notify_fun.(:error, data, 1 / 5 * 100)
# Agents.exec(agent, %ExecRequest{
# user: "root",
# program: "mysql",
# args: [
# "-e",
# # TODO: Sanitize the string to protect from injection
# "create user '#{wp.db_user}'@'#{wp.db_host}' identified by '#{wp.db_pass}';"
# ]
# })
# |> then(&get_results/1)
# |> case do
# {_, _, exit_code} = data when exit_code != 0 ->
# notify_fun.(:error, data, 1 / 5 * 100)
data ->
notify_fun.(:progress, data, step / @max_steps * 100)
deploy(app, agent, notify_fun, step + 1)
end
end
# data ->
# notify_fun.(:progress, data, step / @max_steps * 100)
# deploy(app, agent, notify_fun, step + 1)
# end
# end
defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 2 = step) do
Agents.exec(agent, %ExecRequest{
user: "root",
program: "mysql",
args: [
"-e",
# TODO: Sanitize the string to protect from injection
"grant all privileges on #{wp.db_name}.* to '#{wp.db_user}'@'#{wp.db_host}';"
]
})
|> then(&get_results/1)
|> case do
{_, _, exit_code} = data when exit_code != 0 ->
notify_fun.(:error, data, step / @max_steps * 100)
# defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 2 = step) do
# Agents.exec(agent, %ExecRequest{
# user: "root",
# program: "mysql",
# args: [
# "-e",
# # TODO: Sanitize the string to protect from injection
# "grant all privileges on #{wp.db_name}.* to '#{wp.db_user}'@'#{wp.db_host}';"
# ]
# })
# |> then(&get_results/1)
# |> case do
# {_, _, exit_code} = data when exit_code != 0 ->
# notify_fun.(:error, data, step / @max_steps * 100)
data ->
notify_fun.(:progress, data, step / @max_steps * 100)
deploy(app, agent, notify_fun, step + 1)
end
end
# data ->
# notify_fun.(:progress, data, step / @max_steps * 100)
# deploy(app, agent, notify_fun, step + 1)
# end
# end
defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 3 = step) do
Agents.exec(agent, %ExecRequest{
user: "vagrant",
program: "wp",
args: ["core", "download", "--path=#{wp.path}"]
})
|> then(&get_results/1)
|> case do
{_, _, exit_code} = data when exit_code != 0 ->
notify_fun.(:error, data, step / @max_steps * 100)
# defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 3 = step) do
# Agents.exec(agent, %ExecRequest{
# user: "vagrant",
# program: "wp",
# args: ["core", "download", "--path=#{wp.path}"]
# })
# |> then(&get_results/1)
# |> case do
# {_, _, exit_code} = data when exit_code != 0 ->
# notify_fun.(:error, data, step / @max_steps * 100)
data ->
notify_fun.(:progress, data, step / @max_steps * 100)
deploy(app, agent, notify_fun, step + 1)
end
end
# data ->
# notify_fun.(:progress, data, step / @max_steps * 100)
# deploy(app, agent, notify_fun, step + 1)
# end
# end
defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 4 = step) do
Agents.exec(agent, %ExecRequest{
user: "vagrant",
program: "wp",
args: [
"config",
"create",
"--path=#{wp.path}",
"--dbhost=#{wp.db_host}",
"--dbname=#{wp.db_name}",
"--dbuser=#{wp.db_user}",
"--dbpass=#{wp.db_pass}"
]
})
|> then(&get_results/1)
|> case do
{_, _, exit_code} = data when exit_code != 0 ->
notify_fun.(:error, data, step / @max_steps * 100)
# defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 4 = step) do
# Agents.exec(agent, %ExecRequest{
# user: "vagrant",
# program: "wp",
# args: [
# "config",
# "create",
# "--path=#{wp.path}",
# "--dbhost=#{wp.db_host}",
# "--dbname=#{wp.db_name}",
# "--dbuser=#{wp.db_user}",
# "--dbpass=#{wp.db_pass}"
# ]
# })
# |> then(&get_results/1)
# |> case do
# {_, _, exit_code} = data when exit_code != 0 ->
# notify_fun.(:error, data, step / @max_steps * 100)
data ->
notify_fun.(:progress, data, step / @max_steps * 100)
deploy(app, agent, notify_fun, step + 1)
end
end
# data ->
# notify_fun.(:progress, data, step / @max_steps * 100)
# deploy(app, agent, notify_fun, step + 1)
# end
# end
defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 5 = step) do
Agents.exec(agent, %ExecRequest{
user: "vagrant",
program: "wp",
args: ["db", "create", "--path=#{wp.path}"]
})
|> then(&get_results/1)
|> case do
{_, _, exit_code} = data when exit_code != 0 ->
notify_fun.(:error, data, step / @max_steps * 100)
# defp deploy(%App{wordpress: %__MODULE__{} = wp} = app, agent, notify_fun, 5 = step) do
# Agents.exec(agent, %ExecRequest{
# user: "vagrant",
# program: "wp",
# args: ["db", "create", "--path=#{wp.path}"]
# })
# |> then(&get_results/1)
# |> case do
# {_, _, exit_code} = data when exit_code != 0 ->
# notify_fun.(:error, data, step / @max_steps * 100)
data ->
notify_fun.(:progress, data, step / @max_steps * 100)
deploy(app, agent, notify_fun, step + 1)
end
end
# data ->
# notify_fun.(:progress, data, step / @max_steps * 100)
# deploy(app, agent, notify_fun, step + 1)
# end
# end
defp deploy(%App{name: name, wordpress: %__MODULE__{} = wp}, agent, notify_fun, 6 = step) do
Agents.exec(agent, %ExecRequest{
user: "vagrant",
program: "wp",
args: [
"core",
"install",
"--path=#{wp.path}",
"--url=http://site.test",
"--title=#{name}",
"--admin_user=#{wp.admin_username}",
"--admin_email=#{wp.admin_email}"
]
})
|> then(&get_results/1)
|> case do
{_, _, exit_code} = data when exit_code != 0 ->
notify_fun.(:error, data, step / @max_steps * 100)
# defp deploy(%App{name: name, wordpress: %__MODULE__{} = wp}, agent, notify_fun, 6 = step) do
# Agents.exec(agent, %ExecRequest{
# user: "vagrant",
# program: "wp",
# args: [
# "core",
# "install",
# "--path=#{wp.path}",
# "--url=http://site.test",
# "--title=#{name}",
# "--admin_user=#{wp.admin_username}",
# "--admin_email=#{wp.admin_email}"
# ]
# })
# |> then(&get_results/1)
# |> case do
# {_, _, exit_code} = data when exit_code != 0 ->
# notify_fun.(:error, data, step / @max_steps * 100)
data ->
notify_fun.(:complete, data, step / @max_steps * 100)
end
end
# data ->
# notify_fun.(:complete, data, step / @max_steps * 100)
# end
# end
defp get_results(stream) do
Enum.reduce_while(stream, {"", "", nil}, fn
{:ok, %ExecResponse{out: {:exit_code, exit_code}}}, {stdout, stderr, _} ->
{:halt, {stdout, stderr, exit_code}}
# defp get_results(stream) do
# Enum.reduce_while(stream, {"", "", nil}, fn
# {:ok, %ExecResponse{out: {:exit_code, exit_code}}}, {stdout, stderr, _} ->
# {:halt, {stdout, stderr, exit_code}}
{:ok, %ExecResponse{out: {:stdout, stdout}}}, {acc_stdout, stderr, exit_code} ->
{:cont, {acc_stdout <> stdout, stderr, exit_code}}
# {:ok, %ExecResponse{out: {:stdout, stdout}}}, {acc_stdout, stderr, exit_code} ->
# {:cont, {acc_stdout <> stdout, stderr, exit_code}}
{:ok, %ExecResponse{out: {:stderr, stderr}}}, {stdout, acc_stderr, exit_code} ->
{:cont, {stdout, acc_stderr <> stderr, exit_code}}
end)
end
# {:ok, %ExecResponse{out: {:stderr, stderr}}}, {stdout, acc_stderr, exit_code} ->
# {:cont, {stdout, acc_stderr <> stderr, exit_code}}
# end)
# end
end

View file

@ -1,9 +1,10 @@
defmodule Prymn.Messaging.Connection do
alias Prymn.Agents.Health
use GenServer
defstruct [:conn_pid]
require Logger
alias Prymn.Agents
alias Prymn.Messaging
@dialyzer {:nowarn_function, init: 1}
@v1_prefix "agents.v1."
@ -18,7 +19,6 @@ defmodule Prymn.Messaging.Connection do
host: "localhost",
username: "prymn_admin",
password: "prymn_admin",
name: "Prymn Control",
auth_required: true
}
@ -53,11 +53,9 @@ defmodule Prymn.Messaging.Connection do
end
def handle_info({:msg, %{body: body, topic: @v1_prefix <> topic}}, state) do
[agent_id, "health"] = String.split(topic, ".")
agent_id
|> Health.new(Jason.decode!(body))
|> Health.update_and_broadcast()
[agent_id, topic] = String.split(topic, ".")
health = Messaging.Messages.handle_message(topic, body)
Agents.update_health(agent_id, health)
{:noreply, state}
end

View file

@ -0,0 +1,15 @@
defmodule Prymn.Messaging.Messages do
alias Prymn.Messaging.Messages.Health
require Logger
def handle_message(topic, body) do
case topic do
"health" -> Jason.decode!(body)
_ -> Logger.warning("Received unknown topic inside the Connection server: #{topic}")
end
end
end
defmodule Prymn.Messaging.Messages.Health do
defstruct [:cpu_status, :memory_status, :disk_status]
end

View file

@ -13,7 +13,7 @@ defmodule Prymn.Worker do
defp deploy_app(app_id) do
pid = self()
app = Apps.get_app!(app_id)
agent = Agents.from_app(app)
# agent = Agents.from_app(app)
Task.start_link(fn ->
notify_fun = fn
@ -22,7 +22,7 @@ defmodule Prymn.Worker do
:error, data, _progress -> send(pid, {:error, data})
end
Apps.Wordpress.deploy(app, agent, notify_fun)
# Apps.Wordpress.deploy(app, agent, notify_fun)
end)
end

View file

@ -3,17 +3,17 @@ defmodule PrymnWeb.SystemInfo do
require Logger
alias Phoenix.LiveView.AsyncResult
alias PrymnProto.Prymn.SysInfoResponse
@impl true
def update(assigns, socket) do
{:ok,
socket
|> assign(:agent, assigns.agent)
|> assign(:sys_info, AsyncResult.loading())
|> start_async(:get_sys_info, fn ->
Prymn.Agents.get_sys_info(assigns.agent)
end)}
|> assign(:sys_info, AsyncResult.loading())}
# |> start_async(:get_sys_info, fn ->
# Prymn.Agents.get_sys_info(assigns.agent)
# end)}
end
@impl true
@ -60,31 +60,32 @@ defmodule PrymnWeb.SystemInfo do
"""
end
def handle_async(:get_sys_info, {:ok, %SysInfoResponse{} = sys_info}, socket) do
%{sys_info: sys_info_result, agent: agent} = socket.assigns
# @impl true
# def handle_async(:get_sys_info, {:ok, %SysInfoResponse{} = sys_info}, socket) do
# %{sys_info: sys_info_result, agent: agent} = socket.assigns
{:noreply,
socket
|> assign(:sys_info, AsyncResult.ok(sys_info_result, sys_info))
|> start_async(:get_sys_info, fn ->
# 10 seconds is >5 which is gun's timeout duration (which might have a race
# condition if they are equal)
Process.sleep(:timer.seconds(10))
Prymn.Agents.get_sys_info(agent)
end)}
end
# {:noreply,
# socket
# |> assign(:sys_info, AsyncResult.ok(sys_info_result, sys_info))
# |> start_async(:get_sys_info, fn ->
# # 10 seconds is >5 which is gun's timeout duration (which might have a race
# # condition if they are equal)
# Process.sleep(:timer.seconds(10))
# Prymn.Agents.get_sys_info(agent)
# end)}
# end
def handle_async(:get_sys_info, {:ok, {:error, grpc_error}}, socket) do
%{sys_info: sys_info_result} = socket.assigns
# def handle_async(:get_sys_info, {:ok, {:error, grpc_error}}, socket) do
# %{sys_info: sys_info_result} = socket.assigns
{:noreply,
socket
|> assign(:sys_info, AsyncResult.failed(sys_info_result, grpc_error))}
end
# {:noreply,
# socket
# |> assign(:sys_info, AsyncResult.failed(sys_info_result, grpc_error))}
# end
def handle_async(:get_sys_info, {:exit, _reason}, socket) do
{:noreply, socket}
end
# def handle_async(:get_sys_info, {:exit, _reason}, socket) do
# {:noreply, socket}
# end
defp calculate_cpu_usage(cpus) do
(Enum.reduce(cpus, 0, fn x, acc -> x.usage + acc end) / Enum.count(cpus))
@ -96,13 +97,14 @@ defmodule PrymnWeb.SystemInfo do
end
defp calculate_disk_used_percent(disks) do
alias PrymnProto.Prymn.SysInfoResponse.Disk
0
# alias PrymnProto.Prymn.SysInfoResponse.Disk
{used, total} =
Enum.reduce(disks, {0, 0}, fn %Disk{} = disk, {used, total} ->
{used + disk.total_bytes - disk.avail_bytes, total + disk.total_bytes}
end)
# {used, total} =
# Enum.reduce(disks, {0, 0}, fn %Disk{} = disk, {used, total} ->
# {used + disk.total_bytes - disk.avail_bytes, total + disk.total_bytes}
# end)
Float.round(100 * used / total, 2)
# Float.round(100 * used / total, 2)
end
end

View file

@ -1,8 +1,6 @@
defmodule PrymnWeb.Terminal do
use PrymnWeb, :live_component
alias PrymnProto.Prymn.TerminalRequest
@impl true
def mount(socket) do
{:ok, assign(socket, :open, false)}
@ -47,34 +45,34 @@ defmodule PrymnWeb.Terminal do
@impl true
def handle_event("open_terminal", _params, socket) do
agent = Prymn.Agents.from_server(socket.assigns.server)
pid = self()
# agent = Prymn.Agents.from_server(socket.assigns.server)
# pid = self()
Task.Supervisor.start_child(Prymn.TaskSupervisor, fn ->
# FIXME: Have to wrap this in a Task because gun sends unsolicited messages
# to calling process
stream = Prymn.Agents.terminal(agent)
# Task.Supervisor.start_child(Prymn.TaskSupervisor, fn ->
# # FIXME: Have to wrap this in a Task because gun sends unsolicited messages
# # to calling process
# stream = Prymn.Agents.terminal(agent)
{:ok, mux_pid} =
Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end)
# {:ok, mux_pid} =
# Task.Supervisor.start_child(Prymn.TaskSupervisor, fn -> receive_loop(stream) end)
send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true)
# send_update(pid, PrymnWeb.Terminal, id: "terminal", mux_pid: mux_pid, open: true)
case GRPC.Stub.recv(stream, timeout: :infinity) do
{:ok, stream} ->
Enum.map(stream, fn
{:ok, %{output: data}} ->
send(mux_pid, :data)
send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data)
# case GRPC.Stub.recv(stream, timeout: :infinity) do
# {:ok, stream} ->
# Enum.map(stream, fn
# {:ok, %{output: data}} ->
# send(mux_pid, :data)
# send_update(pid, PrymnWeb.Terminal, id: "terminal", data: data)
{:error, _err} ->
send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false)
end)
# {:error, _err} ->
# send_update(pid, PrymnWeb.Terminal, id: "terminal", open: false)
# end)
{:error, error} ->
dbg(error)
end
end)
# {:error, error} ->
# dbg(error)
# end
# end)
{:noreply, socket}
end
@ -94,27 +92,27 @@ defmodule PrymnWeb.Terminal do
{:noreply, socket}
end
defp receive_loop(stream) do
receive do
{:data_event, data} ->
GRPC.Stub.send_request(stream, %TerminalRequest{input: data})
receive_loop(stream)
# defp receive_loop(stream) do
# receive do
# {:data_event, data} ->
# GRPC.Stub.send_request(stream, %TerminalRequest{input: data})
# receive_loop(stream)
{:resize_event, rows, cols} ->
GRPC.Stub.send_request(stream, %TerminalRequest{
resize: %TerminalRequest.Resize{rows: rows, cols: cols}
})
# {:resize_event, rows, cols} ->
# GRPC.Stub.send_request(stream, %TerminalRequest{
# resize: %TerminalRequest.Resize{rows: rows, cols: cols}
# })
receive_loop(stream)
# receive_loop(stream)
:data ->
receive_loop(stream)
# :data ->
# receive_loop(stream)
:close ->
GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true)
after
120_000 ->
GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true)
end
end
# :close ->
# GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true)
# after
# 120_000 ->
# GRPC.Stub.send_request(stream, %TerminalRequest{input: ""}, end_stream: true)
# end
# end
end

View file

@ -8,19 +8,12 @@ defmodule PrymnWeb.ServerLive.Index do
@impl true
def mount(_params, _session, socket) do
servers = Servers.list_servers()
healths =
for %Servers.Server{status: :registered, id: id} = server <- servers, into: %{} do
# Agents.from_server(server)
# |> Agents.subscribe_to_health()
{id, Agents.get_health(id)}
end
agents = Agents.from_servers(servers)
{:ok,
socket
|> assign(:servers, servers)
|> assign(:healths, healths)}
|> assign(:agents, agents)}
end
@impl true
@ -36,27 +29,14 @@ defmodule PrymnWeb.ServerLive.Index do
<Button.primary patch={~p"/servers/new"}>Connect a Server</Button.primary>
</:actions>
</.header>
<div class="space-y-5" phx-update="replace" id="servers">
<.link
:for={server <- @servers}
navigate={~p"/servers/#{server}"}
class="group block rounded-lg bg-gray-100 p-5 shadow-sm shadow-gray-300 hover:bg-black hover:text-white"
>
<div class="flex flex-row flex-wrap justify-between">
<div class="mt-10 space-y-5" phx-update="replace" id="servers">
<.link :for={server <- @servers} navigate={~p"/servers/#{server}"} class="group flex">
<.status_bar agent={@agents[server.id]} />
<div class="flex-1 rounded-r-lg bg-gray-100 p-5 transition-colors group-hover:bg-black group-hover:text-white">
<h2 class="text-xl"><%= server.name %></h2>
<.server_status status={server.status} health={@healths[server.public_ip]} />
</div>
<div class="flex flex-row flex-wrap justify-between lg:text-sm">
<span>IP: <%= server.public_ip || "N/A" %></span>
<%= if @healths[server.public_ip] do %>
<span
:for={{name, task} <- @healths[server.public_ip].tasks || []}
class="text-right text-xs text-slate-700"
>
<div>In progress: <%= name %></div>
<div><%= task.progress %></div>
</span>
<% end %>
</div>
</div>
</.link>
</div>
@ -85,9 +65,9 @@ defmodule PrymnWeb.ServerLive.Index do
|> update(:servers, fn servers -> [server | servers] end)}
end
def handle_info(%Agents.Health{} = health, socket) do
healths = Map.put(socket.assigns.healths, health.host, health)
{:noreply, assign(socket, :healths, healths)}
def handle_info(%Agents.Agent{} = agent, socket) do
id = String.to_integer(agent.id)
{:noreply, update(socket, :agents, &Map.put(&1, id, agent))}
end
def handle_info(msg, state) do
@ -95,34 +75,16 @@ defmodule PrymnWeb.ServerLive.Index do
{:noreply, state}
end
defp server_status(assigns) do
case {assigns.status, assigns.health} do
{:unregistered, _} ->
~H"""
<span class="self-center text-sm text-gray-500">Needs registration</span>
"""
{:registered, nil} ->
~H"""
<.spinner size="md" />
"""
{:registered, %Agents.Health{status: :connected}} ->
~H"""
<span class="self-center text-sm text-green-600">Connected</span>
"""
{:registered, %Agents.Health{status: :disconnected}} ->
~H"""
<span class="self-center text-sm text-red-600">Disconnected</span>
"""
{:registered, %Agents.Health{message: message}} ->
assigns = assign(assigns, :message, message)
defp status_bar(assigns) do
assigns =
assign(assigns, :class, [
"w-3 rounded-l-lg",
assigns.agent.status == :connected && "bg-teal-500",
assigns.agent.status == :disconnected && "bg-red-500"
])
~H"""
<span class="self-center text-sm text-yellow-900"><%= @message %></span>
<div class={@class}></div>
"""
end
end
end

View file

@ -117,12 +117,12 @@ defmodule PrymnWeb.ServerLive.Show do
socket
end
health = Agents.get_health(server.public_ip)
# health = Agents.get_health(server.public_ip)
{:noreply,
socket
|> assign(:page_title, server.name)
|> assign(:health, health || %{message: "Connecting...", tasks: []})
# |> assign(:health, health || %{message: "Connecting...", tasks: []})
|> assign(:server, server)
|> assign(:dry_run, false)
|> assign(:update_output, [])
@ -130,21 +130,21 @@ defmodule PrymnWeb.ServerLive.Show do
|> assign(:registration_command, Servers.create_setup_command(server))}
end
@impl true
def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do
output = String.split(response.output, "\n")
socket = assign(socket, :update_output, output)
{:noreply, socket}
end
# @impl true
# def handle_info(%PrymnProto.Prymn.SysUpdateResponse{} = response, socket) do
# output = String.split(response.output, "\n")
# socket = assign(socket, :update_output, output)
# {:noreply, socket}
# end
def handle_info(%Agents.Health{host: host} = health, socket) do
socket =
if host == socket.assigns.server.public_ip,
do: assign(socket, :health, health),
else: socket
# def handle_info(%Agents.Health{host: host} = health, socket) do
# socket =
# if host == socket.assigns.server.public_ip,
# do: assign(socket, :health, health),
# else: socket
{:noreply, socket}
end
# {:noreply, socket}
# end
@impl true
def handle_event("system_update", _params, socket) do