Improve type-safety a bit by making the Receiver generic over the stream type

This commit is contained in:
Sebastian Dröge 2019-07-19 11:32:04 +03:00
parent 33370e42ad
commit fa9f788190
4 changed files with 160 additions and 103 deletions

View file

@ -1,7 +1,6 @@
#[macro_use] #[macro_use]
extern crate glib; extern crate glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass::prelude::*;
#[macro_use] #[macro_use]
extern crate gstreamer as gst; extern crate gstreamer as gst;
extern crate gstreamer_audio as gst_audio; extern crate gstreamer_audio as gst_audio;

View file

@ -15,6 +15,7 @@ use std::{i32, u32};
use connect_ndi; use connect_ndi;
use ndisys; use ndisys;
use AudioReceiver;
use Receiver; use Receiver;
use ReceiverControlHandle; use ReceiverControlHandle;
use ReceiverItem; use ReceiverItem;
@ -121,7 +122,7 @@ static PROPERTIES: [subclass::Property; 7] = [
struct State { struct State {
info: Option<gst_audio::AudioInfo>, info: Option<gst_audio::AudioInfo>,
receiver: Option<Receiver>, receiver: Option<Receiver<AudioReceiver>>,
current_latency: gst::ClockTime, current_latency: gst::ClockTime,
} }
@ -139,7 +140,7 @@ pub(crate) struct NdiAudioSrc {
cat: gst::DebugCategory, cat: gst::DebugCategory,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle>>, receiver_controller: Mutex<Option<ReceiverControlHandle<AudioReceiver>>>,
} }
impl ObjectSubclass for NdiAudioSrc { impl ObjectSubclass for NdiAudioSrc {
@ -503,7 +504,7 @@ impl BaseSrcImpl for NdiAudioSrc {
}; };
match recv.capture() { match recv.capture() {
ReceiverItem::AudioBuffer(buffer, info) => { ReceiverItem::Buffer(buffer, info) => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.receiver = Some(recv); state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) { if state.info.as_ref() != Some(&info) {
@ -525,7 +526,6 @@ impl BaseSrcImpl for NdiAudioSrc {
ReceiverItem::Flushing => Err(gst::FlowError::Flushing), ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Timeout => Err(gst::FlowError::Eos), ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Error(err) => Err(err), ReceiverItem::Error(err) => Err(err),
ReceiverItem::VideoBuffer(..) => unreachable!(),
} }
} }
} }

View file

@ -21,6 +21,7 @@ use Receiver;
use ReceiverControlHandle; use ReceiverControlHandle;
use ReceiverItem; use ReceiverItem;
use TimestampMode; use TimestampMode;
use VideoReceiver;
use DEFAULT_RECEIVER_NDI_NAME; use DEFAULT_RECEIVER_NDI_NAME;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -124,7 +125,7 @@ static PROPERTIES: [subclass::Property; 7] = [
struct State { struct State {
info: Option<gst_video::VideoInfo>, info: Option<gst_video::VideoInfo>,
current_latency: gst::ClockTime, current_latency: gst::ClockTime,
receiver: Option<Receiver>, receiver: Option<Receiver<VideoReceiver>>,
} }
impl Default for State { impl Default for State {
@ -141,7 +142,7 @@ pub(crate) struct NdiVideoSrc {
cat: gst::DebugCategory, cat: gst::DebugCategory,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle>>, receiver_controller: Mutex<Option<ReceiverControlHandle<VideoReceiver>>>,
} }
impl ObjectSubclass for NdiVideoSrc { impl ObjectSubclass for NdiVideoSrc {
@ -543,7 +544,7 @@ impl BaseSrcImpl for NdiVideoSrc {
}; };
match recv.capture() { match recv.capture() {
ReceiverItem::VideoBuffer(buffer, info) => { ReceiverItem::Buffer(buffer, info) => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.receiver = Some(recv); state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) { if state.info.as_ref() != Some(&info) {
@ -565,7 +566,6 @@ impl BaseSrcImpl for NdiVideoSrc {
ReceiverItem::Timeout => Err(gst::FlowError::Eos), ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Flushing => Err(gst::FlowError::Flushing), ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Error(err) => Err(err), ReceiverItem::Error(err) => Err(err),
ReceiverItem::AudioBuffer(..) => unreachable!(),
} }
} }
} }

View file

@ -20,8 +20,8 @@ enum ReceiverInfo {
id: usize, id: usize,
ndi_name: Option<String>, ndi_name: Option<String>,
ip_address: Option<String>, ip_address: Option<String>,
video: Option<Weak<ReceiverInner>>, video: Option<Weak<ReceiverInner<VideoReceiver>>>,
audio: Option<Weak<ReceiverInner>>, audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
observations: Observations, observations: Observations,
}, },
Connected { Connected {
@ -29,8 +29,8 @@ enum ReceiverInfo {
ndi_name: String, ndi_name: String,
ip_address: String, ip_address: String,
recv: RecvInstance, recv: RecvInstance,
video: Option<Weak<ReceiverInner>>, video: Option<Weak<ReceiverInner<VideoReceiver>>>,
audio: Option<Weak<ReceiverInner>>, audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
observations: Observations, observations: Observations,
}, },
} }
@ -44,24 +44,44 @@ lazy_static! {
static ID_RECEIVER: AtomicUsize = AtomicUsize::new(0); static ID_RECEIVER: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)] pub trait ReceiverType: 'static {
pub struct Receiver(Arc<ReceiverInner>); type InfoType: Send + 'static;
const IS_VIDEO: bool;
}
pub enum AudioReceiver {}
pub enum VideoReceiver {}
impl ReceiverType for AudioReceiver {
type InfoType = gst_audio::AudioInfo;
const IS_VIDEO: bool = false;
}
impl ReceiverType for VideoReceiver {
type InfoType = gst_video::VideoInfo;
const IS_VIDEO: bool = true;
}
pub struct Receiver<T: ReceiverType>(Arc<ReceiverInner<T>>);
impl<T: ReceiverType> Clone for Receiver<T> {
fn clone(&self) -> Self {
Receiver(self.0.clone())
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum ReceiverItem { pub enum ReceiverItem<T: ReceiverType> {
AudioBuffer(gst::Buffer, gst_audio::AudioInfo), Buffer(gst::Buffer, T::InfoType),
VideoBuffer(gst::Buffer, gst_video::VideoInfo),
Flushing, Flushing,
Timeout, Timeout,
Error(gst::FlowError), Error(gst::FlowError),
} }
struct ReceiverInner { pub struct ReceiverInner<T: ReceiverType> {
id: usize, id: usize,
queue: ReceiverQueue, queue: ReceiverQueue<T>,
video: bool,
recv: Mutex<Option<RecvInstance>>, recv: Mutex<Option<RecvInstance>>,
recv_cond: Condvar, recv_cond: Condvar,
@ -76,10 +96,15 @@ struct ReceiverInner {
thread: Mutex<Option<std::thread::JoinHandle<()>>>, thread: Mutex<Option<std::thread::JoinHandle<()>>>,
} }
#[derive(Clone)] struct ReceiverQueue<T: ReceiverType>(Arc<(Mutex<ReceiverQueueInner<T>>, Condvar)>);
struct ReceiverQueue(Arc<(Mutex<ReceiverQueueInner>, Condvar)>);
struct ReceiverQueueInner { impl<T: ReceiverType> Clone for ReceiverQueue<T> {
fn clone(&self) -> Self {
ReceiverQueue(self.0.clone())
}
}
struct ReceiverQueueInner<T: ReceiverType> {
// If we should be capturing at all or go out of our capture loop // If we should be capturing at all or go out of our capture loop
// //
// This is true as long as the source element is in Paused/Playing // This is true as long as the source element is in Paused/Playing
@ -94,7 +119,7 @@ struct ReceiverQueueInner {
// Queue containing our buffers. This holds at most 5 buffers at a time. // Queue containing our buffers. This holds at most 5 buffers at a time.
// //
// On timeout/error will contain a single item and then never be filled again // On timeout/error will contain a single item and then never be filled again
buffer_queue: VecDeque<ReceiverItem>, buffer_queue: VecDeque<(gst::Buffer, T::InfoType)>,
error: Option<gst::FlowError>, error: Option<gst::FlowError>,
timeout: bool, timeout: bool,
@ -320,12 +345,19 @@ impl Default for TimeMapping {
} }
} }
#[derive(Clone)] pub struct ReceiverControlHandle<T: ReceiverType> {
pub struct ReceiverControlHandle { queue: ReceiverQueue<T>,
queue: ReceiverQueue,
} }
impl ReceiverControlHandle { impl<T: ReceiverType> Clone for ReceiverControlHandle<T> {
fn clone(&self) -> Self {
ReceiverControlHandle {
queue: self.queue.clone(),
}
}
}
impl<T: ReceiverType> ReceiverControlHandle<T> {
pub fn set_flushing(&self, flushing: bool) { pub fn set_flushing(&self, flushing: bool) {
let mut queue = (self.queue.0).0.lock().unwrap(); let mut queue = (self.queue.0).0.lock().unwrap();
queue.flushing = flushing; queue.flushing = flushing;
@ -344,49 +376,34 @@ impl ReceiverControlHandle {
} }
} }
impl Receiver { impl<T: ReceiverType> Receiver<T> {
fn new( fn new(
info: &mut ReceiverInfo, info: &mut ReceiverInfo,
video: bool,
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
timeout: u32, timeout: u32,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
cat: gst::DebugCategory, cat: gst::DebugCategory,
) -> Self { ) -> Self
let (id, storage, recv, observations) = if video { where
match info { Receiver<T>: ReceiverCapture<T>,
ReceiverInfo::Connecting { {
id, let (id, storage_video, storage_audio, recv, observations) = match info {
ref mut video, ReceiverInfo::Connecting {
ref observations, id,
.. ref observations,
} => (*id, video, None, observations), ref mut audio,
ReceiverInfo::Connected { ref mut video,
id, ..
ref mut video, } => (*id, video, audio, None, observations),
ref mut recv, ReceiverInfo::Connected {
ref observations, id,
.. ref mut recv,
} => (*id, video, Some(recv.clone()), observations), ref observations,
} ref mut audio,
} else { ref mut video,
match info { ..
ReceiverInfo::Connecting { } => (*id, video, audio, Some(recv.clone()), observations),
id,
ref mut audio,
ref observations,
..
} => (*id, audio, None, observations),
ReceiverInfo::Connected {
id,
ref mut audio,
ref mut recv,
ref observations,
..
} => (*id, audio, Some(recv.clone()), observations),
}
}; };
assert!(storage.is_none());
let receiver = Receiver(Arc::new(ReceiverInner { let receiver = Receiver(Arc::new(ReceiverInner {
id, id,
@ -401,7 +418,6 @@ impl Receiver {
}), }),
Condvar::new(), Condvar::new(),
))), ))),
video,
recv: Mutex::new(recv), recv: Mutex::new(recv),
recv_cond: Condvar::new(), recv_cond: Condvar::new(),
observations: observations.clone(), observations: observations.clone(),
@ -439,14 +455,14 @@ impl Receiver {
}); });
let weak = Arc::downgrade(&receiver.0); let weak = Arc::downgrade(&receiver.0);
*storage = Some(weak); Self::store_internal(storage_video, storage_audio, weak);
*receiver.0.thread.lock().unwrap() = Some(thread); *receiver.0.thread.lock().unwrap() = Some(thread);
receiver receiver
} }
pub fn receiver_control_handle(&self) -> ReceiverControlHandle { pub fn receiver_control_handle(&self) -> ReceiverControlHandle<T> {
ReceiverControlHandle { ReceiverControlHandle {
queue: self.0.queue.clone(), queue: self.0.queue.clone(),
} }
@ -469,7 +485,7 @@ impl Receiver {
(self.0.queue.0).1.notify_all(); (self.0.queue.0).1.notify_all();
} }
pub fn capture(&self) -> ReceiverItem { pub fn capture(&self) -> ReceiverItem<T> {
let mut queue = (self.0.queue.0).0.lock().unwrap(); let mut queue = (self.0.queue.0).0.lock().unwrap();
loop { loop {
if let Some(err) = queue.error { if let Some(err) = queue.error {
@ -478,8 +494,8 @@ impl Receiver {
return ReceiverItem::Timeout; return ReceiverItem::Timeout;
} else if queue.flushing || !queue.capturing { } else if queue.flushing || !queue.capturing {
return ReceiverItem::Flushing; return ReceiverItem::Flushing;
} else if let Some(item) = queue.buffer_queue.pop_front() { } else if let Some((buffer, info)) = queue.buffer_queue.pop_front() {
return item; return ReceiverItem::Buffer(buffer, info);
} }
queue = (self.0.queue.0).1.wait(queue).unwrap(); queue = (self.0.queue.0).1.wait(queue).unwrap();
@ -487,7 +503,7 @@ impl Receiver {
} }
} }
impl Drop for ReceiverInner { impl<T: ReceiverType> Drop for ReceiverInner<T> {
fn drop(&mut self) { fn drop(&mut self) {
// Will shut down the receiver thread on the next iteration // Will shut down the receiver thread on the next iteration
let mut queue = (self.queue.0).0.lock().unwrap(); let mut queue = (self.queue.0).0.lock().unwrap();
@ -516,7 +532,7 @@ impl Drop for ReceiverInner {
} => (audio, video), } => (audio, video),
}; };
if video.is_some() && audio.is_some() { if video.is_some() && audio.is_some() {
if self.video { if T::IS_VIDEO {
*video = None; *video = None;
} else { } else {
*audio = None; *audio = None;
@ -532,7 +548,7 @@ impl Drop for ReceiverInner {
} }
} }
pub fn connect_ndi( pub fn connect_ndi<T: ReceiverType>(
cat: gst::DebugCategory, cat: gst::DebugCategory,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
ip_address: Option<&str>, ip_address: Option<&str>,
@ -542,13 +558,14 @@ pub fn connect_ndi(
bandwidth: NDIlib_recv_bandwidth_e, bandwidth: NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
timeout: u32, timeout: u32,
) -> Option<Receiver> { ) -> Option<Receiver<T>>
where
Receiver<T>: ReceiverCapture<T>,
{
gst_debug!(cat, obj: element, "Starting NDI connection..."); gst_debug!(cat, obj: element, "Starting NDI connection...");
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap(); let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let video = element.get_type() == ndivideosrc::NdiVideoSrc::get_type();
// Check if we already have a receiver for this very stream // Check if we already have a receiver for this very stream
for val in receivers.values_mut() { for val in receivers.values_mut() {
let (val_audio, val_video, val_ip_address, val_ndi_name) = match val { let (val_audio, val_video, val_ip_address, val_ndi_name) = match val {
@ -579,26 +596,19 @@ pub fn connect_ndi(
}; };
if val_ip_address == ip_address || val_ndi_name == ndi_name { if val_ip_address == ip_address || val_ndi_name == ndi_name {
if (val_video.is_some() || !video) && (val_audio.is_some() || video) { if (val_video.is_some() || !T::IS_VIDEO) && (val_audio.is_some() || T::IS_VIDEO) {
gst_error!( gst_error!(
cat, cat,
obj: element, obj: element,
"Source with ndi-name '{:?}' and ip-address '{:?}' already in use for {}", "Source with ndi-name '{:?}' and ip-address '{:?}' already in use for {}",
val_ndi_name, val_ndi_name,
val_ip_address, val_ip_address,
if video { "video" } else { "audio" }, if T::IS_VIDEO { "video" } else { "audio" },
); );
return None; return None;
} else { } else {
return Some(Receiver::new( return Some(Receiver::new(val, timestamp_mode, timeout, element, cat));
val,
video,
timestamp_mode,
timeout,
element,
cat,
));
} }
} }
} }
@ -614,7 +624,7 @@ pub fn connect_ndi(
observations: Observations::new(), observations: Observations::new(),
}; };
let receiver = Receiver::new(&mut info, video, timestamp_mode, timeout, element, cat); let receiver = Receiver::new(&mut info, timestamp_mode, timeout, element, cat);
receivers.insert(id_receiver, info); receivers.insert(id_receiver, info);
@ -840,7 +850,10 @@ fn connect_ndi_async(
Ok(()) Ok(())
} }
fn receive_thread(receiver: &Weak<ReceiverInner>) { fn receive_thread<T: ReceiverType>(receiver: &Weak<ReceiverInner<T>>)
where
Receiver<T>: ReceiverCapture<T>,
{
// First loop until we actually are connected, or an error happened // First loop until we actually are connected, or an error happened
let recv = { let recv = {
let receiver = match receiver.upgrade().map(Receiver) { let receiver = match receiver.upgrade().map(Receiver) {
@ -907,13 +920,12 @@ fn receive_thread(receiver: &Weak<ReceiverInner>) {
} }
let queue = recv.get_queue(); let queue = recv.get_queue();
if (!receiver.0.video && queue.audio_frames() <= 1) if (!T::IS_VIDEO && queue.audio_frames() <= 1) || (T::IS_VIDEO && queue.video_frames() <= 1)
|| (receiver.0.video && queue.video_frames() <= 1)
{ {
break; break;
} }
let _ = recv.capture(receiver.0.video, !receiver.0.video, false, 0); let _ = recv.capture(T::IS_VIDEO, !T::IS_VIDEO, false, 0);
} }
// And if that went fine, capture until we're done // And if that went fine, capture until we're done
@ -933,15 +945,7 @@ fn receive_thread(receiver: &Weak<ReceiverInner>) {
} }
} }
let res = if receiver.0.video { let res = receiver.capture_internal(&element, &recv);
receiver
.capture_video(&element, &recv)
.map(|(buffer, info)| ReceiverItem::VideoBuffer(buffer, info))
} else {
receiver
.capture_audio(&element, &recv)
.map(|(buffer, info)| ReceiverItem::AudioBuffer(buffer, info))
};
match res { match res {
Ok(item) => { Ok(item) => {
@ -983,7 +987,59 @@ fn receive_thread(receiver: &Weak<ReceiverInner>) {
} }
} }
impl Receiver { pub trait ReceiverCapture<T: ReceiverType> {
fn capture_internal(
&self,
element: &gst_base::BaseSrc,
recv: &RecvInstance,
) -> Result<(gst::Buffer, T::InfoType), gst::FlowError>;
fn store_internal(
storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
weak: Weak<ReceiverInner<T>>,
);
}
impl ReceiverCapture<VideoReceiver> for Receiver<VideoReceiver> {
fn capture_internal(
&self,
element: &gst_base::BaseSrc,
recv: &RecvInstance,
) -> Result<(gst::Buffer, gst_video::VideoInfo), gst::FlowError> {
self.capture_video(element, recv)
}
fn store_internal(
storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
_storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
weak: Weak<ReceiverInner<VideoReceiver>>,
) {
assert!(storage_video.is_none());
*storage_video = Some(weak);
}
}
impl ReceiverCapture<AudioReceiver> for Receiver<AudioReceiver> {
fn capture_internal(
&self,
element: &gst_base::BaseSrc,
recv: &RecvInstance,
) -> Result<(gst::Buffer, gst_audio::AudioInfo), gst::FlowError> {
self.capture_audio(element, recv)
}
fn store_internal(
_storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
weak: Weak<ReceiverInner<AudioReceiver>>,
) {
assert!(storage_audio.is_none());
*storage_audio = Some(weak);
}
}
impl Receiver<VideoReceiver> {
fn capture_video( fn capture_video(
&self, &self,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
@ -1414,7 +1470,9 @@ impl Receiver {
Ok(vframe.into_buffer()) Ok(vframe.into_buffer())
} }
}
impl Receiver<AudioReceiver> {
fn capture_audio( fn capture_audio(
&self, &self,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,