mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-03-10 07:21:25 +00:00
net/webrtc: Handle concurrent POSTs to whipserversrc
Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/657 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2096>
This commit is contained in:
parent
bd6be75107
commit
a34efda201
1 changed files with 17 additions and 8 deletions
|
@ -15,6 +15,7 @@ use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
|
||||||
use reqwest::header::HeaderMap;
|
use reqwest::header::HeaderMap;
|
||||||
use reqwest::header::HeaderValue;
|
use reqwest::header::HeaderValue;
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::LazyLock;
|
use std::sync::LazyLock;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
@ -631,7 +632,7 @@ struct WhipServerSettings {
|
||||||
timeout: u32,
|
timeout: u32,
|
||||||
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
|
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
|
||||||
server_handle: Option<tokio::task::JoinHandle<()>>,
|
server_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
|
sdp_answer: HashMap<String, mpsc::Sender<Option<SDPMessage>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for WhipServerSettings {
|
impl Default for WhipServerSettings {
|
||||||
|
@ -643,7 +644,7 @@ impl Default for WhipServerSettings {
|
||||||
timeout: DEFAULT_TIMEOUT,
|
timeout: DEFAULT_TIMEOUT,
|
||||||
shutdown_signal: None,
|
shutdown_signal: None,
|
||||||
server_handle: None,
|
server_handle: None,
|
||||||
sdp_answer: None,
|
sdp_answer: HashMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -651,19 +652,20 @@ impl Default for WhipServerSettings {
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct WhipServer {
|
pub struct WhipServer {
|
||||||
settings: Mutex<WhipServerSettings>,
|
settings: Mutex<WhipServerSettings>,
|
||||||
canceller: Mutex<Option<futures::future::AbortHandle>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WhipServer {
|
impl WhipServer {
|
||||||
pub fn on_webrtcbin_ready(&self) -> RustClosure {
|
pub fn on_webrtcbin_ready(&self) -> RustClosure {
|
||||||
glib::closure!(|signaller: &super::WhipServerSignaller,
|
glib::closure!(|signaller: &super::WhipServerSignaller,
|
||||||
_producer_identifier: &str,
|
session_id: &str,
|
||||||
webrtcbin: &gst::Element| {
|
webrtcbin: &gst::Element| {
|
||||||
webrtcbin.connect_notify(
|
webrtcbin.connect_notify(
|
||||||
Some("ice-gathering-state"),
|
Some("ice-gathering-state"),
|
||||||
glib::clone!(
|
glib::clone!(
|
||||||
#[weak]
|
#[weak]
|
||||||
signaller,
|
signaller,
|
||||||
|
#[to_owned]
|
||||||
|
session_id,
|
||||||
move |webrtcbin, _pspec| {
|
move |webrtcbin, _pspec| {
|
||||||
let state =
|
let state =
|
||||||
webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
|
webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
|
||||||
|
@ -673,7 +675,11 @@ impl WhipServer {
|
||||||
gst::info!(CAT, obj = signaller, "ICE gathering started");
|
gst::info!(CAT, obj = signaller, "ICE gathering started");
|
||||||
}
|
}
|
||||||
WebRTCICEGatheringState::Complete => {
|
WebRTCICEGatheringState::Complete => {
|
||||||
gst::info!(CAT, obj = signaller, "ICE gathering complete");
|
gst::info!(
|
||||||
|
CAT,
|
||||||
|
obj = signaller,
|
||||||
|
"ICE gathering complete for {session_id}"
|
||||||
|
);
|
||||||
let ans: Option<gst_sdp::SDPMessage>;
|
let ans: Option<gst_sdp::SDPMessage>;
|
||||||
let mut settings = signaller.imp().settings.lock().unwrap();
|
let mut settings = signaller.imp().settings.lock().unwrap();
|
||||||
if let Some(answer_desc) = webrtcbin
|
if let Some(answer_desc) = webrtcbin
|
||||||
|
@ -687,7 +693,7 @@ impl WhipServer {
|
||||||
}
|
}
|
||||||
let tx = settings
|
let tx = settings
|
||||||
.sdp_answer
|
.sdp_answer
|
||||||
.take()
|
.remove(&session_id)
|
||||||
.expect("SDP answer Sender needs to be valid");
|
.expect("SDP answer Sender needs to be valid");
|
||||||
|
|
||||||
RUNTIME.spawn(glib::clone!(
|
RUNTIME.spawn(glib::clone!(
|
||||||
|
@ -796,7 +802,7 @@ impl WhipServer {
|
||||||
let wait_timeout = {
|
let wait_timeout = {
|
||||||
let mut settings = self.settings.lock().unwrap();
|
let mut settings = self.settings.lock().unwrap();
|
||||||
let wait_timeout = settings.timeout;
|
let wait_timeout = settings.timeout;
|
||||||
settings.sdp_answer = Some(tx);
|
settings.sdp_answer.insert(session_id.clone(), tx);
|
||||||
drop(settings);
|
drop(settings);
|
||||||
wait_timeout
|
wait_timeout
|
||||||
};
|
};
|
||||||
|
@ -821,7 +827,10 @@ impl WhipServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
|
// We don't support cancellation, instead we depend on the server shutdown oneshot and wait
|
||||||
|
// timeout here. FIXME: Maybe we can do better?
|
||||||
|
let canceller = Mutex::new(None);
|
||||||
|
let result = wait_async(&canceller, rx.recv(), wait_timeout).await;
|
||||||
|
|
||||||
let answer = match result {
|
let answer = match result {
|
||||||
Ok(ans) => match ans {
|
Ok(ans) => match ans {
|
||||||
|
|
Loading…
Reference in a new issue