webrtcsrc: add capability to initiate offer and handle answer

- add the handler for session-request signal from the signaller
to initiate an offer

- rename `handle_offer` function as `handle_remote_description` to use
if for the both remote offer and answer.

- in the function `remote_description_set` add checks to deal with answer and
offer separately

- in the session-description signal handler, call the handle_remote_description
for both offer and answer type remote description

- add a new member clock_rate in the Codec struct which is determined from the depayloader
pad templates

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1545>
This commit is contained in:
Taruntej Kanakamalla 2024-12-04 15:06:18 +05:30 committed by GStreamer Marge Bot
parent 34462445e8
commit 6d0cdb30f2
2 changed files with 280 additions and 57 deletions

View file

@ -436,6 +436,7 @@ pub struct Codec {
pub is_raw: bool,
payload_type: Option<i32>,
clock_rate: Option<i32>,
decoding_info: Option<DecodingInfo>,
encoding_info: Option<EncodingInfo>,
}
@ -450,8 +451,9 @@ impl Codec {
encoders: &glib::List<gst::ElementFactory>,
payloaders: &glib::List<gst::ElementFactory>,
) -> Self {
let mut clock_rate: Option<i32> = None;
let has_decoder = Self::has_decoder_for_caps(caps, decoders);
let has_depayloader = Self::has_depayloader_for_codec(name, depayloaders);
let has_depayloader = Self::has_depayloader_for_codec(name, depayloaders, &mut clock_rate);
let decoding_info = if has_depayloader && has_decoder {
Some(DecodingInfo {
@ -480,6 +482,7 @@ impl Codec {
name: name.into(),
is_raw: false,
payload_type: None,
clock_rate,
decoding_info,
encoding_info,
}
@ -491,7 +494,9 @@ impl Codec {
depayloaders: &glib::List<gst::ElementFactory>,
payloaders: &glib::List<gst::ElementFactory>,
) -> Self {
let decoding_info = if Self::has_depayloader_for_codec(name, depayloaders) {
let mut clock_rate: Option<i32> = None;
let decoding_info = if Self::has_depayloader_for_codec(name, depayloaders, &mut clock_rate)
{
Some(DecodingInfo {
has_decoder: AtomicBool::new(false),
})
@ -523,6 +528,7 @@ impl Codec {
name: name.into(),
is_raw: true,
payload_type: None,
clock_rate,
decoding_info,
encoding_info,
}
@ -632,6 +638,7 @@ impl Codec {
fn has_depayloader_for_codec(
codec: &str,
depayloaders: &glib::List<gst::ElementFactory>,
clock_rate: &mut Option<i32>,
) -> bool {
depayloaders.iter().any(|factory| {
factory.static_pad_templates().iter().any(|template| {
@ -646,7 +653,22 @@ impl Codec {
&& s.get::<gst::List>("encoding-name").map_or_else(
|_| {
if let Ok(encoding_name) = s.get::<&str>("encoding-name") {
encoding_name == codec
if encoding_name == codec {
if s.has_field("clock-rate") {
match s.get_optional::<i32>("clock-rate") {
Ok(Some(rate)) => {
*clock_rate = Some(rate);
}
_ => {
// if None or Err or IntRange
*clock_rate = None;
}
};
}
true
} else {
false
}
} else {
false
}
@ -668,6 +690,10 @@ impl Codec {
self.payload_type
}
pub fn clock_rate(&self) -> Option<i32> {
self.clock_rate
}
pub fn build_encoder(&self) -> Option<Result<gst::Element, Error>> {
self.encoding_info.as_ref().and_then(|info| {
info.encoder.as_ref().map(|encoder| {

View file

@ -333,6 +333,7 @@ struct SignallerSignals {
request_meta: glib::SignalHandlerId,
session_description: glib::SignalHandlerId,
handle_ice: glib::SignalHandlerId,
session_requested: glib::SignalHandlerId,
}
impl Session {
@ -818,12 +819,148 @@ impl Session {
ghostpad
}
fn generate_offer(&self, element: &super::BaseWebRTCSrc) {
let sess_id = self.id.clone();
let webrtcbin = self.webrtcbin();
let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly;
let mut pt = 96..127;
let settings = element.imp().settings.lock().unwrap();
let caps = settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|codec| {
let name = &codec.name;
let Some(pt) = pt.next() else {
gst::warning!(
CAT,
obj = element,
"exhausted the list of dynamic payload types, not adding transceiver for {name}"
);
return None;
};
let (media, clock_rate) = if codec.is_video() {
("video", codec.clock_rate().unwrap_or(90000))
} else {
("audio", codec.clock_rate().unwrap_or(48000))
};
let mut caps = gst::Caps::new_empty();
{
let caps = caps.get_mut().unwrap();
let mut s = gst::Structure::builder("application/x-rtp")
.field("media", media)
.field("payload", pt)
.field("encoding-name", name.as_str())
.field("clock-rate", clock_rate)
.build();
if name.eq_ignore_ascii_case("H264") {
// support the constrained-baseline profile for now
// TODO: extend this to other supported profiles by querying the decoders
s.set("profile-level-id", "42e016");
}
caps.append_structure(s);
}
Some(caps)
});
for c in caps.flatten() {
gst::info!(CAT, obj = element, "Adding transceiver with caps: {c:#?}");
let transceiver = webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&direction, &c],
);
transceiver.set_property("do_nack", settings.do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
let webrtcbin_weak = webrtcbin.downgrade();
let promise = gst::Promise::with_change_func(glib::clone!(
#[weak (rename_to = ele)]
element,
move |reply| {
let Some(webrtcbin) = webrtcbin_weak.upgrade() else {
gst::error!(CAT, obj = ele, "generate offer::failed to get webrtcbin");
ele.imp().signaller().end_session(sess_id.as_str());
return;
};
let reply = match reply {
Ok(Some(reply)) => reply,
Ok(None) => {
gst::error!(
CAT,
obj = ele,
"generate offer::Promise returned with no reply"
);
ele.imp().signaller().end_session(sess_id.as_str());
return;
}
Err(e) => {
gst::error!(
CAT,
obj = ele,
"generate offer::Promise returned with error {:?}",
e
);
ele.imp().signaller().end_session(sess_id.as_str());
return;
}
};
if let Ok(offer_sdp) = reply
.value("offer")
.map(|offer| offer.get::<gst_webrtc::WebRTCSessionDescription>().unwrap())
{
webrtcbin.emit_by_name::<()>(
"set-local-description",
&[&offer_sdp, &None::<gst::Promise>],
);
gst::log!(
CAT,
obj = ele,
"Sending SDP, {}",
offer_sdp.sdp().to_string()
);
let signaller = ele.imp().signaller();
signaller.send_sdp(sess_id.as_str(), &offer_sdp);
} else {
let error = reply
.value("error")
.expect("structure must have an error value")
.get::<glib::Error>()
.expect("value must be a GLib error");
gst::error!(
CAT,
obj = ele,
"generate offer::Promise returned with error: {}",
error
);
ele.imp().signaller().end_session(sess_id.as_str());
}
}
));
webrtcbin
.clone()
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
}
fn remote_description_set(
&mut self,
element: &super::BaseWebRTCSrc,
offer: &gst_webrtc::WebRTCSessionDescription,
desc: &gst_webrtc::WebRTCSessionDescription,
) -> (gst::Promise, gst::Bin) {
let sdp = offer.sdp();
let sdp = desc.sdp();
let desc_type = desc.type_();
let webrtcbin = self.webrtcbin();
for (i, media) in sdp.medias().enumerate() {
let (codec_names, do_retransmission) = {
@ -890,40 +1027,53 @@ impl Session {
.imp()
.create_and_probe_src_pad(&caps, &stream_id, self)
{
gst::info!(
if desc_type == gst_webrtc::WebRTCSDPType::Offer {
gst::info!(
CAT,
obj = element,
"Getting transceiver for {stream_id} and index {i} with caps: {caps:#?}"
);
let mut transceiver = None;
let mut idx = 0i32;
// find the transceiver with this mline
loop {
let Some(to_check) = webrtcbin
.emit_by_name::<Option<gst_webrtc::WebRTCRTPTransceiver>>(
"get-transceiver",
&[&idx],
)
else {
break;
};
let mline = to_check.property::<u32>("mlineindex");
if mline as usize == i {
transceiver = Some(to_check);
break;
);
let mut transceiver = None;
let mut idx = 0i32;
// find the transceiver with this mline
loop {
let Some(to_check) = webrtcbin
.emit_by_name::<Option<gst_webrtc::WebRTCRTPTransceiver>>(
"get-transceiver",
&[&idx],
)
else {
break;
};
let mline = to_check.property::<u32>("mlineindex");
if mline as usize == i {
transceiver = Some(to_check);
break;
}
idx += 1;
}
idx += 1;
}
let transceiver = transceiver.unwrap_or_else(|| {
let transceiver = transceiver.unwrap_or_else(|| {
gst::warning!(CAT, "Transceiver for idx {i} does not exist, GStreamer <= 1.24, adding it ourself");
webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly, &caps])
});
});
transceiver.set_property("do_nack", do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
transceiver.set_property("codec-preferences", caps);
transceiver.set_property("do_nack", do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
transceiver.set_property("codec-preferences", caps);
} else {
// SDP type is answer,
// so the transceiver must have already been created while sending offer
}
} else {
gst::error!(
CAT,
obj = element,
"Failed to create src pad with caps {:?}",
caps
);
}
} else {
gst::info!(
@ -940,53 +1090,78 @@ impl Session {
#[strong(rename_to = session_id)]
self.id,
move |reply| {
let state = element.imp().state.lock().unwrap();
gst::info!(CAT, obj = element, "got answer for session {session_id:?}");
let Some(session) = state.sessions.get(&session_id) else {
gst::error!(CAT, obj = element, "no session {session_id:?}");
if desc_type == gst_webrtc::WebRTCSDPType::Offer {
let state = element.imp().state.lock().unwrap();
gst::info!(CAT, obj = element, "got answer for session {session_id:?}");
let Some(session) = state.sessions.get(&session_id) else {
gst::error!(CAT, obj = element, "no session {session_id:?}");
return;
};
session.on_answer_created(reply, &element);
} else {
gst::log!(
CAT,
obj = element,
"Nothing to do in the promise in case of an answer"
);
return;
};
session.on_answer_created(reply, &element);
}
}
));
(promise, webrtcbin.clone())
}
fn handle_offer(
fn handle_remote_description(
&mut self,
offer: &gst_webrtc::WebRTCSessionDescription,
desc: &gst_webrtc::WebRTCSessionDescription,
element: &super::BaseWebRTCSrc,
) -> (gst::Promise, gst::Bin) {
gst::log!(CAT, obj = element, "Got offer {}", offer.sdp().to_string());
gst::debug!(
CAT,
obj = element,
"Got remote description: {}",
desc.sdp().to_string()
);
let promise = gst::Promise::with_change_func(glib::clone!(
#[weak]
element,
#[strong]
offer,
desc,
#[strong(rename_to = session_id)]
self.id,
move |_| {
let mut state = element.imp().state.lock().unwrap();
gst::info!(CAT, obj = element, "got offer for session {session_id:?}");
gst::info!(
CAT,
obj = element,
"got {:?} for session {session_id:?}",
desc.type_()
);
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::error!(CAT, obj = element, "no session {session_id:?}");
return;
};
let (promise, webrtcbin) = session.remote_description_set(&element, &offer);
let (promise, webrtcbin) = session.remote_description_set(&element, &desc);
drop(state);
webrtcbin.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
if desc.type_() == gst_webrtc::WebRTCSDPType::Offer {
webrtcbin
.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
} else {
// Nothing to do with the promise in case of an answer
promise.reply(None);
}
}
));
// We cannot emit `set-remote-description` from here. The promise
// function needs the state lock which is held by the caller
// of `handle_offer`. So return the promise to the caller so that
// the it can drop the `state` and safely emit `set-remote-description`
// of `handle_remote_description`. So return the promise to the caller so that
// it can drop the `state` and safely emit `set-remote-description`
(promise, self.webrtcbin().clone())
}
@ -1225,6 +1400,22 @@ impl BaseWebRTCSrc {
),
),
session_requested: signaller.connect_closure(
"session-requested",
false,
glib::closure!(#[watch] instance, move |_signaler: glib::Object, session_id: &str, _peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{
if offer.is_none() {
let this = instance.imp();
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
gst::error!(CAT, imp = this, "session {session_id:?} not found");
return
};
session.generate_offer(&this.obj());
}
}),
),
request_meta: signaller.connect_closure(
"request-meta",
false,
@ -1246,20 +1437,26 @@ impl BaseWebRTCSrc {
move |_signaller: glib::Object,
session_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription| {
assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer);
let this = instance.imp();
gst::info!(CAT, imp = this, "got sdp offer");
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(session_id) else {
gst::error!(CAT, imp = this, "session {session_id:?} not found");
return;
};
match desc.type_() {
gst_webrtc::WebRTCSDPType::Offer | gst_webrtc::WebRTCSDPType::Answer => {
let this = instance.imp();
gst::info!(CAT, imp = this, "got sdp : {:?}", desc.type_());
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(session_id) else {
gst::error!(CAT, imp = this, "session {session_id:?} not found");
return;
};
let (promise, webrtcbin) = session.handle_offer(desc, &this.obj());
drop(state);
webrtcbin
.emit_by_name::<()>("set-remote-description", &[&desc, &promise]);
let (promise, webrtcbin) = session.handle_remote_description(desc, &this.obj());
drop(state);
webrtcbin
.emit_by_name::<()>("set-remote-description", &[&desc, &promise]);
},
_ => {
unimplemented!("{:?} type remote description not handled", desc.type_());
},
}
}
),
),