From 3e066fd23a8f4efbad8bea3764cef42f4ad484a6 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Sat, 3 Feb 2024 13:11:31 +0200 Subject: [PATCH] subscription --- app/lib/prymn/messaging/subscription.ex | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 app/lib/prymn/messaging/subscription.ex diff --git a/app/lib/prymn/messaging/subscription.ex b/app/lib/prymn/messaging/subscription.ex new file mode 100644 index 0000000..04c43fa --- /dev/null +++ b/app/lib/prymn/messaging/subscription.ex @@ -0,0 +1,36 @@ +defmodule Prymn.Messaging.Subscription do + use GenServer + + alias Prymn.Messaging + + def start_link(%{agent_id: agent_id, subject: subject} = init_arg) do + name = {:via, Registry, {Prymn.Messaging.SubscriptionRegistry, agent_id <> subject}} + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + @impl true + def init(%{agent_id: agent_id, subject: subject, gnat: gnat, reply: reply}) do + Process.monitor(reply) + subject = "agents.v1.#{agent_id}.#{subject}" + {:ok, sub} = Gnat.sub(gnat, self(), subject) + + {:ok, %{reply: reply, sub: sub, gnat: gnat}} + end + + @impl true + def handle_info({:msg, %{body: body, topic: subject}}, %{reply: reply} = state) do + msg = Messaging.Messages.handle_message(subject, body) + send(reply, msg) + + {:noreply, state} + end + + def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do + {:stop, {:shutdown, :gone}, state} + end + + @impl true + def terminate({:shutdown, _}, state) do + Gnat.unsub(state.gnat, state.sub) + end +end