diff --git a/app/lib/prymn/agents.ex b/app/lib/prymn/agents.ex index caaf190..4e42fd5 100644 --- a/app/lib/prymn/agents.ex +++ b/app/lib/prymn/agents.ex @@ -18,8 +18,8 @@ defmodule Prymn.Agents do Returns `{:error, reason}` when the connection could not be established """ @spec ensure_connection(String.t()) :: :ok | {:error, term} - def ensure_connection(address) do - child = {Prymn.Agents.Connection, address} + def ensure_connection(public_ip) do + child = {Prymn.Agents.Connection, public_ip} case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do {:ok, _pid} -> :ok diff --git a/app/lib/prymn/agents/connection.ex b/app/lib/prymn/agents/connection.ex index 4ef9f8a..5da6d29 100644 --- a/app/lib/prymn/agents/connection.ex +++ b/app/lib/prymn/agents/connection.ex @@ -6,11 +6,12 @@ defmodule Prymn.Agents.Connection do defstruct [:channel, up?: false] - @ping_interval 20000 - - use GenServer, restart: :transient + @healthcheck_inverval 20000 require Logger + alias PrymnProto.Prymn.Agent.Stub, as: Grpc + + use GenServer, restart: :transient @spec start_link(String.t()) :: GenServer.on_start() def start_link(addr) do @@ -28,17 +29,16 @@ defmodule Prymn.Agents.Connection do end @impl true - def init(addr) do - case GRPC.Stub.connect(addr) do + def init(public_ip) do + case GRPC.Stub.connect("#{public_ip}:50012") do {:ok, channel} -> - Logger.info("Starting new connection at address #{addr}") + send(self(), :do_healthcheck) - state = %__MODULE__{channel: channel, up?: true} - - Process.send_after(self(), :do_healthcheck, @ping_interval) - {:ok, state} + {:ok, %__MODULE__{channel: channel, up?: true}} {:error, error} -> + broadcast_healthcheck!(:down, public_ip) + {:stop, error} end end @@ -50,50 +50,41 @@ defmodule Prymn.Agents.Connection do @impl true def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do - Logger.info("[Agent] #{state.channel.host} regained connection") - - Phoenix.PubSub.broadcast!( - Prymn.PubSub, - "agent:#{channel.host}", - {:healthcheck, :up, channel.host} - ) - + broadcast_healthcheck!(:up, channel.host) {:noreply, %{state | up?: true}} end @impl true - def handle_info({:gun_down, _pid, _proto, reason, _}, %{channel: channel} = state) do - Logger.info("[Agent] #{channel.host} lost connection, reason: #{reason}") - - Phoenix.PubSub.broadcast!( - Prymn.PubSub, - "agent:#{channel.host}", - {:healthcheck, :down, channel.host} - ) - + def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do + broadcast_healthcheck!(:down, channel.host) {:noreply, %{state | up?: false}} end @impl true - def handle_info(:do_healthcheck, %{channel: channel} = state) do + def handle_info(:do_healthcheck, %{channel: channel, up?: up?} = state) do request = %PrymnProto.Prymn.EchoRequest{message: "hello"} - case PrymnProto.Prymn.Agent.Stub.echo(channel, request) do - {:ok, _reply} -> - :noop + if up? do + case Grpc.echo(channel, request) do + {:ok, _reply} -> + broadcast_healthcheck!(:up, channel.host) - {:error, error} -> - Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(error)}") + {:error, error} -> + Logger.warning( + "healthcheck error for server #{channel.host}, reason: #{inspect(error)}" + ) + end + else + broadcast_healthcheck!(:down, channel.host) end - Process.send_after(self(), :do_healthcheck, @ping_interval) + Process.send_after(self(), :do_healthcheck, @healthcheck_inverval) {:noreply, state} end @impl true def handle_info(msg, state) do - Logger.info("received unexpected message: #{inspect(msg)}") - + Logger.debug("received unexpected message: #{inspect(msg)}") {:noreply, state} end @@ -105,4 +96,12 @@ defmodule Prymn.Agents.Connection do defp via(name) do {:via, Registry, {Prymn.Agents.Registry, name}} end + + defp broadcast_healthcheck!(msg, ip_address) do + Phoenix.PubSub.broadcast!( + Prymn.PubSub, + "agent:#{ip_address}", + {:healthcheck, ip_address, msg} + ) + end end diff --git a/app/lib/prymn/servers.ex b/app/lib/prymn/servers.ex index d1be2f0..de6b7f8 100644 --- a/app/lib/prymn/servers.ex +++ b/app/lib/prymn/servers.ex @@ -18,7 +18,7 @@ defmodule Prymn.Servers do """ def list_servers do - Repo.all(Server) + Repo.all(Server |> order_by(desc: :inserted_at)) end @doc """ diff --git a/app/lib/prymn_web/components/core_components.ex b/app/lib/prymn_web/components/core_components.ex index c3830e6..03f4fb1 100644 --- a/app/lib/prymn_web/components/core_components.ex +++ b/app/lib/prymn_web/components/core_components.ex @@ -436,8 +436,6 @@ defmodule PrymnWeb.CoreComponents do attr :rows, :list, required: true attr :row_id, :any, default: nil, doc: "the function for generating the row id" attr :row_click, :any, default: nil, doc: "the function for handling phx-click on each row" - attr :row_indicator, :any, default: nil - attr :indicator_label, :string, default: "Indicator" attr :row_item, :any, default: &Function.identity/1, @@ -460,11 +458,16 @@ defmodule PrymnWeb.CoreComponents do
- <%= @indicator_label %>
- |
<%= col[:label] %> | -<%= gettext("Actions") %> | ++ + <%= if @gettext do %> + <%= gettext("Actions") %> + <% else %> + Actions + <% end %> + + |
---|---|---|---|
-
-
- <%= @row_indicator.(row) %>
-
- |
+
<%= render_slot(col, @row_item.(row)) %>
diff --git a/app/lib/prymn_web/live/server_live/index.ex b/app/lib/prymn_web/live/server_live/index.ex
index 9ce2562..8d7f9ac 100644
--- a/app/lib/prymn_web/live/server_live/index.ex
+++ b/app/lib/prymn_web/live/server_live/index.ex
@@ -1,25 +1,26 @@
defmodule PrymnWeb.ServerLive.Index do
- alias Prymn.Servers
+ require Logger
+ alias Prymn.{Servers, Agents}
use PrymnWeb, :live_view
@impl true
def mount(_params, _session, socket) do
servers = Servers.list_servers()
- pid = self()
+ # pid = self()
- for %{public_ip: public_ip} <- servers, public_ip != nil do
- :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{public_ip}")
+ for %Servers.Server{status: :registered, public_ip: ip} <- servers do
+ :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{ip}")
Task.start_link(fn ->
- case Prymn.Agents.ensure_connection("#{public_ip}:50012") do
- :ok -> Process.send(pid, {:healthcheck, :up, public_ip}, [])
- {:error, _error} -> Process.send(pid, {:healthcheck, :down, public_ip}, [])
+ case Agents.ensure_connection(ip) do
+ :ok -> IO.puts("Ok")
+ _ -> IO.puts("not ok")
end
end)
end
- {:ok, stream(socket, :servers, servers)}
+ {:ok, assign(socket, :servers, servers)}
end
@impl true
@@ -39,29 +40,34 @@ defmodule PrymnWeb.ServerLive.Index do
@impl true
def handle_info({:connect, %Servers.Server{} = server}, socket) do
- {:noreply, stream_insert(socket, :servers, server)}
- end
-
- @impl true
- def handle_info({:healthcheck, status, ip}, socket) do
- server = Servers.get_server_by_ip!(ip)
-
- status =
- case status do
- :up -> "Connected"
- :down -> "Disconnected"
- end
-
{:noreply,
socket
- |> stream_insert(:servers, Map.put(server, :connection_status, status))}
+ |> update(:servers, fn servers -> [server | servers] end)}
end
@impl true
- def handle_event("delete", %{"id" => id}, socket) do
- server = Servers.get_server!(id)
- {:ok, _} = Servers.delete_server(server)
+ def handle_info({:healthcheck, ip, _message}, socket) do
+ servers =
+ update_in(
+ socket.assigns.servers,
+ [Access.filter(&match?(%{public_ip: ^ip}, &1))],
+ &Map.merge(&1, %{connection_status: "Connected"})
+ )
- {:noreply, stream_delete(socket, :servers, server)}
+ {:noreply, assign(socket, :servers, servers)}
end
+
+ @impl true
+ def handle_info(msg, state) do
+ Logger.debug("received unexpected message #{inspect(msg)}")
+ {:noreply, state}
+ end
+
+ # @impl true
+ # def handle_event("delete", %{"id" => id}, socket) do
+ # server = Servers.get_server!(id)
+ # {:ok, _} = Servers.delete_server(server)
+
+ # {:noreply, stream_delete(socket, :servers, server)}
+ # end
end
diff --git a/app/lib/prymn_web/live/server_live/index.html.heex b/app/lib/prymn_web/live/server_live/index.html.heex
index cce1c2a..61c8e28 100644
--- a/app/lib/prymn_web/live/server_live/index.html.heex
+++ b/app/lib/prymn_web/live/server_live/index.html.heex
@@ -1,5 +1,8 @@
<.header>
- All available servers to you
+ Your servers
+
+ <%= "#{Enum.count(@servers)} servers" %>
+
<:actions>
<.link patch={~p"/servers/new"}>
<.button>Connect a Server
@@ -7,38 +10,27 @@
-<.table
- id="servers"
- rows={@streams.servers}
- row_click={fn {_id, server} -> JS.navigate(~p"/servers/#{server}") end}
- row_indicator={
- fn
- {_id, %Servers.Server{status: :unregistered}} ->
- ~H(Awaiting registration)
-
- {_id, %Servers.Server{connection_status: nil, status: :registered}} ->
- ~H(Connecting...)
-
- {_id, %Servers.Server{connection_status: "Connected"}} ->
- ~H(Connected)
-
- {_id, %Servers.Server{connection_status: "Disconnected"}} ->
- ~H(Disconnected)
- end
- }
- indicator_label="Status"
->
- <:col :let={{_id, server}} label="Name"><%= server.name %>
- <:col :let={{_id, server}} label="IP"><%= server.public_ip || "N/A" %>
- <:action :let={{id, server}}>
- <.link
- phx-click={JS.push("delete", value: %{id: server.id}) |> hide("##{id}")}
- data-confirm="Are you sure?"
- >
- Delete
-
-
-
+
+ <.link
+ :for={server <- @servers}
+ navigate={~p"/servers/#{server}"}
+ class="group block rounded-lg bg-gray-100 p-5 shadow-sm shadow-gray-300 hover:bg-black hover:text-white"
+ >
+
<.modal :if={@live_action == :new} id="server-modal" show on_cancel={JS.patch(~p"/servers")}>
<.live_component module={PrymnWeb.ServerLive.NewServer} id={:new} patch={~p"/servers"} />
+
+ <%= server.name %>+ + <%= if server.status == :registered do %> + <%= server.connection_status || "..." %> + <% else %> + Not registered + <% end %> + +
+ IP: <%= server.public_ip || "N/A" %>
+
+
+ |