From dfd446f770412e1a0f02cab56223a88235b987c9 Mon Sep 17 00:00:00 2001 From: Nikos Papadakis Date: Sat, 17 Feb 2024 17:20:08 +0200 Subject: [PATCH] goagent: bare bones api --- app/lib/prymn_web/channels/agent_channel.ex | 5 +- goagent/cmd/agent/main.go | 25 +++++- goagent/go.mod | 2 +- goagent/pkg/gophx/channel.go | 53 +++++++---- goagent/pkg/gophx/message.go | 36 ++++++++ goagent/pkg/gophx/socket.go | 99 +++++++++++++++++++++ 6 files changed, 198 insertions(+), 22 deletions(-) create mode 100644 goagent/pkg/gophx/message.go create mode 100644 goagent/pkg/gophx/socket.go diff --git a/app/lib/prymn_web/channels/agent_channel.ex b/app/lib/prymn_web/channels/agent_channel.ex index c4b9c80..7a7e5cb 100644 --- a/app/lib/prymn_web/channels/agent_channel.ex +++ b/app/lib/prymn_web/channels/agent_channel.ex @@ -2,10 +2,7 @@ defmodule PrymnWeb.AgentChannel do use PrymnWeb, :channel @impl true - def join("agents:" <> agent_id, payload, socket) do - IO.inspect(agent_id) - IO.inspect(payload) - + def join("agents:" <> _agent_id, _payload, socket) do {:ok, socket} end diff --git a/goagent/cmd/agent/main.go b/goagent/cmd/agent/main.go index 787ffbf..2d93d6f 100644 --- a/goagent/cmd/agent/main.go +++ b/goagent/cmd/agent/main.go @@ -1,14 +1,35 @@ package main import ( + "context" "fmt" + "os" + "os/signal" - "git.nikos.gg/prymn/prymn/goagent/pkg/gophx" + "nikos.codes/prymn/prymn/goagent/pkg/gophx" ) func main() { - err := gophx.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + socket, err := gophx.Connect(ctx) if err != nil { fmt.Println(err) } + + channel := gophx.NewChannel("agents:1", nil, socket) + + channel.On("foo_bar", func(payload any) { + fmt.Println("received msg: ", payload) + }) + + handleSignals(cancel) +} + +func handleSignals(cancel context.CancelFunc) { + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh) + <-signalCh + cancel() } diff --git a/goagent/go.mod b/goagent/go.mod index 8f0819f..4cef863 100644 --- a/goagent/go.mod +++ b/goagent/go.mod @@ -1,4 +1,4 @@ -module git.nikos.gg/prymn/prymn/goagent +module nikos.codes/prymn/prymn/goagent go 1.21.5 diff --git a/goagent/pkg/gophx/channel.go b/goagent/pkg/gophx/channel.go index 12572c4..a85b04c 100644 --- a/goagent/pkg/gophx/channel.go +++ b/goagent/pkg/gophx/channel.go @@ -1,22 +1,45 @@ package gophx -import ( - "context" - "time" +type Channel struct { + topic string + bindings map[string]func(any) +} - "nhooyr.io/websocket" -) - -func Connect() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - c, _, err := websocket.Dial(ctx, "ws://localhost:4000/agent/websocket?vsn=2.0.0", nil) - if err != nil { - return err +func NewChannel(topic string, params any, socket *Socket) *Channel { + // TODO: rejoin when socket is reconnected + channel := &Channel{ + bindings: make(map[string]func(any)), + topic: topic, } - c.Ping(ctx) + msg := message{ + joinRef: 0, + ref: 0, + topic: topic, + event: "phx_join", + payload: params, + } - return nil + data, err := msg.serializeJSON() + if err != nil { + panic(err) + } + + socket.addChannel(channel) + socket.send(data) + return channel +} + +func (channel *Channel) On(event string, callback func(any)) { + channel.bindings[event] = callback +} + +func (channel *Channel) Push(event string, params any) { + +} + +func (channel *Channel) handleMessage(msg message) { + if binding, ok := channel.bindings[msg.event]; ok { + go binding(msg.payload) + } } diff --git a/goagent/pkg/gophx/message.go b/goagent/pkg/gophx/message.go new file mode 100644 index 0000000..7a8614d --- /dev/null +++ b/goagent/pkg/gophx/message.go @@ -0,0 +1,36 @@ +package gophx + +import ( + "encoding/json" +) + +type message struct { + joinRef int64 + ref int64 + topic string + event string + payload any +} + +func (m message) serializeJSON() ([]byte, error) { + data, err := json.Marshal([]any{m.joinRef, m.ref, m.topic, m.event, m.payload}) + if err != nil { + return nil, err + } + + return data, nil +} + +func deserializeJSON(data []byte, msg *message) error { + m := message{} + slice := []any{&m.joinRef, &m.ref, &m.topic, &m.event, &m.payload} + + err := json.Unmarshal(data, &slice) + if err != nil { + return err + } + + *msg = m + + return nil +} diff --git a/goagent/pkg/gophx/socket.go b/goagent/pkg/gophx/socket.go new file mode 100644 index 0000000..1c2575c --- /dev/null +++ b/goagent/pkg/gophx/socket.go @@ -0,0 +1,99 @@ +package gophx + +import ( + "context" + "fmt" + + "nhooyr.io/websocket" +) + +type Socket struct { + sendChan chan []byte + websocket *websocket.Conn + channels map[string]*Channel +} + +func Connect(ctx context.Context) (*Socket, error) { + serverUrl := "ws://localhost:4000/agent/websocket?vsn=2.0.0" + + c, _, err := websocket.Dial(ctx, serverUrl, nil) + if err != nil { + return nil, err + } + + socket := Socket{ + sendChan: make(chan []byte), + channels: make(map[string]*Channel), + websocket: c, + } + + go socket.readMessages(ctx) + + go writeMessages(ctx, socket.sendChan, c) + + return &socket, nil +} + +func (s *Socket) addChannel(ch *Channel) { + s.channels[ch.topic] = ch +} + +func (s *Socket) send(data []byte) { + s.sendChan <- data +} + +func (s *Socket) readMessages(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + typ, data, err := s.websocket.Read(ctx) + if err != nil { + // TODO: try to reconnect + fmt.Println("socket closed?", err) + return + } + + switch typ { + case websocket.MessageBinary: + fmt.Println("received binary", data) + case websocket.MessageText: + msg := message{} + + err := deserializeJSON(data, &msg) + if err != nil { + fmt.Println(err) + } + + s.sendToChannel(msg) + } + } + } +} + +func (s *Socket) sendToChannel(msg message) { + ch, ok := s.channels[msg.topic] + if !ok { + return + } + + ch.handleMessage(msg) +} + +func writeMessages(ctx context.Context, sendChan chan []byte, ws *websocket.Conn) { + for { + select { + case message, ok := <-sendChan: + if !ok { + fmt.Println("sending channel closed") + return + } + + err := ws.Write(ctx, websocket.MessageText, message) + if err != nil { + fmt.Println(err) + } + } + } +}