diff --git a/Cargo.lock b/Cargo.lock index dc94000..da1e4c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,12 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +[[package]] +name = "array-init" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" + [[package]] name = "async-nats" version = "0.33.0" @@ -99,6 +105,30 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "binrw" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "173901312e9850391d4d7c1318c4e099fdc037d61870fca427429830efdb4e5f" +dependencies = [ + "array-init", + "binrw_derive", + "bytemuck", +] + +[[package]] +name = "binrw_derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb515fdd6f8d3a357c8e19b8ec59ef53880807864329b1cb1cba5c53bf76557e" +dependencies = [ + "either", + "owo-colors", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -120,6 +150,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bytemuck" +version = "1.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2ef034f05691a48569bd920a96c81b9d91bbad1ab5ac7c4616c1f6ef36cb79f" + [[package]] name = "byteorder" version = "1.5.0" @@ -225,7 +261,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -287,6 +323,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + [[package]] name = "errno" version = "0.3.8" @@ -610,6 +652,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "parking_lot" version = "0.12.1" @@ -665,7 +713,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -723,6 +771,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats", + "binrw", "bytes", "chrono", "futures", @@ -980,7 +1029,7 @@ checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1011,7 +1060,7 @@ checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1123,6 +1172,17 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.48" @@ -1165,7 +1225,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1249,7 +1309,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1329,7 +1389,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 6c80ade..6bcc99e 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0.79" async-nats = "0.33.0" +binrw = "0.13.3" bytes = "1.5.0" chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] } futures = { version = "0.3.30", default-features = false, features = ["std"] } diff --git a/agent/src/channels/messages.rs b/agent/src/channels/messages.rs new file mode 100644 index 0000000..235be37 --- /dev/null +++ b/agent/src/channels/messages.rs @@ -0,0 +1,83 @@ +use binrw::helpers::until_eof; +use binrw::prelude::*; + +#[derive(Debug, BinRead)] +enum BinaryMessage { + #[br(magic = 0u8)] + Push(PushBinaryMessage), + #[br(magic = 1u8)] + Reply(ReplyBinaryMessage), + #[br(magic = 2u8)] + Broadcast(BroadcastBinaryMessage), +} + +#[derive(Debug, BinRead)] +struct PushBinaryMessage { + ref_size: u8, + join_ref_size: u8, + _topic_size: u8, + _event_size: u8, + #[br(count = ref_size)] + r#ref: Vec, + #[br(count = join_ref_size)] + join_ref: Vec, + #[br(count = _topic_size)] + topic: Vec, + #[br(count = _event_size)] + event: Vec, + #[br(parse_with = until_eof)] + payload: Vec, +} + +#[derive(Debug, BinRead)] +struct ReplyBinaryMessage { + join_ref_size: u8, + ref_size: u8, + _topic_size: u8, + _status_size: u8, + #[br(count = join_ref_size)] + join_ref: Vec, + #[br(count = ref_size)] + r#ref: Vec, + #[br(count = _topic_size)] + topic: Vec, + #[br(count = _status_size)] + status: Vec, + #[br(parse_with = until_eof)] + payload: Vec, +} + +#[derive(Debug, BinRead)] +struct BroadcastBinaryMessage { + _topic_size: u8, + _event_size: u8, + #[br(count = _topic_size)] + topic: Vec, + #[br(count = _event_size)] + event: Vec, + #[br(parse_with = until_eof)] + payload: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_binary() { + // broadcast!("agents:1", "subject", {:binary, "hello"}) + let raw_msg = vec![ + 2, 8, 7, 97, 103, 101, 110, 116, 115, 58, 49, 115, 117, 98, 106, 101, 99, 116, 104, + 101, 108, 108, 111, + ]; + + parse_binary_message(&raw_msg[..]).unwrap(); + } + + #[test] + fn parses_json() { + // broadcast!("agents:1", "subject", "hello") + let raw_msg = r#"[null, "0", "agents:1", "subject", "hello"]"#; + parse_json_message(raw_msg).unwrap(); + } +} diff --git a/agent/src/channels/mod.rs b/agent/src/channels/mod.rs index 3eeef5e..5f0ac08 100644 --- a/agent/src/channels/mod.rs +++ b/agent/src/channels/mod.rs @@ -1,6 +1,137 @@ +mod messages; + +use std::marker::PhantomData; + +use futures::{SinkExt, StreamExt}; +// use futures::{SinkExt, StreamExt}; +use serde::{ + de::{self, Visitor}, + ser::SerializeTuple, + Deserialize, Serialize, +}; +// use tokio_tungstenite::tungstenite::Message as WSMessage; + +#[derive(Debug)] +struct Message { + join_ref: Option, + r#ref: String, + topic_name: String, + event_name: String, + payload: T, +} + +impl Serialize for Message +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut msg = serializer.serialize_tuple(5)?; + msg.serialize_element(&self.join_ref)?; + msg.serialize_element(&self.r#ref)?; + msg.serialize_element(&self.topic_name)?; + msg.serialize_element(&self.event_name)?; + msg.serialize_element(&self.payload)?; + msg.end() + } +} + +impl<'de, T> Deserialize<'de> for Message +where + T: Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct TupleVisitor(PhantomData); + + impl<'de, T> Visitor<'de> for TupleVisitor + where + T: Deserialize<'de>, + { + type Value = Message; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a sequence of values") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + Ok(Message { + join_ref: seq.next_element()?.ok_or_else(|| { + de::Error::custom("missing join_ref from sequence at position 0") + })?, + r#ref: seq.next_element()?.ok_or_else(|| { + de::Error::custom("missing `ref` from sequence at position 1") + })?, + topic_name: seq.next_element()?.ok_or_else(|| { + de::Error::custom("missing `topic_name` from sequence at position 2") + })?, + event_name: seq.next_element()?.ok_or_else(|| { + de::Error::custom("missing `event_name` from sequence at position 3") + })?, + payload: seq.next_element()?.ok_or_else(|| { + de::Error::custom("missing `payload` from sequence at position 4") + })?, + }) + } + } + + deserializer.deserialize_tuple(5, TupleVisitor(PhantomData)) + } +} + pub async fn testchannel() -> anyhow::Result<()> { let (ws_stream, _) = tokio_tungstenite::connect_async("ws://localhost:4000/agent/websocket?vsn=2.0.0").await?; + let (mut send, mut recv) = ws_stream.split(); + + let msg = Message { + join_ref: Some(String::from("0")), + r#ref: String::from("0"), + topic_name: String::from("agents:1"), + event_name: String::from("phx_join"), + payload: String::from("foo"), + }; + + tokio::spawn(async move { + while let Some(msg) = recv.next().await { + let msg = msg.unwrap(); + match msg { + tokio_tungstenite::tungstenite::Message::Binary(bytes) => { + let _ = self::messages::parse_binary_message(&bytes[..]).unwrap(); + } + tokio_tungstenite::tungstenite::Message::Text(text) => { + let _ = self::messages::parse_json_message(&text).unwrap(); + } + _ => todo!(), + } + } + }); + + send.send(tokio_tungstenite::tungstenite::Message::Text( + serde_json::to_string(&msg).unwrap(), + )) + .await?; + + // let msg = serde_json::to_string(&Message { + // join_ref: Some(String::from("0")), + // r#ref: "1", + // topic_name: String::from("agents:1"), + // event_name: String::from("shout"), + // payload: Some(String::from("foo")), + // })?; + + // send.send(WSMessage::Text(msg)).await?; + + // let msg = Message::heartbeat().to_json()?; + // send.send(WSMessage::Text(msg)).await?; + Ok(()) } diff --git a/app/lib/prymn_web/channels/agent_channel.ex b/app/lib/prymn_web/channels/agent_channel.ex new file mode 100644 index 0000000..08c4a54 --- /dev/null +++ b/app/lib/prymn_web/channels/agent_channel.ex @@ -0,0 +1,23 @@ +defmodule PrymnWeb.AgentChannel do + use PrymnWeb, :channel + + @impl true + def join("agents:" <> agent_id, payload, socket) do + IO.inspect(agent_id) + IO.inspect(payload) + + {:ok, socket} + end + + # Channels can be used in a request/response fashion + # by sending replies to requests from the client + @impl true + def handle_in("ping", payload, socket) do + {:reply, {:ok, payload}, socket} + end + + def handle_in("binary", {:binary, data}, socket) do + dbg(data) + {:noreply, socket} + end +end diff --git a/app/lib/prymn_web/channels/agent_socket.ex b/app/lib/prymn_web/channels/agent_socket.ex index bda644b..6a1ff3f 100644 --- a/app/lib/prymn_web/channels/agent_socket.ex +++ b/app/lib/prymn_web/channels/agent_socket.ex @@ -6,18 +6,7 @@ defmodule PrymnWeb.AgentSocket do # It's possible to control the websocket connection and # assign values that can be accessed by your channel topics. - ## Channels - # Uncomment the following line to define a "room:*" topic - # pointing to the `PrymnWeb.RoomChannel`: - # - # channel "room:*", PrymnWeb.RoomChannel - # - # To create a channel file, use the mix task: - # - # mix phx.gen.channel Room - # - # See the [`Channels guide`](https://hexdocs.pm/phoenix/channels.html) - # for further details. + channel "agents:*", PrymnWeb.AgentChannel # Socket params are passed from the client and can # be used to verify and authenticate a user. After