diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 86019ab9..74c62be8 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -28,6 +28,35 @@ pub enum RegisteredMessage { meta: Option, }, } + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(tag = "peerType")] +#[serde(rename_all = "camelCase")] +/// Confirms registration +pub enum UnregisteredMessage { + /// Unregistered as a producer + #[serde(rename_all = "camelCase")] + Producer { + peer_id: String, + #[serde(default)] + meta: Option, + }, + /// Unregistered as a consumer + #[serde(rename_all = "camelCase")] + Consumer { + peer_id: String, + #[serde(default)] + meta: Option, + }, + /// Unregistered as a listener + #[serde(rename_all = "camelCase")] + Listener { + peer_id: String, + #[serde(default)] + meta: Option, + }, +} + #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Peer { @@ -43,6 +72,8 @@ pub struct Peer { pub enum OutgoingMessage { /// Confirms registration Registered(RegisteredMessage), + /// Confirms registration + Unregistered(UnregisteredMessage), /// Notifies listeners that a producer was registered #[serde(rename_all = "camelCase")] ProducerAdded { @@ -96,6 +127,22 @@ pub enum RegisterMessage { }, } +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "peerType")] +#[serde(rename_all = "camelCase")] +/// Register with a peer type +pub enum UnregisterMessage { + /// Unregister a producer + #[serde(rename_all = "camelCase")] + Producer, + /// Unregister a consumer + #[serde(rename_all = "camelCase")] + Consumer, + /// Unregister a listener + #[serde(rename_all = "camelCase")] + Listener, +} + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] /// Ask the server to start a session with a producer peer @@ -162,6 +209,8 @@ pub struct EndSessionMessage { pub enum IncomingMessage { /// Register as a peer type Register(RegisterMessage), + /// Unregister as a peer type + Unregister(UnregisterMessage), /// Start a session with a producer peer StartSession(StartSessionMessage), /// End an existing session diff --git a/signalling/src/handlers/mod.rs b/signalling/src/handlers/mod.rs index c33d9f7c..c17b96e4 100644 --- a/signalling/src/handlers/mod.rs +++ b/signalling/src/handlers/mod.rs @@ -51,6 +51,58 @@ impl Handler { p::RegisterMessage::Consumer { meta } => self.register_consumer(peer_id, meta), p::RegisterMessage::Listener { meta } => self.register_listener(peer_id, meta), }, + p::IncomingMessage::Unregister(message) => { + let meta = self.meta.get(peer_id).unwrap_or_else(|| &None).clone(); + let answer = match message { + p::UnregisterMessage::Producer => { + self.remove_producer_peer(peer_id); + p::UnregisteredMessage::Producer { + peer_id: peer_id.into(), + meta, + } + } + p::UnregisterMessage::Consumer => { + self.remove_consumer_peer(peer_id); + p::UnregisteredMessage::Consumer { + peer_id: peer_id.into(), + meta, + } + } + p::UnregisterMessage::Listener => { + self.remove_listener_peer(peer_id); + p::UnregisteredMessage::Listener { + peer_id: peer_id.into(), + meta, + } + } + }; + + self.items.push_back(( + peer_id.into(), + p::OutgoingMessage::Unregistered(answer.clone()), + )); + + // We don't notify listeners about listeners activity + match message { + p::UnregisterMessage::Producer | p::UnregisterMessage::Consumer => { + let mut messages = self + .listeners + .iter() + .map(|listener| { + ( + listener.to_string(), + p::OutgoingMessage::Unregistered(answer.clone()), + ) + }) + .collect::>(); + + self.items.append(&mut messages); + } + _ => (), + } + + Ok(()) + } p::IncomingMessage::StartSession(message) => { self.start_session(&message.peer_id, peer_id) } @@ -78,13 +130,23 @@ impl Handler { } } + #[instrument(level = "debug", skip(self))] + /// Remove a peer, this can cause sessions to be ended + fn remove_listener_peer(&mut self, peer_id: &str) { + self.listeners.remove(peer_id); + } + #[instrument(level = "debug", skip(self))] /// Remove a peer, this can cause sessions to be ended fn remove_peer(&mut self, peer_id: &str) { info!(peer_id = %peer_id, "removing peer"); - self.listeners.remove(peer_id); + self.remove_listener_peer(peer_id); + self.remove_producer_peer(peer_id); + self.remove_consumer_peer(peer_id); + } + fn remove_producer_peer(&mut self, peer_id: &str) { if let Some(consumers) = self.producers.remove(peer_id) { for consumer_id in &consumers { info!(producer_id=%peer_id, consumer_id=%consumer_id, "ended session"); @@ -110,7 +172,9 @@ impl Handler { )); } } + } + fn remove_consumer_peer(&mut self, peer_id: &str) { if let Some(Some(producer_id)) = self.consumers.remove(peer_id) { info!(producer_id=%producer_id, consumer_id=%peer_id, "ended session"); @@ -1122,6 +1186,174 @@ mod tests { ); } + #[async_std::test] + async fn test_unregistering() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { + meta: Default::default(), + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { + meta: Default::default(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::StartSession { + peer_id: "consumer".to_string() + } + ); + + let message = p::IncomingMessage::Unregister(p::UnregisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession { + peer_id: "producer".to_string() + } + ); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Unregistered(p::UnregisteredMessage::Producer { + peer_id: "producer".into(), + meta: Default::default() + }) + ); + } + + #[async_std::test] + async fn test_unregistering_with_listenners() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Listener { + meta: Default::default(), + }); + tx.send(("listener".to_string(), Some(message))) + .await + .unwrap(); + let (l, _) = handler.next().await.unwrap(); + assert_eq!(l, "listener"); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { + meta: Some(json!({"some": "meta"})), + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::ProducerAdded { + peer_id: "producer".to_string(), + meta: Some(json!({"some": "meta"})), + } + ); + + let (peer_id, _msg) = handler.next().await.unwrap(); + assert_eq!(peer_id, "producer"); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { + meta: Default::default(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, _msg) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::StartSession { + peer_id: "consumer".to_string() + } + ); + + let message = p::IncomingMessage::Unregister(p::UnregisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession { + peer_id: "producer".to_string() + } + ); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::ProducerRemoved { + peer_id: "producer".into(), + meta: Some(json!({"some": "meta"})) + } + ); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Unregistered(p::UnregisteredMessage::Producer { + peer_id: "producer".into(), + meta: Some(json!({"some": "meta"})) + }) + ); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::Unregistered(p::UnregisteredMessage::Producer { + peer_id: "producer".into(), + meta: Some(json!({"some": "meta"})), + }) + ); + } + #[async_std::test] async fn test_start_session_no_consumer() { let (mut tx, rx) = mpsc::unbounded();