backend: add "agents" context

Manages connection processes to a remote agent
This commit is contained in:
Nikos Papadakis 2023-06-27 22:28:00 +03:00
parent bd51310f84
commit ecf03b8f93
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
9 changed files with 176 additions and 8 deletions

View file

@ -15,8 +15,8 @@ async fn main() -> anyhow::Result<()> {
shutdown_tx.send(()).expect("bug: channel closed");
});
let addr = "0.0.0.0:5012".parse()?;
tracing::info!("Serving new agent at address: {}", addr);
let addr = "0.0.0.0:50012".parse()?;
tracing::info!(message = "starting new agent", %addr);
new_server()
.serve_with_shutdown(addr, async {

View file

@ -18,14 +18,24 @@ type Result<T> = std::result::Result<T, tonic::Status>;
#[tonic::async_trait]
impl rpc::agent_server::Agent for Server {
#[tracing::instrument(skip(self))]
async fn echo(&self, req: Request<rpc::EchoRequest>) -> Result<Response<rpc::EchoResponse>> {
Ok(Response::new(rpc::EchoResponse {
message: req.into_inner().message,
}))
}
#[tracing::instrument(skip(self))]
async fn get_sys_info(&self, _: Request<()>) -> Result<Response<rpc::SysInfoResponse>> {
let sys = self.sys.lock().unwrap();
let mut sys = self.sys.lock().unwrap();
sys.refresh_specifics(
sysinfo::RefreshKind::new()
.with_disks()
.with_memory()
.with_processes(sysinfo::ProcessRefreshKind::everything())
.with_cpu(sysinfo::CpuRefreshKind::everything()),
);
let cpus = sys
.cpus()
@ -64,6 +74,7 @@ impl rpc::agent_server::Agent for Server {
type ExecStream = Pin<Box<dyn Stream<Item = Result<rpc::ExecResponse>> + Send + Sync>>;
#[tracing::instrument(skip(self))]
async fn exec(&self, req: Request<rpc::ExecRequest>) -> Result<Response<Self::ExecStream>> {
use exec::*;
@ -89,7 +100,11 @@ pub fn new_server() -> Router {
sys: Arc::new(Mutex::new(System::new_all())),
};
tonic::transport::Server::builder().add_service(rpc::agent_server::AgentServer::new(server))
let service = rpc::agent_server::AgentServer::new(server);
tonic::transport::Server::builder()
.trace_fn(|_| tracing::info_span!("agent_server"))
.add_service(service)
}
#[cfg(test)]

View file

@ -1 +1 @@
tonic::include_proto!("agent");
tonic::include_proto!("prymn");

View file

@ -0,0 +1,51 @@
defmodule Prymn.Agents do
@moduledoc ~S"""
Prymn Agents are programs that manage a remote client machine. Prymn backend
communicates with them using GRPC calls. GRPC connections are started using
the Prymn.Agents.Supervisor (a DynamicSupervisor) and are book-kept using the
Prymn.Agents.Registry.
## Examples
TODO
"""
@doc """
Ensures a connection with the Prymn Agent exists and is kept in memory.
Returns `:ok` when a new connection is successfuly established or is already established
Returns `{:error, reason}` when the connection could not be established
"""
@spec ensure_connection(String.t()) :: :ok | {:error, term}
def ensure_connection(address) do
child = {Prymn.Agents.Connection, address}
case DynamicSupervisor.start_child(Prymn.Agents.Supervisor, child) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, error} -> {:error, error}
end
end
@doc """
Terminates the process and drops the connection gracefully.
"""
@spec drop_connection(String.t()) :: :ok | {:error, :not_found}
def drop_connection(address) do
:ok = Prymn.Agents.Connection.drop(address)
catch
:exit, _ -> {:error, :not_found}
end
@doc """
Get the channel for the given `address`. The channel is used to make GRPC
calls.
"""
@spec get_channel(String.t()) :: GRPC.Channel.t() | {:error, :not_found}
def get_channel(address) do
Prymn.Agents.Connection.get_channel(address)
catch
:exit, _ -> {:error, :not_found}
end
end

View file

@ -0,0 +1,95 @@
defmodule Prymn.Agents.Connection do
@moduledoc false
defstruct [:channel, up?: false]
@ping_interval 20000
use GenServer, restart: :transient
require Logger
@spec start_link(String.t()) :: GenServer.on_start()
def start_link(addr) do
GenServer.start_link(__MODULE__, addr, name: via(addr))
end
@spec get_channel(String.t()) :: GRPC.Channel.t()
def get_channel(addr) do
GenServer.call(via(addr), :get_channel)
end
@spec drop(String.t()) :: :ok
def drop(addr) do
GenServer.stop(via(addr), :shutdown)
end
@impl true
def init(addr) do
case GRPC.Stub.connect(addr) do
{:ok, channel} ->
Logger.info("Starting new connection at address #{addr}")
state = %__MODULE__{channel: channel, up?: true}
Process.send_after(self(), :do_healthcheck, @ping_interval)
{:ok, state}
{:error, error} ->
{:stop, error}
end
end
@impl true
def handle_call(:get_channel, _from, state) do
{:reply, state.channel, state}
end
@impl true
def handle_info({:gun_up, _pid, _protocol}, state) do
Logger.info("regained connection (#{inspect(state.channel)})")
{:noreply, %{state | up?: true}}
end
@impl true
def handle_info({:gun_down, _pid, _protocol, reason, _killed_streams}, state) do
Logger.info("lost connection (reason: #{inspect(reason)}, #{inspect(state.channel)})")
{:noreply, %{state | up?: false}}
end
@impl true
def handle_info(:do_healthcheck, %{channel: channel} = state) do
request = %PrymnProto.Prymn.EchoRequest{message: "hello"}
case PrymnProto.Prymn.Agent.Stub.echo(channel, request) do
{:ok, _reply} ->
:noop
# IO.inspect(reply)
{:error, error} ->
Logger.warning("healthcheck error for server #{channel.host}, reason: #{inspect(error)}")
end
Process.send_after(self(), :do_healthcheck, @ping_interval)
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
Logger.info("received unexpected message: #{inspect(msg)}")
{:noreply, state}
end
@impl true
def terminate(_reason, %{channel: channel}) do
GRPC.Stub.disconnect(channel)
end
defp via(name) do
{:via, Registry, {Prymn.Agents.Registry, name}}
end
end

View file

@ -17,9 +17,10 @@ defmodule Prymn.Application do
# Start Finch
{Finch, name: Prymn.Finch},
# Start the Endpoint (http/https)
PrymnWeb.Endpoint
# Start a worker by calling: Prymn.Worker.start_link(arg)
# {Prymn.Worker, arg}
PrymnWeb.Endpoint,
# Start the prymn agent (grpc) registry and the supervisor
{Registry, keys: :unique, name: Prymn.Agents.Registry},
{DynamicSupervisor, name: Prymn.Agents.Supervisor, strategy: :one_for_one}
]
# See https://hexdocs.pm/elixir/Supervisor.html

View file

@ -5,6 +5,10 @@ defmodule PrymnWeb.ServerLive.Index do
@impl true
def mount(_params, _session, socket) do
# Run this for every server:
# make sure an agent connection is made (async "cheap" request)
# then wait for events
# pubsub will eventually send a connected or a disconnected (and anything else) event
{:ok, stream(socket, :servers, Servers.list_servers())}
end

View file

@ -49,6 +49,7 @@ defmodule Prymn.MixProject do
{:plug_cowboy, "~> 2.5"},
{:grpc, github: "elixir-grpc/grpc", ref: "691ac2146eac1691e703e31985765f042ec5e91a"},
{:protobuf, "~> 0.12.0"},
{:google_protos, "~> 0.3.0"},
# Test
{:floki, ">= 0.30.0", only: :test},

View file

@ -15,6 +15,7 @@
"finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"},
"floki": {:hex, :floki, "0.34.3", "5e2dcaec5d7c228ce5b1d3501502e308b2d79eb655e4191751a1fe491c37feac", [:mix], [], "hexpm", "9577440eea5b97924b4bf3c7ea55f7b8b6dce589f9b28b096cc294a8dc342341"},
"gettext": {:hex, :gettext, "0.22.2", "6bfca374de34ecc913a28ba391ca184d88d77810a3e427afa8454a71a51341ac", [:mix], [{:expo, "~> 0.4.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "8a2d389673aea82d7eae387e6a2ccc12660610080ae7beb19452cfdc1ec30f60"},
"google_protos": {:hex, :google_protos, "0.3.0", "15faf44dce678ac028c289668ff56548806e313e4959a3aaf4f6e1ebe8db83f4", [:mix], [{:protobuf, "~> 0.10", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "1f6b7fb20371f72f418b98e5e48dae3e022a9a6de1858d4b254ac5a5d0b4035f"},
"grpc": {:git, "https://github.com/elixir-grpc/grpc.git", "691ac2146eac1691e703e31985765f042ec5e91a", [ref: "691ac2146eac1691e703e31985765f042ec5e91a"]},
"gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},