diff --git a/signalling/src/handlers/mod.rs b/signalling/src/handlers/mod.rs index 71c9c3da..64d59af8 100644 --- a/signalling/src/handlers/mod.rs +++ b/signalling/src/handlers/mod.rs @@ -120,7 +120,7 @@ impl Handler { .sessions .iter() .filter_map(|(session_id, session)| { - if session.producer == peer_id { + if session.producer == peer_id || session.consumer == peer_id { Some(session_id.clone()) } else { None @@ -652,6 +652,57 @@ mod tests { ); } + #[async_std::test] + async fn test_disconnect_consumer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + new_peer(&mut tx, &mut handler, "consumer").await; + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + + let _ = handler.next().await.unwrap(); + + tx.send(("consumer".to_string(), None)).await.unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id + }) + ); + } + #[async_std::test] async fn test_end_session_producer() { let (mut tx, rx) = mpsc::unbounded();