backend: live connection status!
This commit is contained in:
		
							parent
							
								
									e12e20eb38
								
							
						
					
					
						commit
						a8699035d8
					
				
					 9 changed files with 135 additions and 45 deletions
				
			
		| 
						 | 
				
			
			@ -1,6 +1,9 @@
 | 
			
		|||
defmodule Prymn.Agents.Connection do
 | 
			
		||||
  @moduledoc false
 | 
			
		||||
 | 
			
		||||
  # TODO: Disconnect after a while of idling. Disconnect when the healthcheck
 | 
			
		||||
  #       fails too many times.
 | 
			
		||||
 | 
			
		||||
  defstruct [:channel, up?: false]
 | 
			
		||||
 | 
			
		||||
  @ping_interval 20000
 | 
			
		||||
| 
						 | 
				
			
			@ -46,15 +49,27 @@ defmodule Prymn.Agents.Connection do
 | 
			
		|||
  end
 | 
			
		||||
 | 
			
		||||
  @impl true
 | 
			
		||||
  def handle_info({:gun_up, _pid, _protocol}, state) do
 | 
			
		||||
    Logger.info("regained connection (#{inspect(state.channel)})")
 | 
			
		||||
  def handle_info({:gun_up, _pid, _protocol}, %{channel: channel} = state) do
 | 
			
		||||
    Logger.info("[Agent] #{state.channel.host} regained connection")
 | 
			
		||||
 | 
			
		||||
    Phoenix.PubSub.broadcast!(
 | 
			
		||||
      Prymn.PubSub,
 | 
			
		||||
      "agent:#{channel.host}",
 | 
			
		||||
      {:healthcheck, :up, channel.host}
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    {:noreply, %{state | up?: true}}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @impl true
 | 
			
		||||
  def handle_info({:gun_down, _pid, _protocol, reason, _killed_streams}, state) do
 | 
			
		||||
    Logger.info("lost connection (reason: #{inspect(reason)}, #{inspect(state.channel)})")
 | 
			
		||||
  def handle_info({:gun_down, _pid, _proto, reason, _}, %{channel: channel} = state) do
 | 
			
		||||
    Logger.info("[Agent] #{channel.host} lost connection, reason: #{reason}")
 | 
			
		||||
 | 
			
		||||
    Phoenix.PubSub.broadcast!(
 | 
			
		||||
      Prymn.PubSub,
 | 
			
		||||
      "agent:#{channel.host}",
 | 
			
		||||
      {:healthcheck, :down, channel.host}
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    {:noreply, %{state | up?: false}}
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			@ -67,8 +82,6 @@ defmodule Prymn.Agents.Connection do
 | 
			
		|||
      {:ok, _reply} ->
 | 
			
		||||
        :noop
 | 
			
		||||
 | 
			
		||||
      # IO.inspect(reply)
 | 
			
		||||
 | 
			
		||||
      {:error, error} ->
 | 
			
		||||
        Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(error)}")
 | 
			
		||||
    end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,6 +37,12 @@ defmodule Prymn.Servers do
 | 
			
		|||
  """
 | 
			
		||||
  def get_server!(id), do: Repo.get!(Server, id)
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Get a single server by its IP.
 | 
			
		||||
  """
 | 
			
		||||
  @spec get_server_by_ip!(String.t()) :: %Server{}
 | 
			
		||||
  def get_server_by_ip!(ip), do: Repo.get_by!(Server, public_ip: ip)
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
 | 
			
		||||
  Start a new server connection with the app.
 | 
			
		||||
| 
						 | 
				
			
			@ -52,16 +58,19 @@ defmodule Prymn.Servers do
 | 
			
		|||
  Registers a server using a registration token.
 | 
			
		||||
  """
 | 
			
		||||
  def register_server(token, public_ip) do
 | 
			
		||||
    # TODO: Validate public ip
 | 
			
		||||
    with true <- :inet.is_ip_address(public_ip),
 | 
			
		||||
         {:ok, token} <- Base.decode64(token) do
 | 
			
		||||
      public_ip_string =
 | 
			
		||||
        public_ip
 | 
			
		||||
        |> :inet.ntoa()
 | 
			
		||||
        |> to_string()
 | 
			
		||||
 | 
			
		||||
    case Base.decode64(token) do
 | 
			
		||||
      {:ok, token} ->
 | 
			
		||||
        from(s in Server, where: s.registration_token == ^token, select: s)
 | 
			
		||||
        |> Repo.one()
 | 
			
		||||
        |> update_server(%{public_ip: public_ip})
 | 
			
		||||
 | 
			
		||||
      :error ->
 | 
			
		||||
        {:error, "token is not a valid base64 encoded string"}
 | 
			
		||||
      from(s in Server, where: s.registration_token == ^token, select: s)
 | 
			
		||||
      |> Repo.one!()
 | 
			
		||||
      |> update_server(%{public_ip: public_ip_string, status: :registered})
 | 
			
		||||
    else
 | 
			
		||||
      false -> {:error, :invalid_ip}
 | 
			
		||||
      :error -> {:error, :bad_token}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -111,4 +120,26 @@ defmodule Prymn.Servers do
 | 
			
		|||
  def change_server(%Server{} = server, attrs \\ %{}) do
 | 
			
		||||
    Server.changeset(server, attrs)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Returns a string containing the command that needs to be executed to the
 | 
			
		||||
  remote server in order to register it to the backend.
 | 
			
		||||
  """
 | 
			
		||||
  @spec create_setup_command(%Server{}) :: String.t()
 | 
			
		||||
  def create_setup_command(%Server{registration_token: token}) do
 | 
			
		||||
    build_command = fn token ->
 | 
			
		||||
      if Application.get_env(:prymn, :environment) == :prod do
 | 
			
		||||
        "curl -sSf https://static.prymn.net/get_prymn.sh | sh -s " <> token
 | 
			
		||||
      else
 | 
			
		||||
        # On a dev environment we want to download the local version of the agent
 | 
			
		||||
        agent_path = Path.expand("../agent/target/debug/prymn_agent")
 | 
			
		||||
        get_prymn_path = Path.expand("../agent/get_prymn.sh")
 | 
			
		||||
        "GET_PRYMN_ROOT=file://#{agent_path} #{get_prymn_path} #{token}"
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    token
 | 
			
		||||
    |> Base.encode64()
 | 
			
		||||
    |> then(build_command)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,9 +8,11 @@ defmodule Prymn.Servers.Server do
 | 
			
		|||
    field :provider, Ecto.Enum, values: [:Hetzner, :Custom]
 | 
			
		||||
    field :registration_token, :binary, redact: true
 | 
			
		||||
 | 
			
		||||
    field :connection_status, Ecto.Enum,
 | 
			
		||||
      values: [:awaiting, :connecting, :installing, :connected, :disconnected],
 | 
			
		||||
      default: :awaiting
 | 
			
		||||
    field :status, Ecto.Enum,
 | 
			
		||||
      values: [:unregistered, :registered],
 | 
			
		||||
      default: :unregistered
 | 
			
		||||
 | 
			
		||||
    field :connection_status, :string, virtual: true
 | 
			
		||||
 | 
			
		||||
    timestamps()
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			@ -18,8 +20,9 @@ defmodule Prymn.Servers.Server do
 | 
			
		|||
  @doc false
 | 
			
		||||
  def changeset(server, attrs) do
 | 
			
		||||
    server
 | 
			
		||||
    |> cast(attrs, [:name, :provider, :registration_token])
 | 
			
		||||
    |> cast(attrs, [:name, :public_ip, :provider, :registration_token, :status])
 | 
			
		||||
    |> validate_required([:name, :provider])
 | 
			
		||||
    |> validate_inclusion(:provider, [:Custom], message: "Provider not available (yet)")
 | 
			
		||||
    |> unique_constraint([:public_ip])
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,19 +1,36 @@
 | 
			
		|||
defmodule PrymnWeb.ServerController do
 | 
			
		||||
  use PrymnWeb, :controller
 | 
			
		||||
 | 
			
		||||
  alias Prymn.Servers
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  use PrymnWeb, :controller
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Used by clients to request a new server connection to the prymn backend
 | 
			
		||||
  validating their registration token.
 | 
			
		||||
  """
 | 
			
		||||
  def register(conn, %{"token" => token, "ip" => ip}) do
 | 
			
		||||
    case Servers.register_server(token, ip) do
 | 
			
		||||
  def register(conn, %{"token" => token}) do
 | 
			
		||||
    case Servers.register_server(token, conn.remote_ip) do
 | 
			
		||||
      {:ok, _server} ->
 | 
			
		||||
        json(conn, %{"connected" => true})
 | 
			
		||||
 | 
			
		||||
      {:error, :invalid_ip} ->
 | 
			
		||||
        Logger.error("could not register a server because we received an invalid ip")
 | 
			
		||||
 | 
			
		||||
        put_status(conn, 500)
 | 
			
		||||
        |> json(%{"errors" => ["invalid ip received"]})
 | 
			
		||||
 | 
			
		||||
      {:error, :bad_token} ->
 | 
			
		||||
        put_status(conn, 400)
 | 
			
		||||
        |> json(%{"errors" => %{"token" => "token is not valid"}})
 | 
			
		||||
 | 
			
		||||
      {:error, %Ecto.Changeset{} = changeset} ->
 | 
			
		||||
        errors = Ecto.Changeset.traverse_errors(changeset, fn {msg, _} -> msg end)
 | 
			
		||||
 | 
			
		||||
        put_status(conn, 400)
 | 
			
		||||
        |> json(%{"errors" => errors})
 | 
			
		||||
 | 
			
		||||
      {:error, error} ->
 | 
			
		||||
        raise inspect(error)
 | 
			
		||||
        raise "An unhandled error was received #{inspect(error)}"
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,15 +1,25 @@
 | 
			
		|||
defmodule PrymnWeb.ServerLive.Index do
 | 
			
		||||
  use PrymnWeb, :live_view
 | 
			
		||||
 | 
			
		||||
  alias Prymn.Servers
 | 
			
		||||
 | 
			
		||||
  use PrymnWeb, :live_view
 | 
			
		||||
 | 
			
		||||
  @impl true
 | 
			
		||||
  def mount(_params, _session, socket) do
 | 
			
		||||
    # Run this for every server:
 | 
			
		||||
    #   make sure an agent connection is made (async "cheap" request)
 | 
			
		||||
    #     then wait for events
 | 
			
		||||
    #     pubsub will eventually send a connected or a disconnected (and anything else) event
 | 
			
		||||
    {:ok, stream(socket, :servers, Servers.list_servers())}
 | 
			
		||||
    servers = Servers.list_servers()
 | 
			
		||||
    pid = self()
 | 
			
		||||
 | 
			
		||||
    for %{public_ip: public_ip} <- servers, public_ip != nil do
 | 
			
		||||
      :ok = Phoenix.PubSub.subscribe(Prymn.PubSub, "agent:#{public_ip}")
 | 
			
		||||
 | 
			
		||||
      Task.start_link(fn ->
 | 
			
		||||
        case Prymn.Agents.ensure_connection("#{public_ip}:50012") do
 | 
			
		||||
          :ok -> Process.send(pid, {:healthcheck, :up, public_ip}, [])
 | 
			
		||||
          {:error, _error} -> Process.send(pid, {:healthcheck, :down, public_ip}, [])
 | 
			
		||||
        end
 | 
			
		||||
      end)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    {:ok, stream(socket, :servers, servers)}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @impl true
 | 
			
		||||
| 
						 | 
				
			
			@ -29,12 +39,22 @@ defmodule PrymnWeb.ServerLive.Index do
 | 
			
		|||
 | 
			
		||||
  @impl true
 | 
			
		||||
  def handle_info({:connect, %Servers.Server{} = server}, socket) do
 | 
			
		||||
    socket =
 | 
			
		||||
      if server.provider == :Custom,
 | 
			
		||||
        do: push_navigate(socket, to: ~p"/servers/#{server}"),
 | 
			
		||||
        else: stream_insert(socket, :servers, server)
 | 
			
		||||
    {:noreply, stream_insert(socket, :servers, server)}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
    {:noreply, socket}
 | 
			
		||||
  @impl true
 | 
			
		||||
  def handle_info({:healthcheck, status, ip}, socket) do
 | 
			
		||||
    server = Servers.get_server_by_ip!(ip)
 | 
			
		||||
 | 
			
		||||
    status =
 | 
			
		||||
      case status do
 | 
			
		||||
        :up -> "Connected"
 | 
			
		||||
        :down -> "Disconnected"
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
    {:noreply,
 | 
			
		||||
     socket
 | 
			
		||||
     |> stream_insert(:servers, Map.put(server, :connection_status, status))}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @impl true
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,19 +13,23 @@
 | 
			
		|||
  row_click={fn {_id, server} -> JS.navigate(~p"/servers/#{server}") end}
 | 
			
		||||
  row_indicator={
 | 
			
		||||
    fn
 | 
			
		||||
      {_id, %Servers.Server{connection_status: :awaiting}} ->
 | 
			
		||||
        ~H(<span class="text-grey-600">Awaiting connection</span>)
 | 
			
		||||
      {_id, %Servers.Server{status: :unregistered}} ->
 | 
			
		||||
        ~H(<span class="text-grey-600">Awaiting registration</span>)
 | 
			
		||||
 | 
			
		||||
      {_id, %Servers.Server{connection_status: :connecting}} ->
 | 
			
		||||
        ~H(<span class="text-purple-600">Connecting</span>)
 | 
			
		||||
      {_id, %Servers.Server{connection_status: nil, status: :registered}} ->
 | 
			
		||||
        ~H(<span class="text-yellow-600">Connecting...</span>)
 | 
			
		||||
 | 
			
		||||
      {_id, %Servers.Server{connection_status: :connected}} ->
 | 
			
		||||
      {_id, %Servers.Server{connection_status: "Connected"}} ->
 | 
			
		||||
        ~H(<span class="text-green-600">Connected</span>)
 | 
			
		||||
 | 
			
		||||
      {_id, %Servers.Server{connection_status: "Disconnected"}} ->
 | 
			
		||||
        ~H(<span class="text-red-600">Disconnected</span>)
 | 
			
		||||
    end
 | 
			
		||||
  }
 | 
			
		||||
  indicator_label="Status"
 | 
			
		||||
>
 | 
			
		||||
  <:col :let={{_id, server}} label="Name"><%= server.name %></:col>
 | 
			
		||||
  <:col :let={{_id, server}} label="IP"><%= server.public_ip || "N/A" %></:col>
 | 
			
		||||
  <:action :let={{id, server}}>
 | 
			
		||||
    <.link
 | 
			
		||||
      phx-click={JS.push("delete", value: %{id: server.id}) |> hide("##{id}")}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,6 +15,7 @@ defmodule PrymnWeb.ServerLive.Show do
 | 
			
		|||
    {:noreply,
 | 
			
		||||
     socket
 | 
			
		||||
     |> assign(:page_title, server.name)
 | 
			
		||||
     |> assign(:server, server)}
 | 
			
		||||
     |> assign(:server, server)
 | 
			
		||||
     |> assign(:registration_command, Servers.create_setup_command(server))}
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,7 +2,7 @@
 | 
			
		|||
  Server <%= @server.name %>
 | 
			
		||||
</.header>
 | 
			
		||||
 | 
			
		||||
<section :if={@server.connection_status == :awaiting} class="my-10">
 | 
			
		||||
<section :if={@server.status == :unregistered} class="my-10">
 | 
			
		||||
  <p class="mb-9">
 | 
			
		||||
    Connect to your server using root credentials and execute the following command:
 | 
			
		||||
  </p>
 | 
			
		||||
| 
						 | 
				
			
			@ -10,7 +10,7 @@
 | 
			
		|||
    <code class="flex gap-4">
 | 
			
		||||
      <span class="select-none text-gray-500">#</span>
 | 
			
		||||
      <span class="flex-1">
 | 
			
		||||
        your command here that contains token: <%= Base.encode64(@server.registration_token) %>
 | 
			
		||||
        <%= @registration_command %>
 | 
			
		||||
      </span>
 | 
			
		||||
    </code>
 | 
			
		||||
    <button type="button" tabindex="-1">
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,12 +6,13 @@ defmodule Prymn.Repo.Migrations.CreateServers do
 | 
			
		|||
      add :name, :string
 | 
			
		||||
      add :public_ip, :string
 | 
			
		||||
      add :provider, :string
 | 
			
		||||
      add :connection_status, :string
 | 
			
		||||
      add :status, :string
 | 
			
		||||
      add :registration_token, :binary
 | 
			
		||||
 | 
			
		||||
      timestamps()
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    create index("servers", [:registration_token], unique: true)
 | 
			
		||||
    create index("servers", [:public_ip], unique: true)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue