Refactor signalling protocol around a Session ID

This allows for more use cases to be handled like having several session
between 2 peers, and it simplifies the code a bit and makes the protocol
sensibly cleaner

Webrtcsink has been refactored a bit to take the new concept into
account
This commit is contained in:
Thibault Saunier 2022-07-14 14:25:12 -04:00 committed by Mathieu Duponchelle
parent f243f5fe5c
commit dbcbfef8c7
7 changed files with 661 additions and 593 deletions

View file

@ -133,23 +133,30 @@ impl Signaller {
);
}
p::OutgoingMessage::Registered(_) => unreachable!(),
p::OutgoingMessage::StartSession { peer_id } => {
if let Err(err) = element.add_consumer(&peer_id) {
p::OutgoingMessage::StartSession {
session_id,
peer_id,
} => {
if let Err(err) =
element.start_session(&session_id, &peer_id)
{
gst::warning!(CAT, obj: &element, "{}", err);
}
}
p::OutgoingMessage::EndSession { peer_id } => {
if let Err(err) = element.remove_consumer(&peer_id) {
p::OutgoingMessage::EndSession(session_info) => {
if let Err(err) =
element.end_session(&session_info.session_id)
{
gst::warning!(CAT, obj: &element, "{}", err);
}
}
p::OutgoingMessage::Peer(p::PeerMessage {
peer_id,
session_id,
peer_message,
}) => match peer_message {
p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => {
if let Err(err) = element.handle_sdp(
&peer_id,
&session_id,
&gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
gst_sdp::SDPMessage::parse_buffer(
@ -175,7 +182,7 @@ impl Signaller {
sdp_m_line_index,
} => {
if let Err(err) = element.handle_ice(
&peer_id,
&session_id,
Some(sdp_m_line_index),
None,
&candidate,
@ -254,13 +261,13 @@ impl Signaller {
pub fn handle_sdp(
&self,
element: &WebRTCSink,
peer_id: &str,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) {
let state = self.state.lock().unwrap();
let msg = p::IncomingMessage::Peer(p::PeerMessage {
peer_id: peer_id.to_string(),
session_id: session_id.to_string(),
peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}),
@ -281,7 +288,7 @@ impl Signaller {
pub fn handle_ice(
&self,
element: &WebRTCSink,
peer_id: &str,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
@ -289,7 +296,7 @@ impl Signaller {
let state = self.state.lock().unwrap();
let msg = p::IncomingMessage::Peer(p::PeerMessage {
peer_id: peer_id.to_string(),
session_id: session_id.to_string(),
peer_message: p::PeerMessageInner::Ice {
candidate: candidate.to_string(),
sdp_m_line_index: sdp_m_line_index.unwrap(),
@ -331,17 +338,17 @@ impl Signaller {
}
}
pub fn consumer_removed(&self, element: &WebRTCSink, peer_id: &str) {
gst::debug!(CAT, obj: element, "Signalling consumer {} removed", peer_id);
pub fn end_session(&self, element: &WebRTCSink, session_id: &str) {
gst::debug!(CAT, obj: element, "Signalling session {} ended", session_id);
let state = self.state.lock().unwrap();
let peer_id = peer_id.to_string();
let session_id = session_id.to_string();
let element = element.downgrade();
if let Some(mut sender) = state.websocket_sender.clone() {
task::spawn(async move {
if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
peer_id: peer_id.to_string(),
session_id: session_id.to_string(),
}))
.await
{

View file

@ -34,13 +34,13 @@ impl Signallable for Signaller {
fn handle_ice(
&mut self,
element: &WebRTCSink,
peer_id: &str,
session_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
signaller.handle_ice(element, peer_id, candidate, sdp_mline_index, sdp_mid);
signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid);
Ok(())
}
@ -49,9 +49,9 @@ impl Signallable for Signaller {
signaller.stop(element);
}
fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str) {
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) {
let signaller = imp::Signaller::from_instance(self);
signaller.consumer_removed(element, peer_id);
signaller.end_session(element, session_id);
}
}

View file

@ -125,12 +125,14 @@ pub struct VideoEncoder {
filter: gst::Element,
halved_framerate: gst::Fraction,
video_info: gst_video::VideoInfo,
peer_id: String,
session_id: String,
mitigation_mode: WebRTCSinkMitigationMode,
pub transceiver: gst_webrtc::WebRTCRTPTransceiver,
}
struct Consumer {
struct Session {
id: String,
pipeline: gst::Pipeline,
webrtcbin: gst::Element,
rtprtxsend: Option<gst::Element>,
@ -165,7 +167,7 @@ struct NavigationEvent {
struct State {
signaller: Box<dyn super::SignallableObject>,
signaller_state: SignallerState,
consumers: HashMap<String, Consumer>,
sessions: HashMap<String, Session>,
codecs: BTreeMap<i32, Codec>,
/// Used to abort codec discovery
codecs_abort_handle: Option<futures::future::AbortHandle>,
@ -275,7 +277,7 @@ impl Default for State {
Self {
signaller: Box::new(signaller),
signaller_state: SignallerState::Stopped,
consumers: HashMap::new(),
sessions: HashMap::new(),
codecs: BTreeMap::new(),
codecs_abort_handle: None,
codecs_done_receiver: None,
@ -541,7 +543,7 @@ impl VideoEncoder {
filter,
halved_framerate,
video_info,
peer_id: peer_id.to_string(),
session_id: peer_id.to_string(),
mitigation_mode: WebRTCSinkMitigationMode::NONE,
transceiver,
}
@ -632,8 +634,8 @@ impl VideoEncoder {
gst::log!(
CAT,
obj: element,
"consumer {}: setting bitrate {} and caps {} on encoder {:?}",
self.peer_id,
"session {}: setting bitrate {} and caps {} on encoder {:?}",
self.session_id,
bitrate,
caps,
self.element
@ -657,39 +659,39 @@ impl VideoEncoder {
}
impl State {
fn finalize_consumer(
fn finalize_session(
&mut self,
element: &super::WebRTCSink,
consumer: &mut Consumer,
session: &mut Session,
signal: bool,
) {
consumer.pipeline.debug_to_dot_file_with_ts(
session.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!("removing-peer-{}-", consumer.peer_id,),
format!("removing-peer-{}-", session.peer_id,),
);
for ssrc in consumer.webrtc_pads.keys() {
consumer.links.remove(ssrc);
for ssrc in session.webrtc_pads.keys() {
session.links.remove(ssrc);
}
consumer.pipeline.call_async(|pipeline| {
session.pipeline.call_async(|pipeline| {
let _ = pipeline.set_state(gst::State::Null);
});
if signal {
self.signaller.consumer_removed(element, &consumer.peer_id);
self.signaller.session_ended(element, &session.peer_id);
}
}
fn remove_consumer(
fn end_session(
&mut self,
element: &super::WebRTCSink,
peer_id: &str,
session_id: &str,
signal: bool,
) -> Option<Consumer> {
if let Some(mut consumer) = self.consumers.remove(peer_id) {
self.finalize_consumer(element, &mut consumer, signal);
Some(consumer)
) -> Option<Session> {
if let Some(mut session) = self.sessions.remove(session_id) {
self.finalize_session(element, &mut session, signal);
Some(session)
} else {
None
}
@ -723,8 +725,9 @@ impl State {
}
}
impl Consumer {
impl Session {
fn new(
id: String,
pipeline: gst::Pipeline,
webrtcbin: gst::Element,
peer_id: String,
@ -732,6 +735,7 @@ impl Consumer {
cc_info: CCInfo,
) -> Self {
Self {
id,
pipeline,
webrtcbin,
peer_id,
@ -1144,10 +1148,10 @@ impl WebRTCSink {
let mut state = self.state.lock().unwrap();
let consumer_ids: Vec<_> = state.consumers.keys().map(|k| k.to_owned()).collect();
let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect();
for id in consumer_ids {
state.remove_consumer(element, &id, true);
for id in session_ids {
state.end_session(element, &id, true);
}
state
@ -1197,39 +1201,39 @@ impl WebRTCSink {
&self,
element: &super::WebRTCSink,
offer: gst_webrtc::WebRTCSessionDescription,
peer_id: &str,
session_id: &str,
) {
let mut state = self.state.lock().unwrap();
if let Some(consumer) = state.consumers.get(peer_id) {
consumer
if let Some(session) = state.sessions.get(session_id) {
session
.webrtcbin
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
if let Err(err) = state.signaller.handle_sdp(element, peer_id, &offer) {
if let Err(err) = state.signaller.handle_sdp(element, session_id, &offer) {
gst::warning!(
CAT,
"Failed to handle SDP for consumer {}: {}",
peer_id,
"Failed to handle SDP for session {}: {}",
session_id,
err
);
state.remove_consumer(element, peer_id, true);
state.end_session(element, session_id, true);
}
}
}
fn negotiate(&self, element: &super::WebRTCSink, peer_id: &str) {
fn negotiate(&self, element: &super::WebRTCSink, session_id: &str) {
let state = self.state.lock().unwrap();
gst::debug!(CAT, obj: element, "Negotiating for peer {}", peer_id);
gst::debug!(CAT, obj: element, "Negotiating for session {}", session_id);
if let Some(consumer) = state.consumers.get(peer_id) {
if let Some(session) = state.sessions.get(session_id) {
let element = element.downgrade();
gst::debug!(CAT, "Creating offer for peer {}", peer_id);
let peer_id = peer_id.to_string();
gst::debug!(CAT, "Creating offer for session {}", session_id);
let session_id = session_id.to_string();
let promise = gst::Promise::with_change_func(move |reply| {
gst::debug!(CAT, "Created offer for peer {}", peer_id);
gst::debug!(CAT, "Created offer for session {}", session_id);
if let Some(element) = element.upgrade() {
let this = Self::from_instance(&element);
@ -1240,9 +1244,9 @@ impl WebRTCSink {
CAT,
obj: &element,
"Promise returned without a reply for {}",
peer_id
session_id
);
let _ = this.remove_consumer(&element, &peer_id, true);
let _ = this.remove_session(&element, &session_id, true);
return;
}
Err(err) => {
@ -1250,10 +1254,10 @@ impl WebRTCSink {
CAT,
obj: &element,
"Promise returned with an error for {}: {:?}",
peer_id,
session_id,
err
);
let _ = this.remove_consumer(&element, &peer_id, true);
let _ = this.remove_session(&element, &session_id, true);
return;
}
};
@ -1262,28 +1266,29 @@ impl WebRTCSink {
.value("offer")
.map(|offer| offer.get::<gst_webrtc::WebRTCSessionDescription>().unwrap())
{
this.on_offer_created(&element, offer, &peer_id);
this.on_offer_created(&element, offer, &session_id);
} else {
gst::warning!(
CAT,
"Reply without an offer for consumer {}: {:?}",
peer_id,
"Reply without an offer for session {}: {:?}",
session_id,
reply
);
let _ = this.remove_consumer(&element, &peer_id, true);
let _ = this.remove_session(&element, &session_id, true);
}
}
});
consumer
session
.webrtcbin
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
} else {
gst::debug!(
CAT,
obj: element,
"consumer for peer {} no longer exists",
peer_id
"consumer for session {} no longer exists (sessions: {:?}",
session_id,
state.sessions.keys().map(|id| id)
);
}
}
@ -1291,47 +1296,58 @@ impl WebRTCSink {
fn on_ice_candidate(
&self,
element: &super::WebRTCSink,
peer_id: String,
session_id: String,
sdp_m_line_index: u32,
candidate: String,
) {
let mut state = self.state.lock().unwrap();
if let Err(err) =
state
.signaller
.handle_ice(element, &peer_id, &candidate, Some(sdp_m_line_index), None)
{
if let Err(err) = state.signaller.handle_ice(
element,
&session_id,
&candidate,
Some(sdp_m_line_index),
None,
) {
gst::warning!(
CAT,
"Failed to handle ICE for consumer {}: {}",
peer_id,
"Failed to handle ICE in session {}: {}",
session_id,
err
);
state.remove_consumer(element, &peer_id, true);
state.end_session(element, &session_id, true);
}
}
/// Called by the signaller to add a new consumer
pub fn add_consumer(
/// Called by the signaller to add a new session
pub fn start_session(
&self,
element: &super::WebRTCSink,
session_id: &str,
peer_id: &str,
) -> Result<(), WebRTCSinkError> {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let peer_id = peer_id.to_string();
let session_id = session_id.to_string();
if state.consumers.contains_key(&peer_id) {
return Err(WebRTCSinkError::DuplicateConsumerId(peer_id));
if state.sessions.contains_key(&session_id) {
return Err(WebRTCSinkError::DuplicateSessionId(session_id));
}
gst::info!(CAT, obj: element, "Adding consumer {}", peer_id);
gst::info!(
CAT,
obj: element,
"Adding session: {} for peer: {}",
peer_id,
session_id
);
let pipeline = gst::Pipeline::new(Some(&format!("consumer-pipeline-{}", peer_id)));
let pipeline = gst::Pipeline::new(Some(&format!("session-pipeline-{}", session_id)));
let webrtcbin = make_element("webrtcbin", Some(&format!("webrtcbin-{}", peer_id)))
.map_err(|err| WebRTCSinkError::ConsumerPipelineError {
let webrtcbin = make_element("webrtcbin", Some(&format!("webrtcbin-{}", session_id)))
.map_err(|err| WebRTCSinkError::SessionPipelineError {
session_id: session_id.clone(),
peer_id: peer_id.clone(),
details: err.to_string(),
})?;
@ -1351,7 +1367,7 @@ impl WebRTCSink {
webrtcbin.connect_closure(
"request-aux-sender",
false,
glib::closure!(@watch element, @strong peer_id
glib::closure!(@watch element, @strong session_id
=> move |_webrtcbin: gst::Element, _transport: gst::Object| {
let settings = element.imp().settings.lock().unwrap();
@ -1378,9 +1394,9 @@ impl WebRTCSink {
};
cc.connect_notify(Some("estimated-bitrate"),
glib::clone!(@weak element, @strong peer_id
glib::clone!(@weak element, @strong session_id
=> move |bwe, pspec| {
element.imp().set_bitrate(&element, &peer_id,
element.imp().set_bitrate(&element, &session_id,
bwe.property::<u32>(pspec.name()));
}
));
@ -1392,12 +1408,12 @@ impl WebRTCSink {
webrtcbin.connect_closure(
"deep-element-added",
false,
glib::closure!(@watch element, @strong peer_id
glib::closure!(@watch element, @strong session_id
=> move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| {
if e.factory().map_or(false, |f| f.name() == "rtprtxsend") {
if e.has_property("stuffing-kbps", Some(i32::static_type())) {
element.imp().set_rtptrxsend(&element, &peer_id, e);
element.imp().set_rtptrxsend(&element, &session_id, e);
} else {
gst::warning!(CAT, "rtprtxsend doesn't have a `stuffing-kbps` \
property, stuffing disabled");
@ -1412,7 +1428,7 @@ impl WebRTCSink {
pipeline.add(&webrtcbin).unwrap();
let element_clone = element.downgrade();
let peer_id_clone = peer_id.clone();
let session_id_clone = session_id.clone();
webrtcbin.connect("on-ice-candidate", false, move |values| {
if let Some(element) = element_clone.upgrade() {
let this = Self::from_instance(&element);
@ -1420,7 +1436,7 @@ impl WebRTCSink {
let candidate = values[2].get::<String>().expect("Invalid argument");
this.on_ice_candidate(
&element,
peer_id_clone.to_string(),
session_id_clone.to_string(),
sdp_m_line_index,
candidate,
);
@ -1430,6 +1446,7 @@ impl WebRTCSink {
let element_clone = element.downgrade();
let peer_id_clone = peer_id.clone();
let session_id_clone = session_id.clone();
webrtcbin.connect_notify(Some("connection-state"), move |webrtcbin, _pspec| {
if let Some(element) = element_clone.upgrade() {
let state =
@ -1441,16 +1458,18 @@ impl WebRTCSink {
gst::warning!(
CAT,
obj: &element,
"Connection state for consumer {} failed",
"Connection state for in session {} (peer {}) failed",
session_id_clone,
peer_id_clone
);
let _ = this.remove_consumer(&element, &peer_id_clone, true);
let _ = this.remove_session(&element, &session_id_clone, true);
}
_ => {
gst::log!(
CAT,
obj: &element,
"Connection state for consumer {} changed: {:?}",
"Connection state in session {} (peer {}) changed: {:?}",
session_id_clone,
peer_id_clone,
state
);
@ -1461,6 +1480,7 @@ impl WebRTCSink {
let element_clone = element.downgrade();
let peer_id_clone = peer_id.clone();
let session_id_clone = session_id.clone();
webrtcbin.connect_notify(Some("ice-connection-state"), move |webrtcbin, _pspec| {
if let Some(element) = element_clone.upgrade() {
let state = webrtcbin
@ -1472,16 +1492,18 @@ impl WebRTCSink {
gst::warning!(
CAT,
obj: &element,
"Ice connection state for consumer {} failed",
peer_id_clone
"Ice connection state in session {} (peer {}) failed",
session_id_clone,
peer_id_clone,
);
let _ = this.remove_consumer(&element, &peer_id_clone, true);
let _ = this.remove_session(&element, &session_id_clone, true);
}
_ => {
gst::log!(
CAT,
obj: &element,
"Ice connection state for consumer {} changed: {:?}",
"Ice connection state in session {} (peer {}) changed: {:?}",
session_id_clone,
peer_id_clone,
state
);
@ -1491,8 +1513,8 @@ impl WebRTCSink {
if state == gst_webrtc::WebRTCICEConnectionState::Completed {
let state = this.state.lock().unwrap();
if let Some(consumer) = state.consumers.get(&peer_id_clone) {
for webrtc_pad in consumer.webrtc_pads.values() {
if let Some(session) = state.sessions.get(&session_id_clone) {
for webrtc_pad in session.webrtc_pads.values() {
if let Some(srcpad) = webrtc_pad.pad.peer() {
srcpad.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
@ -1508,6 +1530,7 @@ impl WebRTCSink {
let element_clone = element.downgrade();
let peer_id_clone = peer_id.clone();
let session_id_clone = session_id.clone();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let state =
webrtcbin.property::<gst_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
@ -1516,14 +1539,16 @@ impl WebRTCSink {
gst::log!(
CAT,
obj: &element,
"Ice gathering state for consumer {} changed: {:?}",
"Ice gathering state in session {} (peer {}) changed: {:?}",
session_id_clone,
peer_id_clone,
state
);
}
});
let mut consumer = Consumer::new(
let mut session = Session::new(
session_id.clone(),
pipeline.clone(),
webrtcbin.clone(),
peer_id.clone(),
@ -1549,23 +1574,23 @@ impl WebRTCSink {
.child_by_name("rtpbin")
.unwrap();
if consumer.congestion_controller.is_some() {
let peer_id_str = peer_id.to_string();
if consumer.stats_sigid.is_none() {
consumer.stats_sigid = Some(rtpbin.connect_closure("on-new-ssrc", true,
if session.congestion_controller.is_some() {
let session_id_str = session_id.to_string();
if session.stats_sigid.is_none() {
session.stats_sigid = Some(rtpbin.connect_closure("on-new-ssrc", true,
glib::closure!(@weak-allow-none element, @weak-allow-none webrtcbin
=> move |rtpbin: gst::Object, session_id: u32, _src: u32| {
let session = rtpbin.emit_by_name::<gst::Element>("get-session", &[&session_id]);
let rtp_session = rtpbin.emit_by_name::<gst::Element>("get-session", &[&session_id]);
let element = element.expect("on-new-ssrc emited when webrtcsink has been disposed?");
let webrtcbin = webrtcbin.unwrap();
let mut state = element.imp().state.lock().unwrap();
if let Some(mut consumer) = state.consumers.get_mut(&peer_id_str) {
if let Some(mut session) = state.sessions.get_mut(&session_id_str) {
consumer.stats_sigid = Some(session.connect_notify(Some("twcc-stats"),
glib::clone!(@strong peer_id_str, @weak webrtcbin, @weak element => @default-panic, move |sess, pspec| {
session.stats_sigid = Some(rtp_session.connect_notify(Some("twcc-stats"),
glib::clone!(@strong session_id_str, @weak webrtcbin, @weak element => @default-panic, move |sess, pspec| {
// Run the Loss-based control algortithm on new peer TWCC feedbacks
element.imp().process_loss_stats(&element, &peer_id_str, &sess.property::<gst::Structure>(pspec.name()));
element.imp().process_loss_stats(&element, &session_id_str, &sess.property::<gst::Structure>(pspec.name()));
})
));
}
@ -1577,7 +1602,7 @@ impl WebRTCSink {
state
.streams
.iter()
.for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &settings, stream));
.for_each(|(_, stream)| session.request_webrtcbin_pad(element, &settings, stream));
let clock = element.clock();
@ -1588,7 +1613,7 @@ impl WebRTCSink {
let mut bus_stream = pipeline.bus().unwrap().stream();
let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade();
let peer_id_clone = peer_id.to_owned();
let session_id_clone = session_id.to_owned();
task::spawn(async move {
while let Some(msg) = bus_stream.next().await {
@ -1598,12 +1623,12 @@ impl WebRTCSink {
gst::MessageView::Error(err) => {
gst::error!(
CAT,
"Consumer {} error: {}, details: {:?}",
peer_id_clone,
"session {} error: {}, details: {:?}",
session_id_clone,
err.error(),
err.debug()
);
let _ = this.remove_consumer(&element, &peer_id_clone, true);
let _ = this.remove_session(&element, &session_id_clone, true);
}
gst::MessageView::StateChanged(state_changed) => {
if let Some(pipeline) = pipeline_clone.upgrade() {
@ -1611,8 +1636,8 @@ impl WebRTCSink {
pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"webrtcsink-peer-{}-{:?}-to-{:?}",
peer_id_clone,
"webrtcsink-session-{}-{:?}-to-{:?}",
session_id_clone,
state_changed.old(),
state_changed.current()
),
@ -1629,10 +1654,10 @@ impl WebRTCSink {
gst::MessageView::Eos(..) => {
gst::error!(
CAT,
"Unexpected end of stream for consumer {}",
peer_id_clone
"Unexpected end of stream in session {}",
session_id_clone,
);
let _ = this.remove_consumer(&element, &peer_id_clone, true);
let _ = this.remove_session(&element, &session_id_clone, true);
}
_ => (),
}
@ -1641,7 +1666,8 @@ impl WebRTCSink {
});
pipeline.set_state(gst::State::Ready).map_err(|err| {
WebRTCSinkError::ConsumerPipelineError {
WebRTCSinkError::SessionPipelineError {
session_id: session_id.to_string(),
peer_id: peer_id.to_string(),
details: err.to_string(),
}
@ -1651,7 +1677,7 @@ impl WebRTCSink {
state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin));
}
state.consumers.insert(peer_id.to_string(), consumer);
state.sessions.insert(session_id.to_string(), session);
drop(state);
@ -1669,10 +1695,11 @@ impl WebRTCSink {
//
// This is completely safe, as we know that by now all conditions are gathered:
// webrtcbin is in the Ready state, and all its transceivers have codec_preferences.
self.negotiate(element, &peer_id);
self.negotiate(element, &session_id);
pipeline.set_state(gst::State::Playing).map_err(|err| {
WebRTCSinkError::ConsumerPipelineError {
WebRTCSinkError::SessionPipelineError {
session_id: session_id.to_string(),
peer_id: peer_id.to_string(),
details: err.to_string(),
}
@ -1682,21 +1709,21 @@ impl WebRTCSink {
}
/// Called by the signaller to remove a consumer
pub fn remove_consumer(
pub fn remove_session(
&self,
element: &super::WebRTCSink,
peer_id: &str,
session_id: &str,
signal: bool,
) -> Result<(), WebRTCSinkError> {
let mut state = self.state.lock().unwrap();
if !state.consumers.contains_key(peer_id) {
return Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()));
if !state.sessions.contains_key(session_id) {
return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()));
}
if let Some(consumer) = state.remove_consumer(element, peer_id, signal) {
if let Some(session) = state.end_session(element, session_id, signal) {
drop(state);
element.emit_by_name::<()>("consumer-removed", &[&peer_id, &consumer.webrtcbin]);
element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
}
Ok(())
@ -1705,30 +1732,35 @@ impl WebRTCSink {
fn process_loss_stats(
&self,
element: &super::WebRTCSink,
peer_id: &str,
session_id: &str,
stats: &gst::Structure,
) {
let mut state = element.imp().state.lock().unwrap();
if let Some(mut consumer) = state.consumers.get_mut(peer_id) {
if let Some(congestion_controller) = consumer.congestion_controller.as_mut() {
congestion_controller.loss_control(&element, stats, &mut consumer.encoders);
if let Some(mut session) = state.sessions.get_mut(session_id) {
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
congestion_controller.loss_control(&element, stats, &mut session.encoders);
}
consumer.stats = stats.to_owned();
session.stats = stats.to_owned();
}
}
fn process_stats(&self, element: &super::WebRTCSink, webrtcbin: gst::Element, peer_id: &str) {
let peer_id = peer_id.to_string();
fn process_stats(
&self,
element: &super::WebRTCSink,
webrtcbin: gst::Element,
session_id: &str,
) {
let session_id = session_id.to_string();
let promise = gst::Promise::with_change_func(
glib::clone!(@strong peer_id, @weak element => move |reply| {
glib::clone!(@strong session_id, @weak element => move |reply| {
if let Ok(Some(stats)) = reply {
let mut state = element.imp().state.lock().unwrap();
if let Some(mut consumer) = state.consumers.get_mut(&peer_id) {
if let Some(congestion_controller) = consumer.congestion_controller.as_mut() {
congestion_controller.delay_control(&element, stats, &mut consumer.encoders,);
if let Some(mut session) = state.sessions.get_mut(&session_id) {
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
congestion_controller.delay_control(&element, stats, &mut session.encoders,);
}
consumer.stats = stats.to_owned();
session.stats = stats.to_owned();
}
}
}),
@ -1740,34 +1772,34 @@ impl WebRTCSink {
fn set_rtptrxsend(&self, element: &super::WebRTCSink, peer_id: &str, rtprtxsend: gst::Element) {
let mut state = element.imp().state.lock().unwrap();
if let Some(consumer) = state.consumers.get_mut(peer_id) {
consumer.rtprtxsend = Some(rtprtxsend);
if let Some(session) = state.sessions.get_mut(peer_id) {
session.rtprtxsend = Some(rtprtxsend);
}
}
fn set_bitrate(&self, element: &super::WebRTCSink, peer_id: &str, bitrate: u32) {
let mut state = element.imp().state.lock().unwrap();
if let Some(consumer) = state.consumers.get_mut(peer_id) {
if let Some(session) = state.sessions.get_mut(peer_id) {
let fec_ratio = {
// Start adding some FEC when the bitrate > 2Mbps as we found experimentally
// that it is not worth it below that threshold
if bitrate <= 2_000_000 || consumer.cc_info.max_bitrate <= 2_000_000 {
if bitrate <= 2_000_000 || session.cc_info.max_bitrate <= 2_000_000 {
0f64
} else {
(bitrate as f64 - 2_000_000.)
/ (consumer.cc_info.max_bitrate as f64 - 2_000_000.)
/ (session.cc_info.max_bitrate as f64 - 2_000_000.)
}
};
let fec_percentage = fec_ratio * 50f64;
let encoders_bitrate = ((bitrate as f64) / (1. + (fec_percentage / 100.))) as i32;
if let Some(ref rtpxsend) = consumer.rtprtxsend.as_ref() {
if let Some(ref rtpxsend) = session.rtprtxsend.as_ref() {
rtpxsend.set_property("stuffing-kbps", (bitrate as f64 / 1000.) as i32);
}
for encoder in consumer.encoders.iter_mut() {
for encoder in session.encoders.iter_mut() {
encoder.set_bitrate(element, encoders_bitrate);
encoder
.transceiver
@ -1776,12 +1808,12 @@ impl WebRTCSink {
}
}
fn on_remote_description_set(&self, element: &super::WebRTCSink, peer_id: String) {
fn on_remote_description_set(&self, element: &super::WebRTCSink, session_id: String) {
let mut state = self.state.lock().unwrap();
let mut remove = false;
if let Some(mut consumer) = state.consumers.remove(&peer_id) {
for webrtc_pad in consumer.webrtc_pads.clone().values() {
if let Some(mut session) = state.sessions.remove(&session_id) {
for webrtc_pad in session.webrtc_pads.clone().values() {
let transceiver = webrtc_pad
.pad
.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver");
@ -1798,14 +1830,14 @@ impl WebRTCSink {
.and_then(|stream| stream.producer.as_ref())
{
if let Err(err) =
consumer.connect_input_stream(element, producer, webrtc_pad, &state.codecs)
session.connect_input_stream(element, producer, webrtc_pad, &state.codecs)
{
gst::error!(
CAT,
obj: element,
"Failed to connect input stream {} for consumer {}: {}",
"Failed to connect input stream {} for session {}: {}",
webrtc_pad.stream_name,
peer_id,
session_id,
err
);
remove = true;
@ -1815,21 +1847,21 @@ impl WebRTCSink {
gst::error!(
CAT,
obj: element,
"No producer to connect consumer {} to",
peer_id,
"No producer to connect session {} to",
session_id,
);
remove = true;
break;
}
}
consumer.pipeline.debug_to_dot_file_with_ts(
session.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!("webrtcsink-peer-{}-remote-description-set", peer_id,),
format!("webrtcsink-peer-{}-remote-description-set", session_id,),
);
let element_clone = element.downgrade();
let webrtcbin = consumer.webrtcbin.downgrade();
let webrtcbin = session.webrtcbin.downgrade();
task::spawn(async move {
let mut interval =
async_std::stream::interval(std::time::Duration::from_millis(100));
@ -1839,7 +1871,9 @@ impl WebRTCSink {
if let (Some(webrtcbin), Some(element)) =
(webrtcbin.upgrade(), element_clone.upgrade())
{
element.imp().process_stats(&element, webrtcbin, &peer_id);
element
.imp()
.process_stats(&element, webrtcbin, &session_id);
} else {
break;
}
@ -1847,9 +1881,9 @@ impl WebRTCSink {
});
if remove {
state.finalize_consumer(element, &mut consumer, true);
state.finalize_session(element, &mut session, true);
} else {
state.consumers.insert(consumer.peer_id.clone(), consumer);
state.sessions.insert(session.id.clone(), session);
}
}
}
@ -1858,7 +1892,7 @@ impl WebRTCSink {
pub fn handle_ice(
&self,
_element: &super::WebRTCSink,
peer_id: &str,
session_id: &str,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
candidate: &str,
@ -1867,14 +1901,14 @@ impl WebRTCSink {
let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?;
if let Some(consumer) = state.consumers.get(peer_id) {
gst::trace!(CAT, "adding ice candidate for peer {}", peer_id);
consumer
if let Some(session) = state.sessions.get(session_id) {
gst::trace!(CAT, "adding ice candidate for session {}", session_id);
session
.webrtcbin
.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
Ok(())
} else {
Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()))
Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()))
}
}
@ -1882,17 +1916,17 @@ impl WebRTCSink {
pub fn handle_sdp(
&self,
element: &super::WebRTCSink,
peer_id: &str,
session_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), WebRTCSinkError> {
let mut state = self.state.lock().unwrap();
if let Some(consumer) = state.consumers.get_mut(peer_id) {
if let Some(session) = state.sessions.get_mut(session_id) {
let sdp = desc.sdp();
consumer.sdp = Some(sdp.to_owned());
session.sdp = Some(sdp.to_owned());
for webrtc_pad in consumer.webrtc_pads.values_mut() {
for webrtc_pad in session.webrtc_pads.values_mut() {
let media_idx = webrtc_pad.media_idx;
/* TODO: support partial answer, webrtcbin doesn't seem
* very well equipped to deal with this at the moment */
@ -1904,15 +1938,15 @@ impl WebRTCSink {
gst::warning!(
CAT,
"consumer {} refused media {}: {:?}",
peer_id,
"consumer from session {} refused media {}: {:?}",
session_id,
media_idx,
media_str
);
state.remove_consumer(element, peer_id, true);
state.end_session(element, session_id, true);
return Err(WebRTCSinkError::ConsumerRefusedMedia {
peer_id: peer_id.to_string(),
session_id: session_id.to_string(),
media_idx,
});
}
@ -1927,39 +1961,40 @@ impl WebRTCSink {
} else {
gst::warning!(
CAT,
"consumer {} did not provide valid payload for media index {}",
peer_id,
media_idx
"consumer from session {} did not provide valid payload for media index {} for session {}",
session_id,
media_idx,
session_id,
);
state.remove_consumer(element, peer_id, true);
state.end_session(element, session_id, true);
return Err(WebRTCSinkError::ConsumerNoValidPayload {
peer_id: peer_id.to_string(),
session_id: session_id.to_string(),
media_idx,
});
}
}
let element = element.downgrade();
let peer_id = peer_id.to_string();
let session_id = session_id.to_string();
let promise = gst::Promise::with_change_func(move |reply| {
gst::debug!(CAT, "received reply {:?}", reply);
if let Some(element) = element.upgrade() {
let this = Self::from_instance(&element);
this.on_remote_description_set(&element, peer_id);
this.on_remote_description_set(&element, session_id);
}
});
consumer
session
.webrtcbin
.emit_by_name::<()>("set-remote-description", &[desc, &promise]);
Ok(())
} else {
Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()))
Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()))
}
}
@ -2135,7 +2170,7 @@ impl WebRTCSink {
self.state
.lock()
.unwrap()
.consumers
.sessions
.iter()
.map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())),
)
@ -2493,11 +2528,11 @@ impl ObjectImpl for WebRTCSink {
.param_types([String::static_type(), gst::Element::static_type()])
.build(),
/*
* RsWebRTCSink::get_consumers:
* RsWebRTCSink::get_sessions:
*
* List all consumers (by ID).
* List all sessions (by ID).
*/
glib::subclass::Signal::builder("get-consumers")
glib::subclass::Signal::builder("get-sessions")
.action()
.class_handler(|_, args| {
let element = args[0].get::<super::WebRTCSink>().expect("signal arg");
@ -2507,7 +2542,7 @@ impl ObjectImpl for WebRTCSink {
this.state
.lock()
.unwrap()
.consumers
.sessions
.keys()
.cloned()
.collect::<Vec<String>>()

View file

@ -15,18 +15,22 @@ unsafe impl Sync for WebRTCSink {}
#[derive(thiserror::Error, Debug)]
pub enum WebRTCSinkError {
#[error("no consumer with id")]
NoConsumerWithId(String),
#[error("no session with id")]
NoSessionWithId(String),
#[error("consumer refused media")]
ConsumerRefusedMedia { peer_id: String, media_idx: u32 },
ConsumerRefusedMedia { session_id: String, media_idx: u32 },
#[error("consumer did not provide valid payload for media")]
ConsumerNoValidPayload { peer_id: String, media_idx: u32 },
ConsumerNoValidPayload { session_id: String, media_idx: u32 },
#[error("SDP mline index is currently mandatory")]
MandatorySdpMlineIndex,
#[error("duplicate consumer id")]
DuplicateConsumerId(String),
#[error("duplicate session id")]
DuplicateSessionId(String),
#[error("error setting up consumer pipeline")]
ConsumerPipelineError { peer_id: String, details: String },
SessionPipelineError {
session_id: String,
peer_id: String,
details: String,
},
}
pub trait Signallable: Sync + Send + 'static {
@ -35,7 +39,7 @@ pub trait Signallable: Sync + Send + 'static {
fn handle_sdp(
&mut self,
element: &WebRTCSink,
peer_id: &str,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>>;
@ -46,13 +50,13 @@ pub trait Signallable: Sync + Send + 'static {
fn handle_ice(
&mut self,
element: &WebRTCSink,
peer_id: &str,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>>;
fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str);
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str);
fn stop(&mut self, element: &WebRTCSink);
}
@ -86,12 +90,12 @@ impl WebRTCSink {
pub fn handle_sdp(
&self,
peer_id: &str,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_sdp(self, peer_id, sdp)
ws.handle_sdp(self, session_id, sdp)
}
/// sdp_mid is exposed for future proofing, see
@ -99,14 +103,14 @@ impl WebRTCSink {
/// at the moment sdp_m_line_index must be Some
pub fn handle_ice(
&self,
peer_id: &str,
session_id: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_ice(self, peer_id, sdp_m_line_index, sdp_mid, candidate)
ws.handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate)
}
pub fn handle_signalling_error(&self, error: Box<dyn Error + Send + Sync>) {
@ -115,16 +119,16 @@ impl WebRTCSink {
ws.handle_signalling_error(self, anyhow::anyhow!(error));
}
pub fn add_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> {
pub fn start_session(&self, session_id: &str, peer_id: &str) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.add_consumer(self, peer_id)
ws.start_session(self, session_id, peer_id)
}
pub fn remove_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> {
pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.remove_consumer(self, peer_id, false)
ws.remove_session(self, session_id, false)
}
}

View file

@ -88,12 +88,17 @@ pub enum OutgoingMessage {
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Instructs a peer to generate an offer
/// Instructs a peer to generate an offer and inform about the session ID
#[serde(rename_all = "camelCase")]
StartSession { peer_id: String },
StartSession { peer_id: String, session_id: String },
/// Let consumer know that the requested session is starting with the specified identifier
#[serde(rename_all = "camelCase")]
SessionStarted { peer_id: String, session_id: String },
/// Signals that the session the peer was in was ended
#[serde(rename_all = "camelCase")]
EndSession { peer_id: String },
EndSession(EndSessionMessage),
/// Messages directly forwarded from one peer to another
Peer(PeerMessage),
/// Provides the current list of consumer peers
@ -151,7 +156,7 @@ pub struct StartSessionMessage {
pub peer_id: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Conveys a SDP
@ -168,7 +173,7 @@ pub enum SdpMessage {
},
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
/// Contents of the peer message
pub enum PeerMessageInner {
@ -187,19 +192,17 @@ pub enum PeerMessageInner {
#[serde(rename_all = "camelCase")]
/// Messages directly forwarded from one peer to another
pub struct PeerMessage {
/// The identifier of the peer, which must be in a session with the sender
pub peer_id: String,
/// The contents of the message
pub session_id: String,
#[serde(flatten)]
pub peer_message: PeerMessageInner,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
/// End a session
pub struct EndSessionMessage {
/// The identifier of the peer to end the session with
pub peer_id: String,
/// The identifier of the session to end
pub session_id: String,
}
#[derive(Serialize, Deserialize, Debug)]

File diff suppressed because it is too large Load diff

View file

@ -47,6 +47,7 @@ function Uint8ToString(u8a){
}
function Session(our_id, peer_id, closed_callback) {
this.id = null;
this.peer_connection = null;
this.ws_conn = null;
this.peer_id = peer_id;
@ -113,7 +114,7 @@ function Session(our_id, peer_id, closed_callback) {
this.setStatus("Sending SDP answer");
var sdp = {
'type': 'peer',
'peerId': this.peer_id,
'sessionId': this.id,
'sdp': this.peer_connection.localDescription.toJSON()
};
this.ws_conn.send(JSON.stringify(sdp));
@ -164,6 +165,9 @@ function Session(our_id, peer_id, closed_callback) {
if (msg.type == "registered") {
this.setStatus("Registered with server");
this.connectPeer();
} else if (msg.type == "sessionStarted") {
this.setStatus("Registered with server");
this.id = msg.sessionId;
} else if (msg.type == "error") {
this.handleIncomingError(msg.details);
} else if (msg.type == "endSession") {
@ -315,7 +319,7 @@ function Session(our_id, peer_id, closed_callback) {
}
this.ws_conn.send(JSON.stringify({
"type": "peer",
"peerId": this.peer_id,
"sessionId": this.id,
"ice": event.candidate.toJSON()
}));
};