goagent: bare bones api

This commit is contained in:
Nikos Papadakis 2024-02-17 17:20:08 +02:00
parent b4edfd7585
commit dfd446f770
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
6 changed files with 198 additions and 22 deletions

View file

@ -2,10 +2,7 @@ defmodule PrymnWeb.AgentChannel do
use PrymnWeb, :channel use PrymnWeb, :channel
@impl true @impl true
def join("agents:" <> agent_id, payload, socket) do def join("agents:" <> _agent_id, _payload, socket) do
IO.inspect(agent_id)
IO.inspect(payload)
{:ok, socket} {:ok, socket}
end end

View file

@ -1,14 +1,35 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"os"
"os/signal"
"git.nikos.gg/prymn/prymn/goagent/pkg/gophx" "nikos.codes/prymn/prymn/goagent/pkg/gophx"
) )
func main() { func main() {
err := gophx.Connect() ctx, cancel := context.WithCancel(context.Background())
defer cancel()
socket, err := gophx.Connect(ctx)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
channel := gophx.NewChannel("agents:1", nil, socket)
channel.On("foo_bar", func(payload any) {
fmt.Println("received msg: ", payload)
})
handleSignals(cancel)
}
func handleSignals(cancel context.CancelFunc) {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh)
<-signalCh
cancel()
} }

View file

@ -1,4 +1,4 @@
module git.nikos.gg/prymn/prymn/goagent module nikos.codes/prymn/prymn/goagent
go 1.21.5 go 1.21.5

View file

@ -1,22 +1,45 @@
package gophx package gophx
import ( type Channel struct {
"context" topic string
"time" bindings map[string]func(any)
}
"nhooyr.io/websocket" func NewChannel(topic string, params any, socket *Socket) *Channel {
) // TODO: rejoin when socket is reconnected
channel := &Channel{
bindings: make(map[string]func(any)),
topic: topic,
}
func Connect() error { msg := message{
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) joinRef: 0,
defer cancel() ref: 0,
topic: topic,
event: "phx_join",
payload: params,
}
c, _, err := websocket.Dial(ctx, "ws://localhost:4000/agent/websocket?vsn=2.0.0", nil) data, err := msg.serializeJSON()
if err != nil { if err != nil {
return err panic(err)
} }
c.Ping(ctx) socket.addChannel(channel)
socket.send(data)
return nil return channel
}
func (channel *Channel) On(event string, callback func(any)) {
channel.bindings[event] = callback
}
func (channel *Channel) Push(event string, params any) {
}
func (channel *Channel) handleMessage(msg message) {
if binding, ok := channel.bindings[msg.event]; ok {
go binding(msg.payload)
}
} }

View file

@ -0,0 +1,36 @@
package gophx
import (
"encoding/json"
)
type message struct {
joinRef int64
ref int64
topic string
event string
payload any
}
func (m message) serializeJSON() ([]byte, error) {
data, err := json.Marshal([]any{m.joinRef, m.ref, m.topic, m.event, m.payload})
if err != nil {
return nil, err
}
return data, nil
}
func deserializeJSON(data []byte, msg *message) error {
m := message{}
slice := []any{&m.joinRef, &m.ref, &m.topic, &m.event, &m.payload}
err := json.Unmarshal(data, &slice)
if err != nil {
return err
}
*msg = m
return nil
}

View file

@ -0,0 +1,99 @@
package gophx
import (
"context"
"fmt"
"nhooyr.io/websocket"
)
type Socket struct {
sendChan chan []byte
websocket *websocket.Conn
channels map[string]*Channel
}
func Connect(ctx context.Context) (*Socket, error) {
serverUrl := "ws://localhost:4000/agent/websocket?vsn=2.0.0"
c, _, err := websocket.Dial(ctx, serverUrl, nil)
if err != nil {
return nil, err
}
socket := Socket{
sendChan: make(chan []byte),
channels: make(map[string]*Channel),
websocket: c,
}
go socket.readMessages(ctx)
go writeMessages(ctx, socket.sendChan, c)
return &socket, nil
}
func (s *Socket) addChannel(ch *Channel) {
s.channels[ch.topic] = ch
}
func (s *Socket) send(data []byte) {
s.sendChan <- data
}
func (s *Socket) readMessages(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
typ, data, err := s.websocket.Read(ctx)
if err != nil {
// TODO: try to reconnect
fmt.Println("socket closed?", err)
return
}
switch typ {
case websocket.MessageBinary:
fmt.Println("received binary", data)
case websocket.MessageText:
msg := message{}
err := deserializeJSON(data, &msg)
if err != nil {
fmt.Println(err)
}
s.sendToChannel(msg)
}
}
}
}
func (s *Socket) sendToChannel(msg message) {
ch, ok := s.channels[msg.topic]
if !ok {
return
}
ch.handleMessage(msg)
}
func writeMessages(ctx context.Context, sendChan chan []byte, ws *websocket.Conn) {
for {
select {
case message, ok := <-sendChan:
if !ok {
fmt.Println("sending channel closed")
return
}
err := ws.Write(ctx, websocket.MessageText, message)
if err != nil {
fmt.Println(err)
}
}
}
}