Allow creating source elements from the device provider and don't do discovery during element setup anymore

While making it possible to create elements from the device provider,
this also speeds up the connection generally when starting up the
elements.

Also add the url-address property for additional filtering in addition
to the NDI name.
This commit is contained in:
Sebastian Dröge 2020-01-16 11:14:10 +02:00
parent f2a4699d13
commit 8d2c025e47
4 changed files with 183 additions and 390 deletions

View file

@ -149,24 +149,6 @@ impl<'a> Source<'a> {
}
}
fn ndi_name_ptr(&self) -> *const ::std::os::raw::c_char {
unsafe {
match *self {
Source::Borrowed(ptr, _) => ptr.as_ref().p_ndi_name,
Source::Owned(_, ref ndi_name, _) => ndi_name.as_ptr(),
}
}
}
fn url_address_ptr(&self) -> *const ::std::os::raw::c_char {
unsafe {
match *self {
Source::Borrowed(ptr, _) => ptr.as_ref().p_url_address,
Source::Owned(_, _, ref url_address) => url_address.as_ptr(),
}
}
}
pub fn to_owned<'b>(&self) -> Source<'b> {
unsafe {
let (ndi_name, url_address) = match *self {
@ -200,11 +182,11 @@ impl<'a> PartialEq for Source<'a> {
#[derive(Debug)]
pub struct RecvBuilder<'a> {
source_to_connect_to: &'a Source<'a>,
source_to_connect_to: (&'a str, Option<&'a str>),
allow_video_fields: bool,
bandwidth: NDIlib_recv_bandwidth_e,
color_format: NDIlib_recv_color_format_e,
ndi_name: &'a str,
ndi_recv_name: &'a str,
}
impl<'a> RecvBuilder<'a> {
@ -228,16 +210,25 @@ impl<'a> RecvBuilder<'a> {
pub fn build(self) -> Option<RecvInstance> {
unsafe {
let ndi_name = ffi::CString::new(self.ndi_name).unwrap();
let ndi_recv_name = ffi::CString::new(self.ndi_recv_name).unwrap();
let ndi_name = ffi::CString::new(self.source_to_connect_to.0).unwrap();
let url_address = self
.source_to_connect_to
.1
.as_ref()
.map(|s| ffi::CString::new(*s).unwrap());
let ptr = NDIlib_recv_create_v3(&NDIlib_recv_create_v3_t {
source_to_connect_to: NDIlib_source_t {
p_ndi_name: self.source_to_connect_to.ndi_name_ptr(),
p_url_address: self.source_to_connect_to.url_address_ptr(),
p_ndi_name: ndi_name.as_ptr(),
p_url_address: url_address
.as_ref()
.map(|s| s.as_ptr())
.unwrap_or_else(|| ptr::null_mut()),
},
allow_video_fields: self.allow_video_fields,
bandwidth: self.bandwidth,
color_format: self.color_format,
p_ndi_recv_name: ndi_name.as_ptr(),
p_ndi_recv_name: ndi_recv_name.as_ptr(),
});
if ptr.is_null() {
@ -266,13 +257,16 @@ unsafe impl Send for RecvInstanceInner {}
unsafe impl Sync for RecvInstanceInner {}
impl RecvInstance {
pub fn builder<'a>(source_to_connect_to: &'a Source, ndi_name: &'a str) -> RecvBuilder<'a> {
pub fn builder<'a>(
source_to_connect_to: (&'a str, Option<&'a str>),
ndi_recv_name: &'a str,
) -> RecvBuilder<'a> {
RecvBuilder {
source_to_connect_to,
allow_video_fields: true,
bandwidth: NDIlib_recv_bandwidth_highest,
color_format: NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA,
ndi_name,
ndi_recv_name,
}
}

View file

@ -24,6 +24,7 @@ use crate::DEFAULT_RECEIVER_NDI_NAME;
#[derive(Debug, Clone)]
struct Settings {
ndi_name: Option<String>,
url_address: Option<String>,
connect_timeout: u32,
timeout: u32,
receiver_ndi_name: String,
@ -35,6 +36,7 @@ impl Default for Settings {
fn default() -> Self {
Settings {
ndi_name: None,
url_address: None,
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000,
timeout: 5000,
@ -44,7 +46,7 @@ impl Default for Settings {
}
}
static PROPERTIES: [subclass::Property; 6] = [
static PROPERTIES: [subclass::Property; 7] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
@ -54,6 +56,15 @@ static PROPERTIES: [subclass::Property; 6] = [
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("url-address", |name| {
glib::ParamSpec::string(
name,
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961. This is used as an additional filter together with the NDI name.",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
@ -216,6 +227,18 @@ impl ObjectImpl for NdiAudioSrc {
);
settings.ndi_name = ndi_name;
}
subclass::Property("url-address", ..) => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
subclass::Property("receiver-ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get().unwrap();
@ -293,6 +316,10 @@ impl ObjectImpl for NdiAudioSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value())
}
subclass::Property("url-address", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.url_address.to_value())
}
subclass::Property("receiver-ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.receiver_ndi_name.to_value())
@ -374,19 +401,18 @@ impl BaseSrcImpl for NdiAudioSrc {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
let ndi_name = if let Some(ref ndi_name) = settings.ndi_name {
ndi_name
} else {
if settings.ndi_name.is_none() {
return Err(gst_error_msg!(
gst::LibraryError::Settings,
["No IP address or NDI name given"]
));
};
}
let receiver = connect_ndi(
self.cat,
element,
ndi_name,
settings.ndi_name.as_ref().unwrap().as_str(),
settings.url_address.as_ref().map(String::as_str),
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,

View file

@ -25,6 +25,7 @@ use crate::DEFAULT_RECEIVER_NDI_NAME;
#[derive(Debug, Clone)]
struct Settings {
ndi_name: Option<String>,
url_address: Option<String>,
connect_timeout: u32,
timeout: u32,
receiver_ndi_name: String,
@ -36,6 +37,7 @@ impl Default for Settings {
fn default() -> Self {
Settings {
ndi_name: None,
url_address: None,
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000,
timeout: 5000,
@ -45,7 +47,7 @@ impl Default for Settings {
}
}
static PROPERTIES: [subclass::Property; 6] = [
static PROPERTIES: [subclass::Property; 7] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
@ -55,6 +57,15 @@ static PROPERTIES: [subclass::Property; 6] = [
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("url-address", |name| {
glib::ParamSpec::string(
name,
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961. This is used as an additional filter together with the NDI name.",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
@ -251,6 +262,18 @@ impl ObjectImpl for NdiVideoSrc {
);
settings.ndi_name = ndi_name;
}
subclass::Property("url-address", ..) => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
subclass::Property("receiver-ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get().unwrap();
@ -328,6 +351,10 @@ impl ObjectImpl for NdiVideoSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value())
}
subclass::Property("url-address", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.url_address.to_value())
}
subclass::Property("receiver-ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.receiver_ndi_name.to_value())
@ -409,19 +436,18 @@ impl BaseSrcImpl for NdiVideoSrc {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
let ndi_name = if let Some(ref ndi_name) = settings.ndi_name {
ndi_name
} else {
if settings.ndi_name.is_none() {
return Err(gst_error_msg!(
gst::LibraryError::Settings,
["No IP address or NDI name given"]
["No NDI name given"]
));
};
}
let receiver = connect_ndi(
self.cat,
element,
ndi_name,
settings.ndi_name.as_ref().unwrap().as_str(),
settings.url_address.as_ref().map(String::as_str),
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,

View file

@ -9,28 +9,20 @@ use byte_slice_cast::AsMutSliceOf;
use std::cmp;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::thread;
use super::*;
enum ReceiverInfo {
Connecting {
id: usize,
ndi_name: String,
video: Option<Weak<ReceiverInner<VideoReceiver>>>,
audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
observations: Observations,
},
Connected {
id: usize,
ndi_name: String,
recv: RecvInstance,
video: Option<Weak<ReceiverInner<VideoReceiver>>>,
audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
observations: Observations,
},
pub struct ReceiverInfo {
id: usize,
ndi_name: String,
url_address: Option<String>,
recv: RecvInstance,
video: Option<Weak<ReceiverInner<VideoReceiver>>>,
audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
observations: Observations,
}
lazy_static! {
@ -81,15 +73,17 @@ pub struct ReceiverInner<T: ReceiverType> {
queue: ReceiverQueue<T>,
recv: Mutex<Option<RecvInstance>>,
recv_cond: Condvar,
recv: Mutex<RecvInstance>,
observations: Observations,
cat: gst::DebugCategory,
element: glib::WeakRef<gst_base::BaseSrc>,
timestamp_mode: TimestampMode,
first_frame: AtomicBool,
timeout: u32,
connect_timeout: u32,
thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}
@ -385,32 +379,15 @@ impl<T: ReceiverType> Receiver<T> {
info: &mut ReceiverInfo,
timestamp_mode: TimestampMode,
timeout: u32,
connect_timeout: u32,
element: &gst_base::BaseSrc,
cat: gst::DebugCategory,
) -> Self
where
Receiver<T>: ReceiverCapture<T>,
{
let (id, storage_video, storage_audio, recv, observations) = match info {
ReceiverInfo::Connecting {
id,
ref observations,
ref mut audio,
ref mut video,
..
} => (*id, video, audio, None, observations),
ReceiverInfo::Connected {
id,
ref mut recv,
ref observations,
ref mut audio,
ref mut video,
..
} => (*id, video, audio, Some(recv.clone()), observations),
};
let receiver = Receiver(Arc::new(ReceiverInner {
id,
id: info.id,
queue: ReceiverQueue(Arc::new((
Mutex::new(ReceiverQueueInner {
capturing: true,
@ -422,13 +399,14 @@ impl<T: ReceiverType> Receiver<T> {
}),
Condvar::new(),
))),
recv: Mutex::new(recv),
recv_cond: Condvar::new(),
observations: observations.clone(),
recv: Mutex::new(info.recv.clone()),
observations: info.observations.clone(),
cat,
element: element.downgrade(),
timestamp_mode,
first_frame: AtomicBool::new(true),
timeout,
connect_timeout,
thread: Mutex::new(None),
}));
@ -459,7 +437,7 @@ impl<T: ReceiverType> Receiver<T> {
});
let weak = Arc::downgrade(&receiver.0);
Self::store_internal(storage_video, storage_audio, weak);
Self::store_internal(info, weak);
*receiver.0.thread.lock().unwrap() = Some(thread);
@ -522,24 +500,12 @@ impl<T: ReceiverType> Drop for ReceiverInner<T> {
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
{
let val = receivers.get_mut(&self.id).unwrap();
let (audio, video) = match val {
ReceiverInfo::Connecting {
ref mut audio,
ref mut video,
..
} => (audio, video),
ReceiverInfo::Connected {
ref mut audio,
ref mut video,
..
} => (audio, video),
};
if video.is_some() && audio.is_some() {
let receiver = receivers.get_mut(&self.id).unwrap();
if receiver.audio.is_some() && receiver.video.is_some() {
if T::IS_VIDEO {
*video = None;
receiver.video = None;
} else {
*audio = None;
receiver.audio = None;
}
return;
}
@ -556,6 +522,7 @@ pub fn connect_ndi<T: ReceiverType>(
cat: gst::DebugCategory,
element: &gst_base::BaseSrc,
ndi_name: &str,
url_address: Option<&str>,
receiver_ndi_name: &str,
connect_timeout: u32,
bandwidth: NDIlib_recv_bandwidth_e,
@ -570,227 +537,61 @@ where
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
// Check if we already have a receiver for this very stream
for val in receivers.values_mut() {
let (val_audio, val_video, val_ndi_name) = match val {
ReceiverInfo::Connecting {
ref mut audio,
ref mut video,
ref ndi_name,
..
} => (audio, video, ndi_name.as_str()),
ReceiverInfo::Connected {
ref mut audio,
ref mut video,
ref ndi_name,
..
} => (audio, video, ndi_name.as_str()),
};
if val_ndi_name == ndi_name {
if (val_video.is_some() || !T::IS_VIDEO) && (val_audio.is_some() || T::IS_VIDEO) {
for receiver in receivers.values_mut() {
if receiver.ndi_name == ndi_name
&& receiver.url_address.as_ref().map(String::as_str) == url_address
{
if (receiver.video.is_some() || !T::IS_VIDEO)
&& (receiver.audio.is_some() || T::IS_VIDEO)
{
gst_element_error!(
element,
gst::ResourceError::OpenRead,
[
"Source with ndi-name '{}' already in use for {}",
val_ndi_name,
receiver.ndi_name,
if T::IS_VIDEO { "video" } else { "audio" }
]
);
return None;
} else {
return Some(Receiver::new(val, timestamp_mode, timeout, element, cat));
return Some(Receiver::new(
receiver,
timestamp_mode,
timeout,
connect_timeout,
element,
cat,
));
}
}
}
// Otherwise asynchronously search for it and return the receiver to the caller
let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst);
let mut info = ReceiverInfo::Connecting {
id: id_receiver,
ndi_name: String::from(ndi_name),
video: None,
audio: None,
observations: Observations::new(),
};
let receiver = Receiver::new(&mut info, timestamp_mode, timeout, element, cat);
receivers.insert(id_receiver, info);
let receiver_ndi_name = String::from(receiver_ndi_name);
let element = element.clone();
thread::spawn(move || {
use std::panic;
let res = match panic::catch_unwind(move || {
connect_ndi_async(
cat,
&element,
id_receiver,
receiver_ndi_name,
connect_timeout,
bandwidth,
)
}) {
Ok(res) => res,
Err(_) => Err(Some(gst_error_msg!(
gst::LibraryError::Failed,
["Panic while connecting to NDI source"]
))),
};
match res {
Ok(_) => (),
Err(None) => {
gst_debug!(cat, "Shutting down while connecting");
}
Err(Some(err)) => {
gst_error!(cat, "Error while connecting: {:?}", err);
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let info = match receivers.get_mut(&id_receiver) {
None => return,
Some(val) => val,
};
let (audio, video) = match info {
ReceiverInfo::Connecting {
ref audio,
ref video,
..
} => (audio, video),
ReceiverInfo::Connected { .. } => unreachable!(),
};
assert!(audio.is_some() || video.is_some());
if let Some(audio) = audio.as_ref().and_then(|v| v.upgrade()).map(Receiver) {
if let Some(element) = audio.0.element.upgrade() {
element.post_error_message(&err);
}
let audio_recv = audio.0.recv.lock().unwrap();
let mut queue = (audio.0.queue.0).0.lock().unwrap();
assert!(audio_recv.is_none());
queue.error = Some(gst::FlowError::Error);
audio.0.recv_cond.notify_one();
(audio.0.queue.0).1.notify_one();
}
if let Some(video) = video.as_ref().and_then(|v| v.upgrade()).map(Receiver) {
if let Some(element) = video.0.element.upgrade() {
element.post_error_message(&err);
}
let video_recv = video.0.recv.lock().unwrap();
let mut queue = (video.0.queue.0).0.lock().unwrap();
assert!(video_recv.is_none());
queue.error = Some(gst::FlowError::Error);
video.0.recv_cond.notify_one();
(video.0.queue.0).1.notify_one();
}
}
}
});
Some(receiver)
}
fn connect_ndi_async(
cat: gst::DebugCategory,
element: &gst_base::BaseSrc,
id_receiver: usize,
receiver_ndi_name: String,
connect_timeout: u32,
bandwidth: NDIlib_recv_bandwidth_e,
) -> Result<(), Option<gst::ErrorMessage>> {
let mut find = match FindInstance::builder().build() {
None => {
return Err(Some(gst_error_msg!(
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_find_create_v2 error"]
)));
}
Some(find) => find,
};
let timer = time::Instant::now();
let source = loop {
let new_sources = find.wait_for_sources(100);
let sources = find.get_current_sources();
gst_debug!(
cat,
obj: element,
"Total sources found in network {}",
sources.len(),
);
if new_sources {
for source in &sources {
gst_debug!(
cat,
obj: element,
"Found source '{}' with URL {}",
source.ndi_name(),
source.url_address(),
);
}
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let info = match receivers.get(&id_receiver) {
None => return Err(None),
Some(val) => val,
};
let ndi_name = match info {
ReceiverInfo::Connecting {
ref ndi_name,
ref audio,
ref video,
..
} => {
assert!(audio.is_some() || video.is_some());
ndi_name
}
ReceiverInfo::Connected { .. } => unreachable!(),
};
let source = sources.iter().find(|s| s.ndi_name() == ndi_name.as_str());
if let Some(source) = source {
break source.to_owned();
}
}
if timer.elapsed().as_millis() >= connect_timeout as u128 {
return Err(Some(gst_error_msg!(
gst::ResourceError::NotFound,
["Stream not found"]
)));
}
};
// Otherwise create a new one and return it to the caller
gst_debug!(
cat,
obj: element,
"Connecting to NDI source with ndi-name '{}' and URL {}",
source.ndi_name(),
source.url_address(),
"Connecting to NDI source with ndi-name '{}' and URL/Address {:?}",
ndi_name,
url_address,
);
// FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be
// broken with interlaced content currently
let recv = RecvInstance::builder(&source, &receiver_ndi_name)
let recv = RecvInstance::builder((ndi_name, url_address), &receiver_ndi_name)
.bandwidth(bandwidth)
.color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA)
.allow_video_fields(true)
.build();
let recv = match recv {
None => {
return Err(Some(gst_error_msg!(
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Failed to connect to source"]
)));
);
return None;
}
Some(recv) => recv,
};
@ -800,96 +601,36 @@ fn connect_ndi_async(
let enable_hw_accel = MetadataFrame::new(0, Some("<ndi_hwaccel enabled=\"true\"/>"));
recv.send_metadata(&enable_hw_accel);
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let info = match receivers.get_mut(&id_receiver) {
None => return Err(None),
Some(val) => val,
};
let (audio, video, observations) = match info {
ReceiverInfo::Connecting {
ref audio,
ref video,
ref observations,
..
} => (audio.clone(), video.clone(), observations),
ReceiverInfo::Connected { .. } => unreachable!(),
};
assert!(audio.is_some() || video.is_some());
*info = ReceiverInfo::Connected {
let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst);
let mut info = ReceiverInfo {
id: id_receiver,
ndi_name: source.ndi_name().to_owned(),
recv: recv.clone(),
video: video.clone(),
audio: audio.clone(),
observations: observations.clone(),
ndi_name: String::from(ndi_name),
url_address: url_address.map(String::from),
recv,
video: None,
audio: None,
observations: Observations::new(),
};
gst_debug!(cat, obj: element, "Started NDI connection");
// This will set info.audio/video accordingly
let receiver = Receiver::new(
&mut info,
timestamp_mode,
timeout,
connect_timeout,
element,
cat,
);
if let Some(audio) = audio.and_then(|v| v.upgrade()).map(Receiver) {
let mut audio_recv = audio.0.recv.lock().unwrap();
assert!(audio_recv.is_none());
*audio_recv = Some(recv.clone());
audio.0.recv_cond.notify_one();
}
receivers.insert(id_receiver, info);
if let Some(video) = video.and_then(|v| v.upgrade()).map(Receiver) {
let mut video_recv = video.0.recv.lock().unwrap();
assert!(video_recv.is_none());
*video_recv = Some(recv);
video.0.recv_cond.notify_one();
}
Ok(())
Some(receiver)
}
fn receive_thread<T: ReceiverType>(receiver: &Weak<ReceiverInner<T>>)
where
Receiver<T>: ReceiverCapture<T>,
{
// First loop until we actually are connected, or an error happened
let recv = {
let receiver = match receiver.upgrade().map(Receiver) {
None => return,
Some(receiver) => receiver,
};
let element = match receiver.0.element.upgrade() {
None => return,
Some(element) => element,
};
let mut recv = receiver.0.recv.lock().unwrap();
loop {
{
let queue = (receiver.0.queue.0).0.lock().unwrap();
if !queue.capturing {
gst_debug!(receiver.0.cat, obj: &element, "Shutting down");
return;
}
// If an error happened in the meantime, just go out of here
if queue.error.is_some() {
gst_error!(
receiver.0.cat,
obj: &element,
"Error while waiting for connection"
);
return;
}
}
if let Some(ref recv) = *recv {
break recv.clone();
}
recv = receiver.0.recv_cond.wait(recv).unwrap();
}
};
// Now first capture frames until the queues are empty so that we're sure that we output only
// the very latest frame that is available now
loop {
@ -921,6 +662,8 @@ where
}
}
let recv = receiver.0.recv.lock().unwrap();
let queue = recv.get_queue();
if (!T::IS_VIDEO && queue.audio_frames() <= 1) || (T::IS_VIDEO && queue.video_frames() <= 1)
{
@ -950,6 +693,8 @@ where
}
}
let recv = receiver.0.recv.lock().unwrap();
let res = receiver.capture_internal(&element, &recv);
match res {
@ -999,11 +744,7 @@ pub trait ReceiverCapture<T: ReceiverType> {
recv: &RecvInstance,
) -> Result<(gst::Buffer, T::InfoType), gst::FlowError>;
fn store_internal(
storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
weak: Weak<ReceiverInner<T>>,
);
fn store_internal(info: &mut ReceiverInfo, weak: Weak<ReceiverInner<T>>);
}
impl ReceiverCapture<VideoReceiver> for Receiver<VideoReceiver> {
@ -1015,13 +756,9 @@ impl ReceiverCapture<VideoReceiver> for Receiver<VideoReceiver> {
self.capture_video(element, recv)
}
fn store_internal(
storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
_storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
weak: Weak<ReceiverInner<VideoReceiver>>,
) {
assert!(storage_video.is_none());
*storage_video = Some(weak);
fn store_internal(info: &mut ReceiverInfo, weak: Weak<ReceiverInner<VideoReceiver>>) {
assert!(info.video.is_none());
info.video = Some(weak);
}
}
@ -1109,13 +846,9 @@ impl ReceiverCapture<AudioReceiver> for Receiver<AudioReceiver> {
self.capture_audio(element, recv)
}
fn store_internal(
_storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
weak: Weak<ReceiverInner<AudioReceiver>>,
) {
assert!(storage_audio.is_none());
*storage_audio = Some(weak);
fn store_internal(info: &mut ReceiverInfo, weak: Weak<ReceiverInner<AudioReceiver>>) {
assert!(info.audio.is_none());
info.audio = Some(weak);
}
}
@ -1125,7 +858,12 @@ impl Receiver<VideoReceiver> {
element: &gst_base::BaseSrc,
recv: &RecvInstance,
) -> Result<(gst::Buffer, gst_video::VideoInfo), gst::FlowError> {
let timeout = time::Instant::now();
let timer = time::Instant::now();
let timeout = if self.0.first_frame.load(Ordering::SeqCst) {
self.0.connect_timeout
} else {
self.0.timeout
};
let mut flushing;
let mut playing;
@ -1156,7 +894,7 @@ impl Receiver<VideoReceiver> {
);
return Err(gst::FlowError::Error);
}
Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => {
Ok(None) if timer.elapsed().as_millis() >= timeout as u128 => {
gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",);
return Err(gst::FlowError::Eos);
}
@ -1174,6 +912,8 @@ impl Receiver<VideoReceiver> {
break video_frame;
};
self.0.first_frame.store(false, Ordering::SeqCst);
gst_debug!(
self.0.cat,
obj: element,
@ -1546,7 +1286,12 @@ impl Receiver<AudioReceiver> {
element: &gst_base::BaseSrc,
recv: &RecvInstance,
) -> Result<(gst::Buffer, gst_audio::AudioInfo), gst::FlowError> {
let timeout = time::Instant::now();
let timer = time::Instant::now();
let timeout = if self.0.first_frame.load(Ordering::SeqCst) {
self.0.connect_timeout
} else {
self.0.timeout
};
let mut flushing;
let mut playing;
@ -1577,7 +1322,7 @@ impl Receiver<AudioReceiver> {
);
return Err(gst::FlowError::Error);
}
Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => {
Ok(None) if timer.elapsed().as_millis() >= timeout as u128 => {
gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",);
return Err(gst::FlowError::Eos);
}
@ -1595,6 +1340,8 @@ impl Receiver<AudioReceiver> {
break audio_frame;
};
self.0.first_frame.store(false, Ordering::SeqCst);
gst_debug!(
self.0.cat,
obj: element,