From 7604a0c5964b45cf56fd0de24ec9780f5e8b1935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 2 Aug 2021 08:45:32 +0300 Subject: [PATCH] Make the maximum receive queue length size configurable --- src/ndiaudiosrc.rs | 32 +++++++++++++++++++++++++++++++- src/ndivideosrc.rs | 32 +++++++++++++++++++++++++++++++- src/receiver.rs | 10 ++++++++-- 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index c4a0ca0a..b4cff6fa 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -25,6 +25,7 @@ struct Settings { url_address: Option, connect_timeout: u32, timeout: u32, + max_queue_length: u32, receiver_ndi_name: String, bandwidth: ndisys::NDIlib_recv_bandwidth_e, timestamp_mode: TimestampMode, @@ -38,13 +39,14 @@ impl Default for Settings { receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), connect_timeout: 10000, timeout: 5000, + max_queue_length: 5, bandwidth: ndisys::NDIlib_recv_bandwidth_highest, timestamp_mode: TimestampMode::ReceiveTimeTimecode, } } } -static PROPERTIES: [subclass::Property; 7] = [ +static PROPERTIES: [subclass::Property; 8] = [ subclass::Property("ndi-name", |name| { glib::ParamSpec::string( name, @@ -94,6 +96,17 @@ static PROPERTIES: [subclass::Property; 7] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("max-queue-length", |name| { + glib::ParamSpec::uint( + name, + "Max Queue Length", + "Maximum receive queue length", + 0, + u32::MAX, + 5, + glib::ParamFlags::READWRITE, + ) + }), subclass::Property("bandwidth", |name| { glib::ParamSpec::int( name, @@ -274,6 +287,18 @@ impl ObjectImpl for NdiAudioSrc { ); settings.timeout = timeout; } + subclass::Property("max-queue-length", ..) => { + let mut settings = self.settings.lock().unwrap(); + let max_queue_length = value.get_some().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing max-queue-length from {} to {}", + settings.max_queue_length, + max_queue_length, + ); + settings.max_queue_length = max_queue_length; + } subclass::Property("bandwidth", ..) => { let mut settings = self.settings.lock().unwrap(); let bandwidth = value.get_some().unwrap(); @@ -330,6 +355,10 @@ impl ObjectImpl for NdiAudioSrc { let settings = self.settings.lock().unwrap(); Ok(settings.timeout.to_value()) } + subclass::Property("max-queue-length", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.max_queue_length.to_value()) + } subclass::Property("bandwidth", ..) => { let settings = self.settings.lock().unwrap(); Ok(settings.bandwidth.to_value()) @@ -416,6 +445,7 @@ impl BaseSrcImpl for NdiAudioSrc { settings.bandwidth, settings.timestamp_mode, settings.timeout, + settings.max_queue_length as usize, ); // settings.id_receiver exists diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index e73204c5..7e41d231 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -26,6 +26,7 @@ struct Settings { url_address: Option, connect_timeout: u32, timeout: u32, + max_queue_length: u32, receiver_ndi_name: String, bandwidth: ndisys::NDIlib_recv_bandwidth_e, timestamp_mode: TimestampMode, @@ -39,13 +40,14 @@ impl Default for Settings { receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), connect_timeout: 10000, timeout: 5000, + max_queue_length: 5, bandwidth: ndisys::NDIlib_recv_bandwidth_highest, timestamp_mode: TimestampMode::ReceiveTimeTimecode, } } } -static PROPERTIES: [subclass::Property; 7] = [ +static PROPERTIES: [subclass::Property; 8] = [ subclass::Property("ndi-name", |name| { glib::ParamSpec::string( name, @@ -95,6 +97,17 @@ static PROPERTIES: [subclass::Property; 7] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("max-queue-length", |name| { + glib::ParamSpec::uint( + name, + "Max Queue Length", + "Maximum receive queue length", + 0, + u32::MAX, + 5, + glib::ParamFlags::READWRITE, + ) + }), subclass::Property("bandwidth", |name| { glib::ParamSpec::int( name, @@ -309,6 +322,18 @@ impl ObjectImpl for NdiVideoSrc { ); settings.timeout = timeout; } + subclass::Property("max-queue-length", ..) => { + let mut settings = self.settings.lock().unwrap(); + let max_queue_length = value.get_some().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing max-queue-length from {} to {}", + settings.max_queue_length, + max_queue_length, + ); + settings.max_queue_length = max_queue_length; + } subclass::Property("bandwidth", ..) => { let mut settings = self.settings.lock().unwrap(); let bandwidth = value.get_some().unwrap(); @@ -365,6 +390,10 @@ impl ObjectImpl for NdiVideoSrc { let settings = self.settings.lock().unwrap(); Ok(settings.timeout.to_value()) } + subclass::Property("max-queue-length", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.max_queue_length.to_value()) + } subclass::Property("bandwidth", ..) => { let settings = self.settings.lock().unwrap(); Ok(settings.bandwidth.to_value()) @@ -451,6 +480,7 @@ impl BaseSrcImpl for NdiVideoSrc { settings.bandwidth, settings.timestamp_mode, settings.timeout, + settings.max_queue_length as usize, ); // settings.id_receiver exists diff --git a/src/receiver.rs b/src/receiver.rs index d88e95f9..dccb1907 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -70,6 +70,7 @@ pub struct ReceiverInner { id: usize, queue: ReceiverQueue, + max_queue_length: usize, recv: Mutex, @@ -378,6 +379,7 @@ impl Receiver { timestamp_mode: TimestampMode, timeout: u32, connect_timeout: u32, + max_queue_length: usize, element: &gst_base::BaseSrc, cat: gst::DebugCategory, ) -> Self @@ -391,12 +393,13 @@ impl Receiver { capturing: true, playing: false, flushing: false, - buffer_queue: VecDeque::with_capacity(5), + buffer_queue: VecDeque::with_capacity(max_queue_length), error: None, timeout: false, }), Condvar::new(), ))), + max_queue_length, recv: Mutex::new(info.recv.clone()), observations: info.observations.clone(), cat, @@ -526,6 +529,7 @@ pub fn connect_ndi( bandwidth: NDIlib_recv_bandwidth_e, timestamp_mode: TimestampMode, timeout: u32, + max_queue_length: usize, ) -> Option> where Receiver: ReceiverCapture, @@ -572,6 +576,7 @@ where timestamp_mode, timeout, connect_timeout, + max_queue_length, element, cat, )); @@ -629,6 +634,7 @@ where timestamp_mode, timeout, connect_timeout, + max_queue_length, element, cat, ); @@ -711,7 +717,7 @@ where match res { Ok(item) => { let mut queue = (receiver.0.queue.0).0.lock().unwrap(); - while queue.buffer_queue.len() > 5 { + while queue.buffer_queue.len() > receiver.0.max_queue_length { gst_warning!( receiver.0.cat, obj: &element,