app: use normal assigns instead of streams for the server list
This commit is contained in:
		
							parent
							
								
									dbdc7e0d80
								
							
						
					
					
						commit
						90ccdedd7b
					
				
					 6 changed files with 105 additions and 113 deletions
				
			
		|  | @ -18,8 +18,8 @@ defmodule Prymn.Agents do | ||||||
|   Returns `{:error, reason}` when the connection could not be established |   Returns `{:error, reason}` when the connection could not be established | ||||||
|   """ |   """ | ||||||
|   @spec ensure_connection(String.t()) :: :ok | {:error, term} |   @spec ensure_connection(String.t()) :: :ok | {:error, term} | ||||||
|   def ensure_connection(address) do |   def ensure_connection(public_ip) do | ||||||
|     child = {Prymn.Agents.Connection, address} |     child = {Prymn.Agents.Connection, public_ip} | ||||||
| 
 | 
 | ||||||
|     case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do |     case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do | ||||||
|       {:ok, _pid} -> :ok |       {:ok, _pid} -> :ok | ||||||
|  |  | ||||||
|  | @ -6,11 +6,12 @@ defmodule Prymn.Agents.Connection do | ||||||
| 
 | 
 | ||||||
|   defstruct [:channel, up?: false] |   defstruct [:channel, up?: false] | ||||||
| 
 | 
 | ||||||
|   @ping_interval 20000 |   @healthcheck_inverval 20000 | ||||||
| 
 |  | ||||||
|   use GenServer, restart: :transient |  | ||||||
| 
 | 
 | ||||||
|   require Logger |   require Logger | ||||||
|  |   alias PrymnProto.Prymn.Agent.Stub, as: Grpc | ||||||
|  | 
 | ||||||
|  |   use GenServer, restart: :transient | ||||||
| 
 | 
 | ||||||
|   @spec start_link(String.t()) :: GenServer.on_start() |   @spec start_link(String.t()) :: GenServer.on_start() | ||||||
|   def start_link(addr) do |   def start_link(addr) do | ||||||
|  | @ -28,17 +29,16 @@ defmodule Prymn.Agents.Connection do | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def init(addr) do |   def init(public_ip) do | ||||||
|     case GRPC.Stub.connect(addr) do |     case GRPC.Stub.connect("#{public_ip}:50012") do | ||||||
|       {:ok, channel} -> |       {:ok, channel} -> | ||||||
|         Logger.info("Starting new connection at address #{addr}") |         send(self(), :do_healthcheck) | ||||||
| 
 | 
 | ||||||
|         state = %__MODULE__{channel: channel, up?: true} |         {:ok, %__MODULE__{channel: channel, up?: true}} | ||||||
| 
 |  | ||||||
|         Process.send_after(self(), :do_healthcheck, @ping_interval) |  | ||||||
|         {:ok, state} |  | ||||||
| 
 | 
 | ||||||
|       {:error, error} -> |       {:error, error} -> | ||||||
|  |         broadcast_healthcheck!(:down, public_ip) | ||||||
|  | 
 | ||||||
|         {:stop, error} |         {:stop, error} | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  | @ -50,50 +50,41 @@ defmodule Prymn.Agents.Connection do | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do |   def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do | ||||||
|     Logger.info("[Agent] #{state.channel.host} regained connection") |     broadcast_healthcheck!(:up, channel.host) | ||||||
| 
 |  | ||||||
|     Phoenix.PubSub.broadcast!( |  | ||||||
|       Prymn.PubSub, |  | ||||||
|       "agent:#{channel.host}", |  | ||||||
|       {:healthcheck, :up, channel.host} |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
|     {:noreply, %{state | up?: true}} |     {:noreply, %{state | up?: true}} | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def handle_info({:gun_down, _pid, _proto, reason, _}, %{channel: channel} = state) do |   def handle_info({:gun_down, _pid, _proto, _reason, _}, %{channel: channel} = state) do | ||||||
|     Logger.info("[Agent] #{channel.host} lost connection, reason: #{reason}") |     broadcast_healthcheck!(:down, channel.host) | ||||||
| 
 |  | ||||||
|     Phoenix.PubSub.broadcast!( |  | ||||||
|       Prymn.PubSub, |  | ||||||
|       "agent:#{channel.host}", |  | ||||||
|       {:healthcheck, :down, channel.host} |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
|     {:noreply, %{state | up?: false}} |     {:noreply, %{state | up?: false}} | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def handle_info(:do_healthcheck, %{channel: channel} = state) do |   def handle_info(:do_healthcheck, %{channel: channel, up?: up?} = state) do | ||||||
|     request = %PrymnProto.Prymn.EchoRequest{message: "hello"} |     request = %PrymnProto.Prymn.EchoRequest{message: "hello"} | ||||||
| 
 | 
 | ||||||
|     case PrymnProto.Prymn.Agent.Stub.echo(channel, request) do |     if up? do | ||||||
|       {:ok, _reply} -> |       case Grpc.echo(channel, request) do | ||||||
|         :noop |         {:ok, _reply} -> | ||||||
|  |           broadcast_healthcheck!(:up, channel.host) | ||||||
| 
 | 
 | ||||||
|       {:error, error} -> |         {:error, error} -> | ||||||
|         Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(error)}") |           Logger.warning( | ||||||
|  |             "healthcheck error for server #{channel.host}, reason: #{inspect(error)}" | ||||||
|  |           ) | ||||||
|  |       end | ||||||
|  |     else | ||||||
|  |       broadcast_healthcheck!(:down, channel.host) | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     Process.send_after(self(), :do_healthcheck, @ping_interval) |     Process.send_after(self(), :do_healthcheck, @healthcheck_inverval) | ||||||
|     {:noreply, state} |     {:noreply, state} | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def handle_info(msg, state) do |   def handle_info(msg, state) do | ||||||
|     Logger.info("received unexpected message: #{inspect(msg)}") |     Logger.debug("received unexpected message: #{inspect(msg)}") | ||||||
| 
 |  | ||||||
|     {:noreply, state} |     {:noreply, state} | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|  | @ -105,4 +96,12 @@ defmodule Prymn.Agents.Connection do | ||||||
|   defp via(name) do |   defp via(name) do | ||||||
|     {:via, Registry, {Prymn.Agents.Registry, name}} |     {:via, Registry, {Prymn.Agents.Registry, name}} | ||||||
|   end |   end | ||||||
|  | 
 | ||||||
|  |   defp broadcast_healthcheck!(msg, ip_address) do | ||||||
|  |     Phoenix.PubSub.broadcast!( | ||||||
|  |       Prymn.PubSub, | ||||||
|  |       "agent:#{ip_address}", | ||||||
|  |       {:healthcheck, ip_address, msg} | ||||||
|  |     ) | ||||||
|  |   end | ||||||
| end | end | ||||||
|  |  | ||||||
|  | @ -18,7 +18,7 @@ defmodule Prymn.Servers do | ||||||
| 
 | 
 | ||||||
|   """ |   """ | ||||||
|   def list_servers do |   def list_servers do | ||||||
|     Repo.all(Server) |     Repo.all(Server |> order_by(desc: :inserted_at)) | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @doc """ |   @doc """ | ||||||
|  |  | ||||||
|  | @ -436,8 +436,6 @@ defmodule PrymnWeb.CoreComponents do | ||||||
|   attr :rows, :list, required: true |   attr :rows, :list, required: true | ||||||
|   attr :row_id, :any, default: nil, doc: "the function for generating the row id" |   attr :row_id, :any, default: nil, doc: "the function for generating the row id" | ||||||
|   attr :row_click, :any, default: nil, doc: "the function for handling phx-click on each row" |   attr :row_click, :any, default: nil, doc: "the function for handling phx-click on each row" | ||||||
|   attr :row_indicator, :any, default: nil |  | ||||||
|   attr :indicator_label, :string, default: "Indicator" |  | ||||||
| 
 | 
 | ||||||
|   attr :row_item, :any, |   attr :row_item, :any, | ||||||
|     default: &Function.identity/1, |     default: &Function.identity/1, | ||||||
|  | @ -460,11 +458,16 @@ defmodule PrymnWeb.CoreComponents do | ||||||
|       <table class="w-[40rem] mt-11 sm:w-full"> |       <table class="w-[40rem] mt-11 sm:w-full"> | ||||||
|         <thead class="text-left text-sm leading-6 text-zinc-500"> |         <thead class="text-left text-sm leading-6 text-zinc-500"> | ||||||
|           <tr> |           <tr> | ||||||
|             <th class="p-0 pb-4 font-normal"> |  | ||||||
|               <div class="w-min"><%= @indicator_label %></div> |  | ||||||
|             </th> |  | ||||||
|             <th :for={col <- @col} class="p-0 pr-6 pb-4 font-normal"><%= col[:label] %></th> |             <th :for={col <- @col} class="p-0 pr-6 pb-4 font-normal"><%= col[:label] %></th> | ||||||
|             <th class="relative p-0 pb-4"><span class="sr-only"><%= gettext("Actions") %></span></th> |             <th class="relative p-0 pb-4"> | ||||||
|  |               <span class="sr-only"> | ||||||
|  |                 <%= if @gettext do %> | ||||||
|  |                   <%= gettext("Actions") %> | ||||||
|  |                 <% else %> | ||||||
|  |                   Actions | ||||||
|  |                 <% end %> | ||||||
|  |               </span> | ||||||
|  |             </th> | ||||||
|           </tr> |           </tr> | ||||||
|         </thead> |         </thead> | ||||||
|         <tbody |         <tbody | ||||||
|  | @ -473,21 +476,13 @@ defmodule PrymnWeb.CoreComponents do | ||||||
|           class="relative divide-y divide-zinc-100 border-t border-zinc-200 text-sm leading-6 text-zinc-700" |           class="relative divide-y divide-zinc-100 border-t border-zinc-200 text-sm leading-6 text-zinc-700" | ||||||
|         > |         > | ||||||
|           <tr :for={row <- @rows} id={@row_id && @row_id.(row)} class="group hover:bg-zinc-50"> |           <tr :for={row <- @rows} id={@row_id && @row_id.(row)} class="group hover:bg-zinc-50"> | ||||||
|             <td |  | ||||||
|               class="relative w-min p-0 hover:cursor-pointer" |  | ||||||
|               phx-click={@row_click && @row_click.(row)} |  | ||||||
|             > |  | ||||||
|               <div class="block py-4 pr-6"> |  | ||||||
|                 <span class="absolute -inset-y-px right-0 -left-4 group-hover:bg-zinc-50 sm:rounded-l-xl" /> |  | ||||||
|                 <span class="relative hover:cursor-pointer"><%= @row_indicator.(row) %></span> |  | ||||||
|               </div> |  | ||||||
|             </td> |  | ||||||
|             <td |             <td | ||||||
|               :for={{col, i} <- Enum.with_index(@col)} |               :for={{col, i} <- Enum.with_index(@col)} | ||||||
|               phx-click={@row_click && @row_click.(row)} |               phx-click={@row_click && @row_click.(row)} | ||||||
|               class={["relative p-0", @row_click && "hover:cursor-pointer"]} |               class={["relative p-0", @row_click && "hover:cursor-pointer"]} | ||||||
|             > |             > | ||||||
|               <div class="block py-4 pr-6"> |               <div class="block py-4 pr-6"> | ||||||
|  |                 <span class="absolute -inset-y-px right-0 -left-4 group-hover:bg-zinc-50 sm:rounded-l-xl" /> | ||||||
|                 <span class={["relative", i == 0 && "font-semibold text-zinc-900"]}> |                 <span class={["relative", i == 0 && "font-semibold text-zinc-900"]}> | ||||||
|                   <%= render_slot(col, @row_item.(row)) %> |                   <%= render_slot(col, @row_item.(row)) %> | ||||||
|                 </span> |                 </span> | ||||||
|  |  | ||||||
|  | @ -1,25 +1,26 @@ | ||||||
| defmodule PrymnWeb.ServerLive.Index do | defmodule PrymnWeb.ServerLive.Index do | ||||||
|   alias Prymn.Servers |   require Logger | ||||||
|  |   alias Prymn.{Servers, Agents} | ||||||
| 
 | 
 | ||||||
|   use PrymnWeb, :live_view |   use PrymnWeb, :live_view | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def mount(_params, _session, socket) do |   def mount(_params, _session, socket) do | ||||||
|     servers = Servers.list_servers() |     servers = Servers.list_servers() | ||||||
|     pid = self() |     # pid = self() | ||||||
| 
 | 
 | ||||||
|     for %{public_ip: public_ip} <- servers, public_ip != nil do |     for %Servers.Server{status: :registered, public_ip: ip} <- servers do | ||||||
|       :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{public_ip}") |       :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{ip}") | ||||||
| 
 | 
 | ||||||
|       Task.start_link(fn -> |       Task.start_link(fn -> | ||||||
|         case Prymn.Agents.ensure_connection("#{public_ip}:50012") do |         case Agents.ensure_connection(ip) do | ||||||
|           :ok -> Process.send(pid, {:healthcheck, :up, public_ip}, []) |           :ok -> IO.puts("Ok") | ||||||
|           {:error, _error} -> Process.send(pid, {:healthcheck, :down, public_ip}, []) |           _ -> IO.puts("not ok") | ||||||
|         end |         end | ||||||
|       end) |       end) | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     {:ok, stream(socket, :servers, servers)} |     {:ok, assign(socket, :servers, servers)} | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|  | @ -39,29 +40,34 @@ defmodule PrymnWeb.ServerLive.Index do | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def handle_info({:connect, %Servers.Server{} = server}, socket) do |   def handle_info({:connect, %Servers.Server{} = server}, socket) do | ||||||
|     {:noreply, stream_insert(socket, :servers, server)} |  | ||||||
|   end |  | ||||||
| 
 |  | ||||||
|   @impl true |  | ||||||
|   def handle_info({:healthcheck, status, ip}, socket) do |  | ||||||
|     server = Servers.get_server_by_ip!(ip) |  | ||||||
| 
 |  | ||||||
|     status = |  | ||||||
|       case status do |  | ||||||
|         :up -> "Connected" |  | ||||||
|         :down -> "Disconnected" |  | ||||||
|       end |  | ||||||
| 
 |  | ||||||
|     {:noreply, |     {:noreply, | ||||||
|      socket |      socket | ||||||
|      |> stream_insert(:servers, Map.put(server, :connection_status, status))} |      |> update(:servers, fn servers -> [server | servers] end)} | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   @impl true |   @impl true | ||||||
|   def handle_event("delete", %{"id" => id}, socket) do |   def handle_info({:healthcheck, ip, _message}, socket) do | ||||||
|     server = Servers.get_server!(id) |     servers = | ||||||
|     {:ok, _} = Servers.delete_server(server) |       update_in( | ||||||
|  |         socket.assigns.servers, | ||||||
|  |         [Access.filter(&match?(%{public_ip: ^ip}, &1))], | ||||||
|  |         &Map.merge(&1, %{connection_status: "Connected"}) | ||||||
|  |       ) | ||||||
| 
 | 
 | ||||||
|     {:noreply, stream_delete(socket, :servers, server)} |     {:noreply, assign(socket, :servers, servers)} | ||||||
|   end |   end | ||||||
|  | 
 | ||||||
|  |   @impl true | ||||||
|  |   def handle_info(msg, state) do | ||||||
|  |     Logger.debug("received unexpected message #{inspect(msg)}") | ||||||
|  |     {:noreply, state} | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   # @impl true | ||||||
|  |   # def handle_event("delete", %{"id" => id}, socket) do | ||||||
|  |   #   server = Servers.get_server!(id) | ||||||
|  |   #   {:ok, _} = Servers.delete_server(server) | ||||||
|  | 
 | ||||||
|  |   #   {:noreply, stream_delete(socket, :servers, server)} | ||||||
|  |   # end | ||||||
| end | end | ||||||
|  |  | ||||||
|  | @ -1,5 +1,8 @@ | ||||||
| <.header> | <.header> | ||||||
|   All available servers to you |   Your servers | ||||||
|  |   <small class="block"> | ||||||
|  |     <%= "#{Enum.count(@servers)} servers" %> | ||||||
|  |   </small> | ||||||
|   <:actions> |   <:actions> | ||||||
|     <.link patch={~p"/servers/new"}> |     <.link patch={~p"/servers/new"}> | ||||||
|       <.button>Connect a Server</.button> |       <.button>Connect a Server</.button> | ||||||
|  | @ -7,38 +10,27 @@ | ||||||
|   </:actions> |   </:actions> | ||||||
| </.header> | </.header> | ||||||
| 
 | 
 | ||||||
| <.table | <div class="space-y-5" phx-update="replace" id="servers"> | ||||||
|   id="servers" |   <.link | ||||||
|   rows={@streams.servers} |     :for={server <- @servers} | ||||||
|   row_click={fn {_id, server} -> JS.navigate(~p"/servers/#{server}") end} |     navigate={~p"/servers/#{server}"} | ||||||
|   row_indicator={ |     class="group block rounded-lg bg-gray-100 p-5 shadow-sm shadow-gray-300 hover:bg-black hover:text-white" | ||||||
|     fn |   > | ||||||
|       {_id, %Servers.Server{status: :unregistered}} -> |     <div class="flex flex-row flex-wrap justify-between"> | ||||||
|         ~H(<span class="text-grey-600">Awaiting registration</span>) |       <h2 class="text-xl"><%= server.name %></h2> | ||||||
| 
 |       <span class="self-center text-sm"> | ||||||
|       {_id, %Servers.Server{connection_status: nil, status: :registered}} -> |         <%= if server.status == :registered do %> | ||||||
|         ~H(<span class="text-yellow-600">Connecting...</span>) |           <%= server.connection_status || "..." %> | ||||||
| 
 |         <% else %> | ||||||
|       {_id, %Servers.Server{connection_status: "Connected"}} -> |           Not registered | ||||||
|         ~H(<span class="text-green-600">Connected</span>) |         <% end %> | ||||||
| 
 |       </span> | ||||||
|       {_id, %Servers.Server{connection_status: "Disconnected"}} -> |     </div> | ||||||
|         ~H(<span class="text-red-600">Disconnected</span>) |     <div class="lg:text-sm"> | ||||||
|     end |       <span>IP: <%= server.public_ip || "N/A" %></span> | ||||||
|   } |     </div> | ||||||
|   indicator_label="Status" |   </.link> | ||||||
| > | </div> | ||||||
|   <:col :let={{_id, server}} label="Name"><%= server.name %></:col> |  | ||||||
|   <:col :let={{_id, server}} label="IP"><%= server.public_ip || "N/A" %></:col> |  | ||||||
|   <:action :let={{id, server}}> |  | ||||||
|     <.link |  | ||||||
|       phx-click={JS.push("delete", value: %{id: server.id}) |> hide("##{id}")} |  | ||||||
|       data-confirm="Are you sure?" |  | ||||||
|     > |  | ||||||
|       Delete |  | ||||||
|     </.link> |  | ||||||
|   </:action> |  | ||||||
| </.table> |  | ||||||
| 
 | 
 | ||||||
| <.modal :if={@live_action == :new} id="server-modal" show on_cancel={JS.patch(~p"/servers")}> | <.modal :if={@live_action == :new} id="server-modal" show on_cancel={JS.patch(~p"/servers")}> | ||||||
|   <.live_component module={PrymnWeb.ServerLive.NewServer} id={:new} patch={~p"/servers"} /> |   <.live_component module={PrymnWeb.ServerLive.NewServer} id={:new} patch={~p"/servers"} /> | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue