signalling: track consumer and producer sessions in dedicated HashMaps

This avoids full traversals of the sessions HashMap.

Also fixes accidental session teardown when a producer is stopped while
also in a session as a consumer.
This commit is contained in:
Mathieu Duponchelle 2022-09-30 23:23:05 +02:00
parent 61c0fdcb5d
commit ef7cf4dd98

View file

@ -4,7 +4,7 @@ use futures::prelude::*;
use futures::ready; use futures::ready;
use p::PeerStatus; use p::PeerStatus;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context as TaskContext, Poll}; use std::task::{Context as TaskContext, Poll};
use tracing::log::error; use tracing::log::error;
@ -40,6 +40,8 @@ pin_project! {
items: VecDeque<(String, p::OutgoingMessage)>, items: VecDeque<(String, p::OutgoingMessage)>,
peers: HashMap<PeerId, PeerStatus>, peers: HashMap<PeerId, PeerStatus>,
sessions: HashMap<String, Session>, sessions: HashMap<String, Session>,
consumer_sessions: HashMap<String, HashSet<String>>,
producer_sessions: HashMap<String, HashSet<String>>,
} }
} }
@ -54,6 +56,8 @@ impl Handler {
items: VecDeque::new(), items: VecDeque::new(),
peers: Default::default(), peers: Default::default(),
sessions: Default::default(), sessions: Default::default(),
consumer_sessions: Default::default(),
producer_sessions: Default::default(),
} }
} }
@ -116,23 +120,23 @@ impl Handler {
} }
fn stop_producer(&mut self, peer_id: &str) { fn stop_producer(&mut self, peer_id: &str) {
let sessions_to_end = self if let Some(session_ids) = self.producer_sessions.remove(peer_id) {
.sessions for session_id in session_ids {
.iter() if let Err(e) = self.end_session(peer_id, &session_id) {
.filter_map(|(session_id, session)| {
if session.producer == peer_id || session.consumer == peer_id {
Some(session_id.clone())
} else {
None
}
})
.collect::<Vec<String>>();
sessions_to_end.iter().for_each(|session_id| {
if let Err(e) = self.end_session(peer_id, session_id) {
error!("Could not end session {session_id}: {e:?}"); error!("Could not end session {session_id}: {e:?}");
} }
}); }
}
}
fn stop_consumer(&mut self, peer_id: &str) {
if let Some(session_ids) = self.consumer_sessions.remove(peer_id) {
for session_id in session_ids {
if let Err(e) = self.end_session(peer_id, &session_id) {
error!("Could not end session {session_id}: {e:?}");
}
}
}
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
@ -145,6 +149,7 @@ impl Handler {
}; };
self.stop_producer(peer_id); self.stop_producer(peer_id);
self.stop_consumer(peer_id);
for (id, p) in self.peers.iter() { for (id, p) in self.peers.iter() {
if !p.listening() { if !p.listening() {
@ -168,6 +173,18 @@ impl Handler {
.remove(session_id) .remove(session_id)
.with_context(|| format!("Session {session_id} doesn't exist"))?; .with_context(|| format!("Session {session_id} doesn't exist"))?;
self.consumer_sessions
.entry(session.consumer.clone())
.and_modify(|sessions| {
sessions.remove(session_id);
});
self.producer_sessions
.entry(session.producer.clone())
.and_modify(|sessions| {
sessions.remove(session_id);
});
self.items.push_back(( self.items.push_back((
session.other_peer_id(peer_id)?.to_string(), session.other_peer_id(peer_id)?.to_string(),
p::OutgoingMessage::EndSession(p::EndSessionMessage { p::OutgoingMessage::EndSession(p::EndSessionMessage {
@ -272,6 +289,14 @@ impl Handler {
producer: producer_id.to_string(), producer: producer_id.to_string(),
}, },
); );
self.consumer_sessions
.entry(consumer_id.to_string())
.or_insert(HashSet::new())
.insert(session_id.clone());
self.producer_sessions
.entry(producer_id.to_string())
.or_insert(HashSet::new())
.insert(session_id.clone());
self.items.push_back(( self.items.push_back((
consumer_id.to_string(), consumer_id.to_string(),
p::OutgoingMessage::SessionStarted { p::OutgoingMessage::SessionStarted {