Make the maximum receive queue length size configurable

This commit is contained in:
Sebastian Dröge 2021-08-02 08:45:32 +03:00
parent 50548c8e6a
commit 7604a0c596
3 changed files with 70 additions and 4 deletions

View file

@ -25,6 +25,7 @@ struct Settings {
url_address: Option<String>, url_address: Option<String>,
connect_timeout: u32, connect_timeout: u32,
timeout: u32, timeout: u32,
max_queue_length: u32,
receiver_ndi_name: String, receiver_ndi_name: String,
bandwidth: ndisys::NDIlib_recv_bandwidth_e, bandwidth: ndisys::NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
@ -38,13 +39,14 @@ impl Default for Settings {
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000, connect_timeout: 10000,
timeout: 5000, timeout: 5000,
max_queue_length: 5,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest, bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
timestamp_mode: TimestampMode::ReceiveTimeTimecode, timestamp_mode: TimestampMode::ReceiveTimeTimecode,
} }
} }
} }
static PROPERTIES: [subclass::Property; 7] = [ static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("ndi-name", |name| { subclass::Property("ndi-name", |name| {
glib::ParamSpec::string( glib::ParamSpec::string(
name, name,
@ -94,6 +96,17 @@ static PROPERTIES: [subclass::Property; 7] = [
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
) )
}), }),
subclass::Property("max-queue-length", |name| {
glib::ParamSpec::uint(
name,
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| { subclass::Property("bandwidth", |name| {
glib::ParamSpec::int( glib::ParamSpec::int(
name, name,
@ -274,6 +287,18 @@ impl ObjectImpl for NdiAudioSrc {
); );
settings.timeout = timeout; settings.timeout = timeout;
} }
subclass::Property("max-queue-length", ..) => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
subclass::Property("bandwidth", ..) => { subclass::Property("bandwidth", ..) => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get_some().unwrap(); let bandwidth = value.get_some().unwrap();
@ -330,6 +355,10 @@ impl ObjectImpl for NdiAudioSrc {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value()) Ok(settings.timeout.to_value())
} }
subclass::Property("max-queue-length", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.max_queue_length.to_value())
}
subclass::Property("bandwidth", ..) => { subclass::Property("bandwidth", ..) => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value()) Ok(settings.bandwidth.to_value())
@ -416,6 +445,7 @@ impl BaseSrcImpl for NdiAudioSrc {
settings.bandwidth, settings.bandwidth,
settings.timestamp_mode, settings.timestamp_mode,
settings.timeout, settings.timeout,
settings.max_queue_length as usize,
); );
// settings.id_receiver exists // settings.id_receiver exists

View file

@ -26,6 +26,7 @@ struct Settings {
url_address: Option<String>, url_address: Option<String>,
connect_timeout: u32, connect_timeout: u32,
timeout: u32, timeout: u32,
max_queue_length: u32,
receiver_ndi_name: String, receiver_ndi_name: String,
bandwidth: ndisys::NDIlib_recv_bandwidth_e, bandwidth: ndisys::NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
@ -39,13 +40,14 @@ impl Default for Settings {
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000, connect_timeout: 10000,
timeout: 5000, timeout: 5000,
max_queue_length: 5,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest, bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
timestamp_mode: TimestampMode::ReceiveTimeTimecode, timestamp_mode: TimestampMode::ReceiveTimeTimecode,
} }
} }
} }
static PROPERTIES: [subclass::Property; 7] = [ static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("ndi-name", |name| { subclass::Property("ndi-name", |name| {
glib::ParamSpec::string( glib::ParamSpec::string(
name, name,
@ -95,6 +97,17 @@ static PROPERTIES: [subclass::Property; 7] = [
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
) )
}), }),
subclass::Property("max-queue-length", |name| {
glib::ParamSpec::uint(
name,
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| { subclass::Property("bandwidth", |name| {
glib::ParamSpec::int( glib::ParamSpec::int(
name, name,
@ -309,6 +322,18 @@ impl ObjectImpl for NdiVideoSrc {
); );
settings.timeout = timeout; settings.timeout = timeout;
} }
subclass::Property("max-queue-length", ..) => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
subclass::Property("bandwidth", ..) => { subclass::Property("bandwidth", ..) => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get_some().unwrap(); let bandwidth = value.get_some().unwrap();
@ -365,6 +390,10 @@ impl ObjectImpl for NdiVideoSrc {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value()) Ok(settings.timeout.to_value())
} }
subclass::Property("max-queue-length", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.max_queue_length.to_value())
}
subclass::Property("bandwidth", ..) => { subclass::Property("bandwidth", ..) => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value()) Ok(settings.bandwidth.to_value())
@ -451,6 +480,7 @@ impl BaseSrcImpl for NdiVideoSrc {
settings.bandwidth, settings.bandwidth,
settings.timestamp_mode, settings.timestamp_mode,
settings.timeout, settings.timeout,
settings.max_queue_length as usize,
); );
// settings.id_receiver exists // settings.id_receiver exists

View file

@ -70,6 +70,7 @@ pub struct ReceiverInner<T: ReceiverType> {
id: usize, id: usize,
queue: ReceiverQueue<T>, queue: ReceiverQueue<T>,
max_queue_length: usize,
recv: Mutex<RecvInstance>, recv: Mutex<RecvInstance>,
@ -378,6 +379,7 @@ impl<T: ReceiverType> Receiver<T> {
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
timeout: u32, timeout: u32,
connect_timeout: u32, connect_timeout: u32,
max_queue_length: usize,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
cat: gst::DebugCategory, cat: gst::DebugCategory,
) -> Self ) -> Self
@ -391,12 +393,13 @@ impl<T: ReceiverType> Receiver<T> {
capturing: true, capturing: true,
playing: false, playing: false,
flushing: false, flushing: false,
buffer_queue: VecDeque::with_capacity(5), buffer_queue: VecDeque::with_capacity(max_queue_length),
error: None, error: None,
timeout: false, timeout: false,
}), }),
Condvar::new(), Condvar::new(),
))), ))),
max_queue_length,
recv: Mutex::new(info.recv.clone()), recv: Mutex::new(info.recv.clone()),
observations: info.observations.clone(), observations: info.observations.clone(),
cat, cat,
@ -526,6 +529,7 @@ pub fn connect_ndi<T: ReceiverType>(
bandwidth: NDIlib_recv_bandwidth_e, bandwidth: NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
timeout: u32, timeout: u32,
max_queue_length: usize,
) -> Option<Receiver<T>> ) -> Option<Receiver<T>>
where where
Receiver<T>: ReceiverCapture<T>, Receiver<T>: ReceiverCapture<T>,
@ -572,6 +576,7 @@ where
timestamp_mode, timestamp_mode,
timeout, timeout,
connect_timeout, connect_timeout,
max_queue_length,
element, element,
cat, cat,
)); ));
@ -629,6 +634,7 @@ where
timestamp_mode, timestamp_mode,
timeout, timeout,
connect_timeout, connect_timeout,
max_queue_length,
element, element,
cat, cat,
); );
@ -711,7 +717,7 @@ where
match res { match res {
Ok(item) => { Ok(item) => {
let mut queue = (receiver.0.queue.0).0.lock().unwrap(); let mut queue = (receiver.0.queue.0).0.lock().unwrap();
while queue.buffer_queue.len() > 5 { while queue.buffer_queue.len() > receiver.0.max_queue_length {
gst_warning!( gst_warning!(
receiver.0.cat, receiver.0.cat,
obj: &element, obj: &element,