net/webrtc: rename WhipSignaller as WhipClientSignaller

remove generalized names to accommodate for the WhipServer
- name the Signaller for whipsink as WhipClient
- name the Settings for whipsink as WhipClientSettings
- name the State for whipsink as WhipClientState

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1284>
This commit is contained in:
Taruntej Kanakamalla 2023-07-18 20:15:24 +05:30 committed by GStreamer Marge Bot
parent a0638ec983
commit 2d3d03b4d3
3 changed files with 33 additions and 31 deletions

View file

@ -24,7 +24,7 @@ use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMo
use crate::aws_kvs_signaller::AwsKvsSignaller; use crate::aws_kvs_signaller::AwsKvsSignaller;
use crate::livekit_signaller::LiveKitSignaller; use crate::livekit_signaller::LiveKitSignaller;
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
use crate::whip_signaller::WhipSignaller; use crate::whip_signaller::WhipClientSignaller;
use crate::RUNTIME; use crate::RUNTIME;
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
@ -4131,7 +4131,7 @@ impl ObjectImpl for WhipWebRTCSink {
let element = self.obj(); let element = self.obj();
let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp(); let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
let _ = ws.set_signaller(WhipSignaller::default().upcast()); let _ = ws.set_signaller(WhipClientSignaller::default().upcast());
} }
} }

View file

@ -28,27 +28,27 @@ const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15; const DEFAULT_TIMEOUT: u32 = 15;
#[derive(Debug)] #[derive(Debug)]
enum State { enum WhipClientState {
Stopped, Stopped,
Post { redirects: u8 }, Post { redirects: u8 },
Running { whip_resource_url: String }, Running { whip_resource_url: String },
} }
impl Default for State { impl Default for WhipClientState {
fn default() -> Self { fn default() -> Self {
Self::Stopped Self::Stopped
} }
} }
#[derive(Clone)] #[derive(Clone)]
struct Settings { struct WhipClientSettings {
whip_endpoint: Option<String>, whip_endpoint: Option<String>,
use_link_headers: bool, use_link_headers: bool,
auth_token: Option<String>, auth_token: Option<String>,
timeout: u32, timeout: u32,
} }
impl Default for Settings { impl Default for WhipClientSettings {
fn default() -> Self { fn default() -> Self {
Self { Self {
whip_endpoint: None, whip_endpoint: None,
@ -60,13 +60,13 @@ impl Default for Settings {
} }
#[derive(Default)] #[derive(Default)]
pub struct Signaller { pub struct WhipClient {
state: Mutex<State>, state: Mutex<WhipClientState>,
settings: Mutex<Settings>, settings: Mutex<WhipClientSettings>,
canceller: Mutex<Option<futures::future::AbortHandle>>, canceller: Mutex<Option<futures::future::AbortHandle>>,
} }
impl Signaller { impl WhipClient {
fn raise_error(&self, msg: String) { fn raise_error(&self, msg: String) {
self.obj() self.obj()
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]); .emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
@ -84,7 +84,7 @@ impl Signaller {
async fn send_offer(&self, webrtcbin: &gst::Element) { async fn send_offer(&self, webrtcbin: &gst::Element) {
{ {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = State::Post { redirects: 0 }; *state = WhipClientState::Post { redirects: 0 };
drop(state); drop(state);
} }
@ -148,7 +148,7 @@ impl Signaller {
{ {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
redirects = match *state { redirects = match *state {
State::Post { redirects } => redirects, WhipClientState::Post { redirects } => redirects,
_ => { _ => {
self.raise_error("Trying to do POST in unexpected state".to_string()); self.raise_error("Trying to do POST in unexpected state".to_string());
return; return;
@ -268,7 +268,7 @@ impl Signaller {
{ {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = match *state { *state = match *state {
State::Post { redirects: _r } => State::Running { WhipClientState::Post { redirects: _r } => WhipClientState::Running {
whip_resource_url: url.to_string(), whip_resource_url: url.to_string(),
}, },
_ => { _ => {
@ -306,22 +306,24 @@ impl Signaller {
{ {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = match *state { *state = match *state {
State::Post { redirects: _r } => State::Post { WhipClientState::Post { redirects: _r } => {
redirects: redirects + 1, WhipClientState::Post {
}, redirects: redirects + 1,
}
}
/* /*
* As per section 4.6 of the specification, redirection is * As per section 4.6 of the specification, redirection is
* not required to be supported for the PATCH and DELETE * not required to be supported for the PATCH and DELETE
* requests to the final WHEP resource URL. Only the initial * requests to the final WHEP resource URL. Only the initial
* POST request may support redirection. * POST request may support redirection.
*/ */
State::Running { .. } => { WhipClientState::Running { .. } => {
self.raise_error( self.raise_error(
"Unexpected redirection in RUNNING state".to_string(), "Unexpected redirection in RUNNING state".to_string(),
); );
return; return;
} }
State::Stopped => unreachable!(), WhipClientState::Stopped => unreachable!(),
}; };
drop(state); drop(state);
} }
@ -362,7 +364,7 @@ impl Signaller {
let timeout = settings.timeout; let timeout = settings.timeout;
let resource_url = match *state { let resource_url = match *state {
State::Running { WhipClientState::Running {
whip_resource_url: ref resource_url, whip_resource_url: ref resource_url,
} => resource_url.clone(), } => resource_url.clone(),
_ => { _ => {
@ -416,7 +418,7 @@ impl Signaller {
} }
} }
impl SignallableImpl for Signaller { impl SignallableImpl for WhipClient {
fn start(&self) { fn start(&self) {
if self.settings.lock().unwrap().whip_endpoint.is_none() { if self.settings.lock().unwrap().whip_endpoint.is_none() {
self.raise_error("WHIP endpoint URL must be set".to_string()); self.raise_error("WHIP endpoint URL must be set".to_string());
@ -426,7 +428,7 @@ impl SignallableImpl for Signaller {
self.obj().connect_closure( self.obj().connect_closure(
"consumer-added", "consumer-added",
false, false,
glib::closure!(|signaller: &super::WhipSignaller, glib::closure!(|signaller: &super::WhipClientSignaller,
_consumer_identifier: &str, _consumer_identifier: &str,
webrtcbin: &gst::Element| { webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade(); let obj_weak = signaller.downgrade();
@ -475,7 +477,7 @@ impl SignallableImpl for Signaller {
} }
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let State::Running { .. } = *state { if let WhipClientState::Running { .. } = *state {
// Release server-side resources // Release server-side resources
drop(state); drop(state);
} }
@ -490,7 +492,7 @@ impl SignallableImpl for Signaller {
} }
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let State::Running { .. } = *state { if let WhipClientState::Running { .. } = *state {
// Release server-side resources // Release server-side resources
drop(state); drop(state);
self.terminate_session(); self.terminate_session();
@ -499,14 +501,14 @@ impl SignallableImpl for Signaller {
} }
#[glib::object_subclass] #[glib::object_subclass]
impl ObjectSubclass for Signaller { impl ObjectSubclass for WhipClient {
const NAME: &'static str = "GstWHIPWebRTCSinkSignaller"; const NAME: &'static str = "GstWhipClientSignaller";
type Type = super::WhipSignaller; type Type = super::WhipClientSignaller;
type ParentType = glib::Object; type ParentType = glib::Object;
type Interfaces = (Signallable,); type Interfaces = (Signallable,);
} }
impl ObjectImpl for Signaller { impl ObjectImpl for WhipClient {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecString::builder("whip-endpoint") vec![glib::ParamSpecString::builder("whip-endpoint")

View file

@ -6,13 +6,13 @@ use gst::glib;
mod imp; mod imp;
glib::wrapper! { glib::wrapper! {
pub struct WhipSignaller(ObjectSubclass<imp::Signaller>) @implements Signallable; pub struct WhipClientSignaller(ObjectSubclass<imp::WhipClient>) @implements Signallable;
} }
unsafe impl Send for WhipSignaller {} unsafe impl Send for WhipClientSignaller {}
unsafe impl Sync for WhipSignaller {} unsafe impl Sync for WhipClientSignaller {}
impl Default for WhipSignaller { impl Default for WhipClientSignaller {
fn default() -> Self { fn default() -> Self {
glib::Object::new() glib::Object::new()
} }