webrtcsink: expose signaller as a property

in the process move the signaller field to the settings struct

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1141>
This commit is contained in:
Matthew Waters 2023-03-17 19:28:19 +11:00
parent 8236f3e5e7
commit b6e78b5f04

View file

@ -16,6 +16,7 @@ use once_cell::sync::Lazy;
use std::collections::HashMap; use std::collections::HashMap;
use std::ops::Mul; use std::ops::Mul;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::MutexGuard;
use super::homegrown_cc::CongestionController; use super::homegrown_cc::CongestionController;
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
@ -78,6 +79,7 @@ struct Settings {
enable_data_channel_navigation: bool, enable_data_channel_navigation: bool,
meta: Option<gst::Structure>, meta: Option<gst::Structure>,
ice_transport_policy: WebRTCICETransportPolicy, ice_transport_policy: WebRTCICETransportPolicy,
signaller: Signallable,
} }
/// Represents a codec we can offer /// Represents a codec we can offer
@ -174,7 +176,7 @@ struct Session {
codecs: Option<BTreeMap<i32, Codec>>, codecs: Option<BTreeMap<i32, Codec>>,
} }
#[derive(PartialEq)] #[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum SignallerState { enum SignallerState {
Started, Started,
Stopped, Stopped,
@ -201,7 +203,6 @@ struct SignallerSignals {
/* Our internal state */ /* Our internal state */
struct State { struct State {
signaller: Signallable,
signaller_state: SignallerState, signaller_state: SignallerState,
sessions: HashMap<String, Session>, sessions: HashMap<String, Session>,
codecs: BTreeMap<i32, Codec>, codecs: BTreeMap<i32, Codec>,
@ -288,6 +289,8 @@ pub struct WebRTCSink {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
let signaller = Signaller::new(WebRTCSignallerRole::Producer);
Self { Self {
video_caps: ["video/x-vp8", "video/x-h264", "video/x-vp9", "video/x-h265"] video_caps: ["video/x-vp8", "video/x-h264", "video/x-vp9", "video/x-h265"]
.into_iter() .into_iter()
@ -310,16 +313,14 @@ impl Default for Settings {
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
meta: None, meta: None,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
signaller: signaller.upcast(),
} }
} }
} }
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
let signaller = Signaller::new(WebRTCSignallerRole::Producer);
Self { Self {
signaller: signaller.upcast(),
signaller_state: SignallerState::Stopped, signaller_state: SignallerState::Stopped,
sessions: HashMap::new(), sessions: HashMap::new(),
codecs: BTreeMap::new(), codecs: BTreeMap::new(),
@ -770,7 +771,7 @@ impl VideoEncoder {
} }
impl State { impl State {
fn finalize_session(&mut self, session: &mut Session, signal: bool) { fn finalize_session(&mut self, session: &mut Session) {
gst::info!(CAT, "Ending session {}", session.id); gst::info!(CAT, "Ending session {}", session.id);
session.pipeline.debug_to_dot_file_with_ts( session.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(), gst::DebugGraphDetails::all(),
@ -784,35 +785,27 @@ impl State {
session.pipeline.call_async(|pipeline| { session.pipeline.call_async(|pipeline| {
let _ = pipeline.set_state(gst::State::Null); let _ = pipeline.set_state(gst::State::Null);
}); });
if signal {
self.signaller.end_session(&session.id);
}
} }
fn end_session(&mut self, session_id: &str, signal: bool) -> Option<Session> { fn end_session(&mut self, session_id: &str) -> Option<Session> {
if let Some(mut session) = self.sessions.remove(session_id) { if let Some(mut session) = self.sessions.remove(session_id) {
self.finalize_session(&mut session, signal); self.finalize_session(&mut session);
Some(session) Some(session)
} else { } else {
None None
} }
} }
fn maybe_start_signaller(&mut self, element: &super::WebRTCSink) { fn maybe_start_signaller(
&mut self,
element: &super::WebRTCSink,
settings: &MutexGuard<Settings>,
) {
if self.signaller_state == SignallerState::Stopped if self.signaller_state == SignallerState::Stopped
&& element.current_state() >= gst::State::Paused && element.current_state() >= gst::State::Paused
&& self.codec_discovery_done && self.codec_discovery_done
{ {
self.signaller.start(); settings.signaller.start();
}
}
fn maybe_stop_signaller(&mut self, _element: &super::WebRTCSink) {
if self.signaller_state == SignallerState::Started {
self.signaller.stop();
self.signaller_state = SignallerState::Stopped;
gst::info!(CAT, "Stopped signaller");
} }
} }
} }
@ -1369,13 +1362,17 @@ impl WebRTCSink {
fn unprepare(&self, element: &super::WebRTCSink) -> Result<(), Error> { fn unprepare(&self, element: &super::WebRTCSink) -> Result<(), Error> {
gst::info!(CAT, obj: element, "unpreparing"); gst::info!(CAT, obj: element, "unpreparing");
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect();
for id in session_ids { let sessions: Vec<_> = session_ids
state.end_session(&id, true); .iter()
} .filter_map(|id| state.end_session(id))
.collect();
state state
.streams .streams
@ -1392,11 +1389,24 @@ impl WebRTCSink {
}); });
} }
state.maybe_stop_signaller(element);
state.codec_discovery_done = false; state.codec_discovery_done = false;
state.codecs = BTreeMap::new(); state.codecs = BTreeMap::new();
let signaller_state = state.signaller_state;
if state.signaller_state == SignallerState::Started {
state.signaller_state = SignallerState::Stopped;
}
drop(state);
for session in sessions {
signaller.end_session(&session.id);
}
if signaller_state == SignallerState::Started {
signaller.stop();
gst::info!(CAT, "Stopped signaller");
}
Ok(()) Ok(())
} }
@ -1413,6 +1423,7 @@ impl WebRTCSink {
gst::StreamError::Failed, gst::StreamError::Failed,
["Signalling error: {}", error] ["Signalling error: {}", error]
); );
false
}) })
), ),
@ -1475,6 +1486,7 @@ impl WebRTCSink {
if let Err(err) = instance.imp().remove_session(instance, session_id, false) { if let Err(err) = instance.imp().remove_session(instance, session_id, false) {
gst::warning!(CAT, "{}", err); gst::warning!(CAT, "{}", err);
} }
false
}) })
), ),
@ -1491,10 +1503,10 @@ impl WebRTCSink {
/// When using a custom signaller /// When using a custom signaller
pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> { pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> {
let sigobj = signaller.clone(); let sigobj = signaller.clone();
let mut state = self.state.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
self.connect_signaller(&sigobj); self.connect_signaller(&sigobj);
state.signaller = signaller; settings.signaller = signaller;
Ok(()) Ok(())
} }
@ -1522,6 +1534,7 @@ impl WebRTCSink {
offer: gst_webrtc::WebRTCSessionDescription, offer: gst_webrtc::WebRTCSessionDescription,
session_id: &str, session_id: &str,
) { ) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get(session_id) { if let Some(session) = state.sessions.get(session_id) {
@ -1529,7 +1542,7 @@ impl WebRTCSink {
.webrtcbin .webrtcbin
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]); .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
state.signaller.send_sdp(session_id, &offer); settings.signaller.send_sdp(session_id, &offer);
} }
} }
@ -1539,6 +1552,7 @@ impl WebRTCSink {
answer: gst_webrtc::WebRTCSessionDescription, answer: gst_webrtc::WebRTCSessionDescription,
session_id: &str, session_id: &str,
) { ) {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let Some(mut session) = state.sessions.remove(session_id) { if let Some(mut session) = state.sessions.remove(session_id) {
@ -1557,12 +1571,13 @@ impl WebRTCSink {
.webrtcbin .webrtcbin
.emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]); .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
state.signaller.send_sdp(session_id, &answer); settings.signaller.send_sdp(session_id, &answer);
let session_id = session.id.clone(); let session_id = session.id.clone();
state.sessions.insert(session.id.clone(), session); state.sessions.insert(session.id.clone(), session);
drop(state); drop(state);
drop(settings);
self.on_remote_description_set(element, session_id) self.on_remote_description_set(element, session_id)
} }
@ -1872,8 +1887,8 @@ impl WebRTCSink {
sdp_m_line_index: u32, sdp_m_line_index: u32,
candidate: String, candidate: String,
) { ) {
let state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap();
state settings
.signaller .signaller
.add_ice(&session_id, &candidate, Some(sdp_m_line_index), None) .add_ice(&session_id, &candidate, Some(sdp_m_line_index), None)
} }
@ -2327,12 +2342,9 @@ impl WebRTCSink {
// so that application code can create data channels at the correct // so that application code can create data channels at the correct
// moment. // moment.
element.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); element.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
{ settings_clone
let state = this.state.lock().unwrap();
state
.signaller .signaller
.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); .emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
}
// We don't connect to on-negotiation-needed, this in order to call the above // We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock: // signal without holding the state lock:
@ -2367,17 +2379,22 @@ impl WebRTCSink {
session_id: &str, session_id: &str,
signal: bool, signal: bool,
) -> Result<(), WebRTCSinkError> { ) -> Result<(), WebRTCSinkError> {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if !state.sessions.contains_key(session_id) { if !state.sessions.contains_key(session_id) {
return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())); return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()));
} }
if let Some(session) = state.end_session(session_id, signal) { if let Some(session) = state.end_session(session_id) {
state let signaller = settings.signaller.clone();
.signaller
.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
drop(state); drop(state);
drop(settings);
signaller
.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
if signal {
signaller.end_session(session_id);
}
element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
} }
@ -2524,6 +2541,7 @@ impl WebRTCSink {
let element_clone = element.downgrade(); let element_clone = element.downgrade();
let webrtcbin = session.webrtcbin.downgrade(); let webrtcbin = session.webrtcbin.downgrade();
let session_id_clone = session_id.clone();
RUNTIME.spawn(async move { RUNTIME.spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
@ -2535,7 +2553,7 @@ impl WebRTCSink {
{ {
element element
.imp() .imp()
.process_stats(&element, webrtcbin, &session_id); .process_stats(&element, webrtcbin, &session_id_clone);
} else { } else {
break; break;
} }
@ -2543,7 +2561,12 @@ impl WebRTCSink {
}); });
if remove { if remove {
state.finalize_session(&mut session, true); state.finalize_session(&mut session);
drop(state);
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
signaller.end_session(&session_id);
} else { } else {
state.sessions.insert(session.id.clone(), session); state.sessions.insert(session.id.clone(), session);
} }
@ -2608,7 +2631,13 @@ impl WebRTCSink {
media_idx, media_idx,
media_str media_str
); );
state.end_session(session_id, true); if let Some(_session) = state.end_session(session_id) {
drop(state);
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
signaller.end_session(session_id);
}
gst::warning!( gst::warning!(
CAT, CAT,
@ -2634,7 +2663,13 @@ impl WebRTCSink {
session_id, session_id,
); );
state.end_session(session_id, true); if let Some(_session) = state.end_session(session_id) {
drop(state);
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
signaller.end_session(session_id);
}
gst::warning!(CAT, obj: element, "Consumer did not provide valid payload for media sesion: {session_id} media_ix: {media_idx}"); gst::warning!(CAT, obj: element, "Consumer did not provide valid payload for media sesion: {session_id} media_ix: {media_idx}");
return; return;
@ -2948,9 +2983,10 @@ impl WebRTCSink {
); );
} }
Ok(Ok(_)) => { Ok(Ok(_)) => {
let settings = this.settings.lock().unwrap();
let mut state = this.state.lock().unwrap(); let mut state = this.state.lock().unwrap();
state.codec_discovery_done = true; state.codec_discovery_done = true;
state.maybe_start_signaller(&element); state.maybe_start_signaller(&element, &settings);
} }
_ => (), _ => (),
} }
@ -3074,6 +3110,10 @@ impl ObjectImpl for WebRTCSink {
.blurb("The policy to apply for ICE transport") .blurb("The policy to apply for ICE transport")
.mutable_ready() .mutable_ready()
.build(), .build(),
glib::ParamSpecObject::builder::<Signallable>("signaller")
.flags(glib::ParamFlags::READABLE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("The Signallable object to use to handle WebRTC Signalling")
.build(),
] ]
}); });
@ -3208,6 +3248,7 @@ impl ObjectImpl for WebRTCSink {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.ice_transport_policy.to_value() settings.ice_transport_policy.to_value()
} }
"signaller" => self.settings.lock().unwrap().signaller.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -3308,7 +3349,7 @@ impl ObjectImpl for WebRTCSink {
fn constructed(&self) { fn constructed(&self) {
self.parent_constructed(); self.parent_constructed();
let signaller = self.state.lock().unwrap().signaller.clone(); let signaller = self.settings.lock().unwrap().signaller.clone();
self.connect_signaller(&signaller); self.connect_signaller(&signaller);
@ -3466,8 +3507,9 @@ impl ElementImpl for WebRTCSink {
ret = Ok(gst::StateChangeSuccess::NoPreroll); ret = Ok(gst::StateChangeSuccess::NoPreroll);
} }
gst::StateChange::PausedToPlaying => { gst::StateChange::PausedToPlaying => {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.maybe_start_signaller(&element); state.maybe_start_signaller(&element, &settings);
} }
_ => (), _ => (),
} }
@ -3489,7 +3531,7 @@ impl ChildProxyImpl for WebRTCSink {
fn child_by_name(&self, name: &str) -> Option<glib::Object> { fn child_by_name(&self, name: &str) -> Option<glib::Object> {
match name { match name {
"signaller" => Some(self.state.lock().unwrap().signaller.clone().upcast()), "signaller" => Some(self.settings.lock().unwrap().signaller.clone().upcast()),
_ => None, _ => None,
} }
} }