API: refactor error signalling

* Expect Box<dyn Error> from custom signaller

* Use thiserror to define the error codes returned to custom
  signallers
This commit is contained in:
Mathieu Duponchelle 2021-12-21 23:37:29 +01:00
parent 114fd3c5f6
commit 8128c14fa9
6 changed files with 86 additions and 42 deletions

1
Cargo.lock generated
View file

@ -1845,6 +1845,7 @@ dependencies = [
"serde_derive",
"serde_json",
"smallvec",
"thiserror",
"tracing",
"tracing-log",
"tracing-subscriber",

View file

@ -18,6 +18,7 @@ gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gst
once_cell = "1.0"
smallvec = "1"
anyhow = "1"
thiserror = "1"
futures = "0.3"
async-std = { version = "1", features = ["unstable"] }
async-tungstenite = { version = "0.16", features = ["async-std-runtime", "async-native-tls"] }

View file

@ -178,10 +178,9 @@ impl Signaller {
"Unknown message from server: {}",
msg
);
element.handle_signalling_error(anyhow!(
"Unknown message from server: {}",
msg
));
element.handle_signalling_error(
anyhow!("Unknown message from server: {}", msg).into(),
);
}
}
Ok(WsMessage::Close(reason)) => {
@ -195,7 +194,9 @@ impl Signaller {
}
Ok(_) => (),
Err(err) => {
element.handle_signalling_error(anyhow!("Error receiving: {}", err));
element.handle_signalling_error(
anyhow!("Error receiving: {}", err).into(),
);
break;
}
}
@ -223,7 +224,7 @@ impl Signaller {
task::spawn(async move {
let this = Self::from_instance(&this);
if let Err(err) = this.connect(&element_clone).await {
element_clone.handle_signalling_error(err);
element_clone.handle_signalling_error(err.into());
}
});
}
@ -251,7 +252,7 @@ impl Signaller {
.await
{
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err));
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
}
});
@ -284,7 +285,7 @@ impl Signaller {
.await
{
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err));
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
}
});
@ -327,7 +328,7 @@ impl Signaller {
.await
{
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err));
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
}
});

View file

@ -1,6 +1,7 @@
use crate::webrtcsink::{Signallable, WebRTCSink};
use gst::glib;
use gst::subclass::prelude::ObjectSubclassExt;
use std::error::Error;
mod imp;
@ -12,7 +13,7 @@ unsafe impl Send for Signaller {}
unsafe impl Sync for Signaller {}
impl Signallable for Signaller {
fn start(&mut self, element: &WebRTCSink) -> Result<(), anyhow::Error> {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
signaller.start(element);
@ -24,7 +25,7 @@ impl Signallable for Signaller {
element: &WebRTCSink,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), anyhow::Error> {
) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
signaller.handle_sdp(element, peer_id, sdp);
Ok(())
@ -37,7 +38,7 @@ impl Signallable for Signaller {
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), anyhow::Error> {
) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
signaller.handle_ice(element, peer_id, candidate, sdp_mline_index, sdp_mid);
Ok(())

View file

@ -15,7 +15,7 @@ use std::ops::Mul;
use std::sync::Mutex;
use super::utils::{make_element, StreamProducer};
use super::{WebRTCSinkCongestionControl, WebRTCSinkMitigationMode};
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
use crate::signaller::Signaller;
use std::collections::BTreeMap;
@ -1387,19 +1387,28 @@ impl WebRTCSink {
}
/// Called by the signaller to add a new consumer
pub fn add_consumer(&self, element: &super::WebRTCSink, peer_id: &str) -> Result<(), Error> {
pub fn add_consumer(
&self,
element: &super::WebRTCSink,
peer_id: &str,
) -> Result<(), WebRTCSinkError> {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
if state.consumers.contains_key(peer_id) {
return Err(anyhow!("We already have a consumer with id {}", peer_id));
return Err(WebRTCSinkError::DuplicateConsumerId(peer_id.to_string()));
}
gst_info!(CAT, obj: element, "Adding consumer {}", peer_id);
let pipeline = gst::Pipeline::new(Some(&format!("consumer-pipeline-{}", peer_id)));
let webrtcbin = make_element("webrtcbin", None)?;
let webrtcbin = make_element("webrtcbin", None).map_err(|err| {
WebRTCSinkError::ConsumerPipelineError {
peer_id: peer_id.to_string(),
details: err.to_string(),
}
})?;
webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
@ -1615,11 +1624,21 @@ impl WebRTCSink {
}
});
pipeline.set_state(gst::State::Ready)?;
pipeline.set_state(gst::State::Ready).map_err(|err| {
WebRTCSinkError::ConsumerPipelineError {
peer_id: peer_id.to_string(),
details: err.to_string(),
}
})?;
element.emit_by_name::<()>("new-webrtcbin", &[&peer_id, &webrtcbin]);
pipeline.set_state(gst::State::Playing)?;
pipeline.set_state(gst::State::Playing).map_err(|err| {
WebRTCSinkError::ConsumerPipelineError {
peer_id: peer_id.to_string(),
details: err.to_string(),
}
})?;
state.consumers.insert(peer_id.to_string(), consumer);
@ -1632,11 +1651,11 @@ impl WebRTCSink {
element: &super::WebRTCSink,
peer_id: &str,
signal: bool,
) -> Result<(), Error> {
) -> Result<(), WebRTCSinkError> {
let mut state = self.state.lock().unwrap();
if !state.consumers.contains_key(peer_id) {
return Err(anyhow!("No consumer with ID {}", peer_id));
return Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()));
}
state.remove_consumer(element, peer_id, signal);
@ -1747,11 +1766,11 @@ impl WebRTCSink {
sdp_mline_index: Option<u32>,
_sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), Error> {
) -> Result<(), WebRTCSinkError> {
let state = self.state.lock().unwrap();
let sdp_mline_index = sdp_mline_index
.ok_or_else(|| anyhow!("SDP mline index is not optional at this time"))?;
let sdp_mline_index =
sdp_mline_index.ok_or_else(|| WebRTCSinkError::MandatorySdpMlineIndex)?;
if let Some(consumer) = state.consumers.get(peer_id) {
gst_trace!(CAT, "adding ice candidate for peer {}", peer_id);
@ -1760,7 +1779,7 @@ impl WebRTCSink {
.emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]);
Ok(())
} else {
Err(anyhow!("No consumer with ID {}", peer_id))
Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()))
}
}
@ -1770,7 +1789,7 @@ impl WebRTCSink {
element: &super::WebRTCSink,
peer_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Error> {
) -> Result<(), WebRTCSinkError> {
let mut state = self.state.lock().unwrap();
if let Some(consumer) = state.consumers.get_mut(peer_id) {
@ -1786,7 +1805,11 @@ impl WebRTCSink {
if media.attribute_val("inactive").is_some() {
gst_warning!(CAT, "consumer {} refused media {}", peer_id, media_idx);
state.remove_consumer(element, peer_id, true);
return Err(anyhow!("consumer {} refused media {}", peer_id, media_idx));
return Err(WebRTCSinkError::ConsumerRefusedMedia {
peer_id: peer_id.to_string(),
media_idx,
});
}
}
@ -1803,12 +1826,13 @@ impl WebRTCSink {
peer_id,
media_idx
);
state.remove_consumer(element, peer_id, true);
return Err(anyhow!(
"consumer {} did not provide valid payload for media index {}",
peer_id,
media_idx
));
return Err(WebRTCSinkError::ConsumerNoValidPayload {
peer_id: peer_id.to_string(),
media_idx,
});
}
}
@ -1830,7 +1854,7 @@ impl WebRTCSink {
Ok(())
} else {
Err(anyhow!("No consumer with ID {}", peer_id))
Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()))
}
}

View file

@ -1,7 +1,7 @@
use anyhow::Error;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::ObjectSubclassExt;
use std::error::Error;
mod imp;
mod utils;
@ -13,15 +13,31 @@ glib::wrapper! {
unsafe impl Send for WebRTCSink {}
unsafe impl Sync for WebRTCSink {}
#[derive(thiserror::Error, Debug)]
pub enum WebRTCSinkError {
#[error("no consumer with id")]
NoConsumerWithId(String),
#[error("consumer refused media")]
ConsumerRefusedMedia { peer_id: String, media_idx: u32 },
#[error("consumer did not provide valid payload for media")]
ConsumerNoValidPayload { peer_id: String, media_idx: u32 },
#[error("SDP mline index is currently mandatory")]
MandatorySdpMlineIndex,
#[error("duplicate consumer id")]
DuplicateConsumerId(String),
#[error("error setting up consumer pipeline")]
ConsumerPipelineError { peer_id: String, details: String },
}
pub trait Signallable: Sync + Send + 'static {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Error>;
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>>;
fn handle_sdp(
&mut self,
element: &WebRTCSink,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Error>;
) -> Result<(), Box<dyn Error>>;
/// sdp_mid is exposed for future proofing, see
/// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174,
@ -34,7 +50,7 @@ pub trait Signallable: Sync + Send + 'static {
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Error>;
) -> Result<(), Box<dyn Error>>;
fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str);
@ -70,7 +86,7 @@ impl WebRTCSink {
&self,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Error> {
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_sdp(self, peer_id, sdp)
@ -85,25 +101,25 @@ impl WebRTCSink {
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), Error> {
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_ice(self, peer_id, sdp_mline_index, sdp_mid, candidate)
}
pub fn handle_signalling_error(&self, error: anyhow::Error) {
pub fn handle_signalling_error(&self, error: Box<dyn Error + Send + Sync>) {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_signalling_error(self, error);
ws.handle_signalling_error(self, anyhow::anyhow!(error));
}
pub fn add_consumer(&self, peer_id: &str) -> Result<(), Error> {
pub fn add_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.add_consumer(self, peer_id)
}
pub fn remove_consumer(&self, peer_id: &str) -> Result<(), Error> {
pub fn remove_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.remove_consumer(self, peer_id, false)