This commit is contained in:
Nikos Papadakis 2024-02-15 11:21:57 +02:00
parent 6846e96f86
commit 1a5225ef6f
Signed by untrusted user who does not match committer: nikos
GPG key ID: 78871F9905ADFF02
6 changed files with 306 additions and 19 deletions

74
Cargo.lock generated
View file

@ -32,6 +32,12 @@ version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
[[package]]
name = "array-init"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc"
[[package]] [[package]]
name = "async-nats" name = "async-nats"
version = "0.33.0" version = "0.33.0"
@ -99,6 +105,30 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "binrw"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "173901312e9850391d4d7c1318c4e099fdc037d61870fca427429830efdb4e5f"
dependencies = [
"array-init",
"binrw_derive",
"bytemuck",
]
[[package]]
name = "binrw_derive"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb515fdd6f8d3a357c8e19b8ec59ef53880807864329b1cb1cba5c53bf76557e"
dependencies = [
"either",
"owo-colors",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
@ -120,6 +150,12 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "bytemuck"
version = "1.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2ef034f05691a48569bd920a96c81b9d91bbad1ab5ac7c4616c1f6ef36cb79f"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.5.0" version = "1.5.0"
@ -225,7 +261,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]
@ -287,6 +323,12 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "either"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
[[package]] [[package]]
name = "errno" name = "errno"
version = "0.3.8" version = "0.3.8"
@ -610,6 +652,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "owo-colors"
version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"
@ -665,7 +713,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]
@ -723,6 +771,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-nats", "async-nats",
"binrw",
"bytes", "bytes",
"chrono", "chrono",
"futures", "futures",
@ -980,7 +1029,7 @@ checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]
@ -1011,7 +1060,7 @@ checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]
@ -1123,6 +1172,17 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.48" version = "2.0.48"
@ -1165,7 +1225,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]
@ -1249,7 +1309,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]
@ -1329,7 +1389,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.48",
] ]
[[package]] [[package]]

View file

@ -6,6 +6,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.79" anyhow = "1.0.79"
async-nats = "0.33.0" async-nats = "0.33.0"
binrw = "0.13.3"
bytes = "1.5.0" bytes = "1.5.0"
chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] } chrono = { version = "0.4.33", default-features = false, features = ["now", "serde"] }
futures = { version = "0.3.30", default-features = false, features = ["std"] } futures = { version = "0.3.30", default-features = false, features = ["std"] }

View file

@ -0,0 +1,83 @@
use binrw::helpers::until_eof;
use binrw::prelude::*;
#[derive(Debug, BinRead)]
enum BinaryMessage {
#[br(magic = 0u8)]
Push(PushBinaryMessage),
#[br(magic = 1u8)]
Reply(ReplyBinaryMessage),
#[br(magic = 2u8)]
Broadcast(BroadcastBinaryMessage),
}
#[derive(Debug, BinRead)]
struct PushBinaryMessage {
ref_size: u8,
join_ref_size: u8,
_topic_size: u8,
_event_size: u8,
#[br(count = ref_size)]
r#ref: Vec<u8>,
#[br(count = join_ref_size)]
join_ref: Vec<u8>,
#[br(count = _topic_size)]
topic: Vec<u8>,
#[br(count = _event_size)]
event: Vec<u8>,
#[br(parse_with = until_eof)]
payload: Vec<u8>,
}
#[derive(Debug, BinRead)]
struct ReplyBinaryMessage {
join_ref_size: u8,
ref_size: u8,
_topic_size: u8,
_status_size: u8,
#[br(count = join_ref_size)]
join_ref: Vec<u8>,
#[br(count = ref_size)]
r#ref: Vec<u8>,
#[br(count = _topic_size)]
topic: Vec<u8>,
#[br(count = _status_size)]
status: Vec<u8>,
#[br(parse_with = until_eof)]
payload: Vec<u8>,
}
#[derive(Debug, BinRead)]
struct BroadcastBinaryMessage {
_topic_size: u8,
_event_size: u8,
#[br(count = _topic_size)]
topic: Vec<u8>,
#[br(count = _event_size)]
event: Vec<u8>,
#[br(parse_with = until_eof)]
payload: Vec<u8>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_binary() {
// broadcast!("agents:1", "subject", {:binary, "hello"})
let raw_msg = vec![
2, 8, 7, 97, 103, 101, 110, 116, 115, 58, 49, 115, 117, 98, 106, 101, 99, 116, 104,
101, 108, 108, 111,
];
parse_binary_message(&raw_msg[..]).unwrap();
}
#[test]
fn parses_json() {
// broadcast!("agents:1", "subject", "hello")
let raw_msg = r#"[null, "0", "agents:1", "subject", "hello"]"#;
parse_json_message(raw_msg).unwrap();
}
}

View file

@ -1,6 +1,137 @@
mod messages;
use std::marker::PhantomData;
use futures::{SinkExt, StreamExt};
// use futures::{SinkExt, StreamExt};
use serde::{
de::{self, Visitor},
ser::SerializeTuple,
Deserialize, Serialize,
};
// use tokio_tungstenite::tungstenite::Message as WSMessage;
#[derive(Debug)]
struct Message<T> {
join_ref: Option<String>,
r#ref: String,
topic_name: String,
event_name: String,
payload: T,
}
impl<T> Serialize for Message<T>
where
T: Serialize,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut msg = serializer.serialize_tuple(5)?;
msg.serialize_element(&self.join_ref)?;
msg.serialize_element(&self.r#ref)?;
msg.serialize_element(&self.topic_name)?;
msg.serialize_element(&self.event_name)?;
msg.serialize_element(&self.payload)?;
msg.end()
}
}
impl<'de, T> Deserialize<'de> for Message<T>
where
T: Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct TupleVisitor<T>(PhantomData<T>);
impl<'de, T> Visitor<'de> for TupleVisitor<T>
where
T: Deserialize<'de>,
{
type Value = Message<T>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a sequence of values")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
Ok(Message {
join_ref: seq.next_element()?.ok_or_else(|| {
de::Error::custom("missing join_ref from sequence at position 0")
})?,
r#ref: seq.next_element()?.ok_or_else(|| {
de::Error::custom("missing `ref` from sequence at position 1")
})?,
topic_name: seq.next_element()?.ok_or_else(|| {
de::Error::custom("missing `topic_name` from sequence at position 2")
})?,
event_name: seq.next_element()?.ok_or_else(|| {
de::Error::custom("missing `event_name` from sequence at position 3")
})?,
payload: seq.next_element()?.ok_or_else(|| {
de::Error::custom("missing `payload` from sequence at position 4")
})?,
})
}
}
deserializer.deserialize_tuple(5, TupleVisitor(PhantomData))
}
}
pub async fn testchannel() -> anyhow::Result<()> { pub async fn testchannel() -> anyhow::Result<()> {
let (ws_stream, _) = let (ws_stream, _) =
tokio_tungstenite::connect_async("ws://localhost:4000/agent/websocket?vsn=2.0.0").await?; tokio_tungstenite::connect_async("ws://localhost:4000/agent/websocket?vsn=2.0.0").await?;
let (mut send, mut recv) = ws_stream.split();
let msg = Message {
join_ref: Some(String::from("0")),
r#ref: String::from("0"),
topic_name: String::from("agents:1"),
event_name: String::from("phx_join"),
payload: String::from("foo"),
};
tokio::spawn(async move {
while let Some(msg) = recv.next().await {
let msg = msg.unwrap();
match msg {
tokio_tungstenite::tungstenite::Message::Binary(bytes) => {
let _ = self::messages::parse_binary_message(&bytes[..]).unwrap();
}
tokio_tungstenite::tungstenite::Message::Text(text) => {
let _ = self::messages::parse_json_message(&text).unwrap();
}
_ => todo!(),
}
}
});
send.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&msg).unwrap(),
))
.await?;
// let msg = serde_json::to_string(&Message {
// join_ref: Some(String::from("0")),
// r#ref: "1",
// topic_name: String::from("agents:1"),
// event_name: String::from("shout"),
// payload: Some(String::from("foo")),
// })?;
// send.send(WSMessage::Text(msg)).await?;
// let msg = Message::heartbeat().to_json()?;
// send.send(WSMessage::Text(msg)).await?;
Ok(()) Ok(())
} }

View file

@ -0,0 +1,23 @@
defmodule PrymnWeb.AgentChannel do
use PrymnWeb, :channel
@impl true
def join("agents:" <> agent_id, payload, socket) do
IO.inspect(agent_id)
IO.inspect(payload)
{:ok, socket}
end
# Channels can be used in a request/response fashion
# by sending replies to requests from the client
@impl true
def handle_in("ping", payload, socket) do
{:reply, {:ok, payload}, socket}
end
def handle_in("binary", {:binary, data}, socket) do
dbg(data)
{:noreply, socket}
end
end

View file

@ -6,18 +6,7 @@ defmodule PrymnWeb.AgentSocket do
# It's possible to control the websocket connection and # It's possible to control the websocket connection and
# assign values that can be accessed by your channel topics. # assign values that can be accessed by your channel topics.
## Channels channel "agents:*", PrymnWeb.AgentChannel
# Uncomment the following line to define a "room:*" topic
# pointing to the `PrymnWeb.RoomChannel`:
#
# channel "room:*", PrymnWeb.RoomChannel
#
# To create a channel file, use the mix task:
#
# mix phx.gen.channel Room
#
# See the [`Channels guide`](https://hexdocs.pm/phoenix/channels.html)
# for further details.
# Socket params are passed from the client and can # Socket params are passed from the client and can
# be used to verify and authenticate a user. After # be used to verify and authenticate a user. After