mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-09-24 13:00:12 +00:00
Merge pull request #47 from sdroege/device-provider
Add device provider
This commit is contained in:
commit
9d4e427e14
7 changed files with 535 additions and 394 deletions
|
@ -16,6 +16,7 @@ gstreamer-audio = "0.15"
|
|||
gstreamer-video = { version = "0.15", features = ["v1_12"] }
|
||||
lazy_static = "1.1.0"
|
||||
byte-slice-cast = "0.3.0"
|
||||
once_cell = "1.0"
|
||||
|
||||
[build-dependencies]
|
||||
gst-plugin-version-helper = "0.1"
|
||||
|
|
315
src/device_provider.rs
Normal file
315
src/device_provider.rs
Normal file
|
@ -0,0 +1,315 @@
|
|||
use glib;
|
||||
use glib::subclass;
|
||||
use gst;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use std::sync::atomic;
|
||||
use std::sync::Mutex;
|
||||
use std::thread;
|
||||
|
||||
use crate::ndi;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct DeviceProvider {
|
||||
cat: gst::DebugCategory,
|
||||
thread: Mutex<Option<thread::JoinHandle<()>>>,
|
||||
current_devices: Mutex<Vec<gst::Device>>,
|
||||
find: Mutex<Option<ndi::FindInstance>>,
|
||||
is_running: atomic::AtomicBool,
|
||||
}
|
||||
|
||||
impl ObjectSubclass for DeviceProvider {
|
||||
const NAME: &'static str = "NdiDeviceProvider";
|
||||
type ParentType = gst::DeviceProvider;
|
||||
type Instance = subclass::simple::InstanceStruct<Self>;
|
||||
type Class = subclass::simple::ClassStruct<Self>;
|
||||
|
||||
glib_object_subclass!();
|
||||
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
cat: gst::DebugCategory::new(
|
||||
"ndideviceprovider",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("NewTek NDI Device Provider"),
|
||||
),
|
||||
thread: Mutex::new(None),
|
||||
current_devices: Mutex::new(vec![]),
|
||||
find: Mutex::new(None),
|
||||
is_running: atomic::AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
|
||||
klass.set_metadata(
|
||||
"NewTek NDI Device Provider",
|
||||
"Source/Audio/Video/Network",
|
||||
"NewTek NDI Device Provider",
|
||||
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectImpl for DeviceProvider {
|
||||
glib_object_impl!();
|
||||
}
|
||||
|
||||
impl DeviceProviderImpl for DeviceProvider {
|
||||
fn probe(&self, _device_provider: &gst::DeviceProvider) -> Vec<gst::Device> {
|
||||
self.current_devices.lock().unwrap().clone()
|
||||
}
|
||||
fn start(&self, device_provider: &gst::DeviceProvider) -> Result<(), gst::LoggableError> {
|
||||
let mut thread_guard = self.thread.lock().unwrap();
|
||||
if thread_guard.is_some() {
|
||||
gst_log!(
|
||||
self.cat,
|
||||
obj: device_provider,
|
||||
"Device provider already started"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.is_running.store(true, atomic::Ordering::SeqCst);
|
||||
|
||||
let device_provider_weak = device_provider.downgrade();
|
||||
let mut first = true;
|
||||
*thread_guard = Some(thread::spawn(move || {
|
||||
let device_provider = match device_provider_weak.upgrade() {
|
||||
None => return,
|
||||
Some(device_provider) => device_provider,
|
||||
};
|
||||
|
||||
let imp = DeviceProvider::from_instance(&device_provider);
|
||||
{
|
||||
let mut find_guard = imp.find.lock().unwrap();
|
||||
if find_guard.is_some() {
|
||||
gst_log!(imp.cat, obj: &device_provider, "Already started");
|
||||
return;
|
||||
}
|
||||
|
||||
let find = match ndi::FindInstance::builder().build() {
|
||||
None => {
|
||||
gst_error!(
|
||||
imp.cat,
|
||||
obj: &device_provider,
|
||||
"Failed to create Find instance"
|
||||
);
|
||||
return;
|
||||
}
|
||||
Some(find) => find,
|
||||
};
|
||||
*find_guard = Some(find);
|
||||
}
|
||||
|
||||
loop {
|
||||
let device_provider = match device_provider_weak.upgrade() {
|
||||
None => break,
|
||||
Some(device_provider) => device_provider,
|
||||
};
|
||||
|
||||
let imp = DeviceProvider::from_instance(&device_provider);
|
||||
if !imp.is_running.load(atomic::Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
|
||||
imp.poll(&device_provider, first);
|
||||
first = false;
|
||||
}
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn stop(&self, _device_provider: &gst::DeviceProvider) {
|
||||
if let Some(_thread) = self.thread.lock().unwrap().take() {
|
||||
self.is_running.store(false, atomic::Ordering::SeqCst);
|
||||
// Don't actually join because that might take a while
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DeviceProvider {
|
||||
fn poll(&self, device_provider: &gst::DeviceProvider, first: bool) {
|
||||
let mut find_guard = self.find.lock().unwrap();
|
||||
let find = match *find_guard {
|
||||
None => return,
|
||||
Some(ref mut find) => find,
|
||||
};
|
||||
|
||||
if !find.wait_for_sources(if first { 1000 } else { 5000 }) {
|
||||
gst_trace!(self.cat, obj: device_provider, "No new sources found");
|
||||
return;
|
||||
}
|
||||
|
||||
let sources = find.get_current_sources();
|
||||
let mut sources = sources.iter().map(|s| s.to_owned()).collect::<Vec<_>>();
|
||||
|
||||
let mut current_devices_guard = self.current_devices.lock().unwrap();
|
||||
let mut expired_devices = vec![];
|
||||
let mut remaining_sources = vec![];
|
||||
|
||||
// First check for each device we previously knew if it's still available
|
||||
for old_device in &*current_devices_guard {
|
||||
let old_device_imp = Device::from_instance(old_device);
|
||||
let old_source = old_device_imp.source.get().unwrap();
|
||||
|
||||
if !sources.contains(&old_source.0) {
|
||||
gst_log!(
|
||||
self.cat,
|
||||
obj: device_provider,
|
||||
"Source {:?} disappeared",
|
||||
old_source
|
||||
);
|
||||
expired_devices.push(old_device.clone());
|
||||
} else {
|
||||
// Otherwise remember that we had it before already and don't have to announce it
|
||||
// again. After the loop we're going to remove these all from the sources vec.
|
||||
remaining_sources.push(old_source.0.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
for remaining_source in remaining_sources {
|
||||
sources.retain(|s| s != &remaining_source);
|
||||
}
|
||||
|
||||
// Remove all expired devices from the list of cached devices
|
||||
current_devices_guard.retain(|d| !expired_devices.contains(d));
|
||||
// And also notify the device provider of them having disappeared
|
||||
for old_device in expired_devices {
|
||||
device_provider.device_remove(&old_device);
|
||||
}
|
||||
|
||||
// Now go through all new devices and announce them
|
||||
for source in sources {
|
||||
gst_log!(
|
||||
self.cat,
|
||||
obj: device_provider,
|
||||
"Source {:?} appeared",
|
||||
source
|
||||
);
|
||||
// Add once for audio, another time for video
|
||||
let device = Device::new(&source, true);
|
||||
device_provider.device_add(&device);
|
||||
current_devices_guard.push(device);
|
||||
|
||||
let device = Device::new(&source, false);
|
||||
device_provider.device_add(&device);
|
||||
current_devices_guard.push(device);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Device {
|
||||
cat: gst::DebugCategory,
|
||||
source: OnceCell<(ndi::Source<'static>, glib::Type)>,
|
||||
}
|
||||
|
||||
impl ObjectSubclass for Device {
|
||||
const NAME: &'static str = "NdiDevice";
|
||||
type ParentType = gst::Device;
|
||||
type Instance = subclass::simple::InstanceStruct<Self>;
|
||||
type Class = subclass::simple::ClassStruct<Self>;
|
||||
|
||||
glib_object_subclass!();
|
||||
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
cat: gst::DebugCategory::new(
|
||||
"ndidevice",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("NewTek NDI Device"),
|
||||
),
|
||||
source: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectImpl for Device {
|
||||
glib_object_impl!();
|
||||
}
|
||||
|
||||
impl DeviceImpl for Device {
|
||||
fn create_element(
|
||||
&self,
|
||||
_device: &gst::Device,
|
||||
name: Option<&str>,
|
||||
) -> Result<gst::Element, gst::LoggableError> {
|
||||
let source_info = self.source.get().unwrap();
|
||||
let element = glib::Object::new(
|
||||
source_info.1,
|
||||
&[
|
||||
("name", &name),
|
||||
("ndi-name", &source_info.0.ndi_name()),
|
||||
("url-address", &source_info.0.url_address()),
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
.dynamic_cast::<gst::Element>()
|
||||
.unwrap();
|
||||
|
||||
Ok(element)
|
||||
}
|
||||
}
|
||||
|
||||
impl Device {
|
||||
fn new(source: &ndi::Source<'_>, is_audio: bool) -> gst::Device {
|
||||
let display_name = format!(
|
||||
"{} ({})",
|
||||
source.ndi_name(),
|
||||
if is_audio { "Audio" } else { "Video" }
|
||||
);
|
||||
let device_class = format!(
|
||||
"Source/{}/Network",
|
||||
if is_audio { "Audio" } else { "Video" }
|
||||
);
|
||||
|
||||
// Get the caps from the template caps of the corresponding source element
|
||||
let element_type = if is_audio {
|
||||
crate::ndiaudiosrc::NdiAudioSrc::get_type()
|
||||
} else {
|
||||
crate::ndivideosrc::NdiVideoSrc::get_type()
|
||||
};
|
||||
let element_class = gst::ElementClass::from_type(element_type).unwrap();
|
||||
let templ = element_class.get_pad_template("src").unwrap();
|
||||
let caps = templ.get_caps().unwrap();
|
||||
|
||||
// Put the url-address into the extra properties
|
||||
let extra_properties = gst::Structure::builder("properties")
|
||||
.field("ndi-name", &source.ndi_name())
|
||||
.field("url-address", &source.url_address())
|
||||
.build();
|
||||
|
||||
let device = glib::Object::new(
|
||||
Device::get_type(),
|
||||
&[
|
||||
("caps", &caps),
|
||||
("display-name", &display_name),
|
||||
("device-class", &device_class),
|
||||
("properties", &extra_properties),
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
.dynamic_cast::<gst::Device>()
|
||||
.unwrap();
|
||||
let device_impl = Device::from_instance(&device);
|
||||
|
||||
device_impl
|
||||
.source
|
||||
.set((source.to_owned(), element_type))
|
||||
.unwrap();
|
||||
|
||||
device
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||
gst::DeviceProvider::register(
|
||||
Some(plugin),
|
||||
"ndideviceprovider",
|
||||
gst::Rank::Primary,
|
||||
DeviceProvider::get_type(),
|
||||
)
|
||||
}
|
|
@ -11,6 +11,7 @@ extern crate gstreamer_video as gst_video;
|
|||
extern crate lazy_static;
|
||||
extern crate byte_slice_cast;
|
||||
|
||||
mod device_provider;
|
||||
pub mod ndi;
|
||||
mod ndiaudiosrc;
|
||||
pub mod ndisys;
|
||||
|
@ -39,6 +40,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
|||
|
||||
ndivideosrc::register(plugin)?;
|
||||
ndiaudiosrc::register(plugin)?;
|
||||
device_provider::register(plugin)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
63
src/ndi.rs
63
src/ndi.rs
|
@ -122,6 +122,7 @@ pub enum Source<'a> {
|
|||
}
|
||||
|
||||
unsafe impl<'a> Send for Source<'a> {}
|
||||
unsafe impl<'a> Sync for Source<'a> {}
|
||||
|
||||
impl<'a> Source<'a> {
|
||||
pub fn ndi_name(&self) -> &str {
|
||||
|
@ -148,24 +149,6 @@ impl<'a> Source<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
fn ndi_name_ptr(&self) -> *const ::std::os::raw::c_char {
|
||||
unsafe {
|
||||
match *self {
|
||||
Source::Borrowed(ptr, _) => ptr.as_ref().p_ndi_name,
|
||||
Source::Owned(_, ref ndi_name, _) => ndi_name.as_ptr(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn url_address_ptr(&self) -> *const ::std::os::raw::c_char {
|
||||
unsafe {
|
||||
match *self {
|
||||
Source::Borrowed(ptr, _) => ptr.as_ref().p_url_address,
|
||||
Source::Owned(_, _, ref url_address) => url_address.as_ptr(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_owned<'b>(&self) -> Source<'b> {
|
||||
unsafe {
|
||||
let (ndi_name, url_address) = match *self {
|
||||
|
@ -191,13 +174,20 @@ impl<'a> Source<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a> PartialEq for Source<'a> {
|
||||
fn eq(&self, other: &Source<'a>) -> bool {
|
||||
self.ndi_name() == other.ndi_name() && self.url_address() == other.url_address()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RecvBuilder<'a> {
|
||||
source_to_connect_to: &'a Source<'a>,
|
||||
ndi_name: Option<&'a str>,
|
||||
url_address: Option<&'a str>,
|
||||
allow_video_fields: bool,
|
||||
bandwidth: NDIlib_recv_bandwidth_e,
|
||||
color_format: NDIlib_recv_color_format_e,
|
||||
ndi_name: &'a str,
|
||||
ndi_recv_name: &'a str,
|
||||
}
|
||||
|
||||
impl<'a> RecvBuilder<'a> {
|
||||
|
@ -221,16 +211,30 @@ impl<'a> RecvBuilder<'a> {
|
|||
|
||||
pub fn build(self) -> Option<RecvInstance> {
|
||||
unsafe {
|
||||
let ndi_name = ffi::CString::new(self.ndi_name).unwrap();
|
||||
let ndi_recv_name = ffi::CString::new(self.ndi_recv_name).unwrap();
|
||||
let ndi_name = self
|
||||
.ndi_name
|
||||
.as_ref()
|
||||
.map(|s| ffi::CString::new(*s).unwrap());
|
||||
let url_address = self
|
||||
.url_address
|
||||
.as_ref()
|
||||
.map(|s| ffi::CString::new(*s).unwrap());
|
||||
let ptr = NDIlib_recv_create_v3(&NDIlib_recv_create_v3_t {
|
||||
source_to_connect_to: NDIlib_source_t {
|
||||
p_ndi_name: self.source_to_connect_to.ndi_name_ptr(),
|
||||
p_url_address: self.source_to_connect_to.url_address_ptr(),
|
||||
p_ndi_name: ndi_name
|
||||
.as_ref()
|
||||
.map(|s| s.as_ptr())
|
||||
.unwrap_or_else(|| ptr::null_mut()),
|
||||
p_url_address: url_address
|
||||
.as_ref()
|
||||
.map(|s| s.as_ptr())
|
||||
.unwrap_or_else(|| ptr::null_mut()),
|
||||
},
|
||||
allow_video_fields: self.allow_video_fields,
|
||||
bandwidth: self.bandwidth,
|
||||
color_format: self.color_format,
|
||||
p_ndi_recv_name: ndi_name.as_ptr(),
|
||||
p_ndi_recv_name: ndi_recv_name.as_ptr(),
|
||||
});
|
||||
|
||||
if ptr.is_null() {
|
||||
|
@ -259,13 +263,18 @@ unsafe impl Send for RecvInstanceInner {}
|
|||
unsafe impl Sync for RecvInstanceInner {}
|
||||
|
||||
impl RecvInstance {
|
||||
pub fn builder<'a>(source_to_connect_to: &'a Source, ndi_name: &'a str) -> RecvBuilder<'a> {
|
||||
pub fn builder<'a>(
|
||||
ndi_name: Option<&'a str>,
|
||||
url_address: Option<&'a str>,
|
||||
ndi_recv_name: &'a str,
|
||||
) -> RecvBuilder<'a> {
|
||||
RecvBuilder {
|
||||
source_to_connect_to,
|
||||
ndi_name,
|
||||
url_address,
|
||||
allow_video_fields: true,
|
||||
bandwidth: NDIlib_recv_bandwidth_highest,
|
||||
color_format: NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA,
|
||||
ndi_name,
|
||||
ndi_recv_name,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ use crate::DEFAULT_RECEIVER_NDI_NAME;
|
|||
#[derive(Debug, Clone)]
|
||||
struct Settings {
|
||||
ndi_name: Option<String>,
|
||||
url_address: Option<String>,
|
||||
connect_timeout: u32,
|
||||
timeout: u32,
|
||||
receiver_ndi_name: String,
|
||||
|
@ -35,6 +36,7 @@ impl Default for Settings {
|
|||
fn default() -> Self {
|
||||
Settings {
|
||||
ndi_name: None,
|
||||
url_address: None,
|
||||
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
|
||||
connect_timeout: 10000,
|
||||
timeout: 5000,
|
||||
|
@ -44,7 +46,7 @@ impl Default for Settings {
|
|||
}
|
||||
}
|
||||
|
||||
static PROPERTIES: [subclass::Property; 6] = [
|
||||
static PROPERTIES: [subclass::Property; 7] = [
|
||||
subclass::Property("ndi-name", |name| {
|
||||
glib::ParamSpec::string(
|
||||
name,
|
||||
|
@ -54,6 +56,15 @@ static PROPERTIES: [subclass::Property; 6] = [
|
|||
glib::ParamFlags::READWRITE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("url-address", |name| {
|
||||
glib::ParamSpec::string(
|
||||
name,
|
||||
"URL/Address",
|
||||
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
|
||||
None,
|
||||
glib::ParamFlags::READWRITE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("receiver-ndi-name", |name| {
|
||||
glib::ParamSpec::string(
|
||||
name,
|
||||
|
@ -216,6 +227,18 @@ impl ObjectImpl for NdiAudioSrc {
|
|||
);
|
||||
settings.ndi_name = ndi_name;
|
||||
}
|
||||
subclass::Property("url-address", ..) => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let url_address = value.get().unwrap();
|
||||
gst_debug!(
|
||||
self.cat,
|
||||
obj: basesrc,
|
||||
"Changing url-address from {:?} to {:?}",
|
||||
settings.url_address,
|
||||
url_address,
|
||||
);
|
||||
settings.url_address = url_address;
|
||||
}
|
||||
subclass::Property("receiver-ndi-name", ..) => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let receiver_ndi_name = value.get().unwrap();
|
||||
|
@ -293,6 +316,10 @@ impl ObjectImpl for NdiAudioSrc {
|
|||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.ndi_name.to_value())
|
||||
}
|
||||
subclass::Property("url-address", ..) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.url_address.to_value())
|
||||
}
|
||||
subclass::Property("receiver-ndi-name", ..) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.receiver_ndi_name.to_value())
|
||||
|
@ -374,19 +401,18 @@ impl BaseSrcImpl for NdiAudioSrc {
|
|||
*self.state.lock().unwrap() = Default::default();
|
||||
let settings = self.settings.lock().unwrap().clone();
|
||||
|
||||
let ndi_name = if let Some(ref ndi_name) = settings.ndi_name {
|
||||
ndi_name
|
||||
} else {
|
||||
if settings.ndi_name.is_none() && settings.url_address.is_none() {
|
||||
return Err(gst_error_msg!(
|
||||
gst::LibraryError::Settings,
|
||||
["No IP address or NDI name given"]
|
||||
["No NDI name or URL/address given"]
|
||||
));
|
||||
};
|
||||
}
|
||||
|
||||
let receiver = connect_ndi(
|
||||
self.cat,
|
||||
element,
|
||||
ndi_name,
|
||||
settings.ndi_name.as_ref().map(String::as_str),
|
||||
settings.url_address.as_ref().map(String::as_str),
|
||||
&settings.receiver_ndi_name,
|
||||
settings.connect_timeout,
|
||||
settings.bandwidth,
|
||||
|
|
|
@ -25,6 +25,7 @@ use crate::DEFAULT_RECEIVER_NDI_NAME;
|
|||
#[derive(Debug, Clone)]
|
||||
struct Settings {
|
||||
ndi_name: Option<String>,
|
||||
url_address: Option<String>,
|
||||
connect_timeout: u32,
|
||||
timeout: u32,
|
||||
receiver_ndi_name: String,
|
||||
|
@ -36,6 +37,7 @@ impl Default for Settings {
|
|||
fn default() -> Self {
|
||||
Settings {
|
||||
ndi_name: None,
|
||||
url_address: None,
|
||||
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
|
||||
connect_timeout: 10000,
|
||||
timeout: 5000,
|
||||
|
@ -45,7 +47,7 @@ impl Default for Settings {
|
|||
}
|
||||
}
|
||||
|
||||
static PROPERTIES: [subclass::Property; 6] = [
|
||||
static PROPERTIES: [subclass::Property; 7] = [
|
||||
subclass::Property("ndi-name", |name| {
|
||||
glib::ParamSpec::string(
|
||||
name,
|
||||
|
@ -55,6 +57,15 @@ static PROPERTIES: [subclass::Property; 6] = [
|
|||
glib::ParamFlags::READWRITE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("url-address", |name| {
|
||||
glib::ParamSpec::string(
|
||||
name,
|
||||
"URL/Address",
|
||||
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
|
||||
None,
|
||||
glib::ParamFlags::READWRITE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("receiver-ndi-name", |name| {
|
||||
glib::ParamSpec::string(
|
||||
name,
|
||||
|
@ -251,6 +262,18 @@ impl ObjectImpl for NdiVideoSrc {
|
|||
);
|
||||
settings.ndi_name = ndi_name;
|
||||
}
|
||||
subclass::Property("url-address", ..) => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let url_address = value.get().unwrap();
|
||||
gst_debug!(
|
||||
self.cat,
|
||||
obj: basesrc,
|
||||
"Changing url-address from {:?} to {:?}",
|
||||
settings.url_address,
|
||||
url_address,
|
||||
);
|
||||
settings.url_address = url_address;
|
||||
}
|
||||
subclass::Property("receiver-ndi-name", ..) => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let receiver_ndi_name = value.get().unwrap();
|
||||
|
@ -328,6 +351,10 @@ impl ObjectImpl for NdiVideoSrc {
|
|||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.ndi_name.to_value())
|
||||
}
|
||||
subclass::Property("url-address", ..) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.url_address.to_value())
|
||||
}
|
||||
subclass::Property("receiver-ndi-name", ..) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.receiver_ndi_name.to_value())
|
||||
|
@ -409,19 +436,18 @@ impl BaseSrcImpl for NdiVideoSrc {
|
|||
*self.state.lock().unwrap() = Default::default();
|
||||
let settings = self.settings.lock().unwrap().clone();
|
||||
|
||||
let ndi_name = if let Some(ref ndi_name) = settings.ndi_name {
|
||||
ndi_name
|
||||
} else {
|
||||
if settings.ndi_name.is_none() && settings.url_address.is_none() {
|
||||
return Err(gst_error_msg!(
|
||||
gst::LibraryError::Settings,
|
||||
["No IP address or NDI name given"]
|
||||
["No NDI name or URL/address given"]
|
||||
));
|
||||
};
|
||||
}
|
||||
|
||||
let receiver = connect_ndi(
|
||||
self.cat,
|
||||
element,
|
||||
ndi_name,
|
||||
settings.ndi_name.as_ref().map(String::as_str),
|
||||
settings.url_address.as_ref().map(String::as_str),
|
||||
&settings.receiver_ndi_name,
|
||||
settings.connect_timeout,
|
||||
settings.bandwidth,
|
||||
|
@ -476,6 +502,8 @@ impl BaseSrcImpl for NdiVideoSrc {
|
|||
|
||||
let max = 5 * state.current_latency;
|
||||
|
||||
println!("Returning latency min {} max {}", min, max,);
|
||||
|
||||
gst_debug!(
|
||||
self.cat,
|
||||
obj: element,
|
||||
|
|
466
src/receiver.rs
466
src/receiver.rs
|
@ -9,28 +9,20 @@ use byte_slice_cast::AsMutSliceOf;
|
|||
|
||||
use std::cmp;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex, Weak};
|
||||
use std::thread;
|
||||
|
||||
use super::*;
|
||||
|
||||
enum ReceiverInfo {
|
||||
Connecting {
|
||||
id: usize,
|
||||
ndi_name: String,
|
||||
video: Option<Weak<ReceiverInner<VideoReceiver>>>,
|
||||
audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
|
||||
observations: Observations,
|
||||
},
|
||||
Connected {
|
||||
id: usize,
|
||||
ndi_name: String,
|
||||
recv: RecvInstance,
|
||||
video: Option<Weak<ReceiverInner<VideoReceiver>>>,
|
||||
audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
|
||||
observations: Observations,
|
||||
},
|
||||
pub struct ReceiverInfo {
|
||||
id: usize,
|
||||
ndi_name: Option<String>,
|
||||
url_address: Option<String>,
|
||||
recv: RecvInstance,
|
||||
video: Option<Weak<ReceiverInner<VideoReceiver>>>,
|
||||
audio: Option<Weak<ReceiverInner<AudioReceiver>>>,
|
||||
observations: Observations,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
|
@ -81,15 +73,17 @@ pub struct ReceiverInner<T: ReceiverType> {
|
|||
|
||||
queue: ReceiverQueue<T>,
|
||||
|
||||
recv: Mutex<Option<RecvInstance>>,
|
||||
recv_cond: Condvar,
|
||||
recv: Mutex<RecvInstance>,
|
||||
|
||||
observations: Observations,
|
||||
|
||||
cat: gst::DebugCategory,
|
||||
element: glib::WeakRef<gst_base::BaseSrc>,
|
||||
timestamp_mode: TimestampMode,
|
||||
|
||||
first_frame: AtomicBool,
|
||||
timeout: u32,
|
||||
connect_timeout: u32,
|
||||
|
||||
thread: Mutex<Option<std::thread::JoinHandle<()>>>,
|
||||
}
|
||||
|
@ -385,32 +379,15 @@ impl<T: ReceiverType> Receiver<T> {
|
|||
info: &mut ReceiverInfo,
|
||||
timestamp_mode: TimestampMode,
|
||||
timeout: u32,
|
||||
connect_timeout: u32,
|
||||
element: &gst_base::BaseSrc,
|
||||
cat: gst::DebugCategory,
|
||||
) -> Self
|
||||
where
|
||||
Receiver<T>: ReceiverCapture<T>,
|
||||
{
|
||||
let (id, storage_video, storage_audio, recv, observations) = match info {
|
||||
ReceiverInfo::Connecting {
|
||||
id,
|
||||
ref observations,
|
||||
ref mut audio,
|
||||
ref mut video,
|
||||
..
|
||||
} => (*id, video, audio, None, observations),
|
||||
ReceiverInfo::Connected {
|
||||
id,
|
||||
ref mut recv,
|
||||
ref observations,
|
||||
ref mut audio,
|
||||
ref mut video,
|
||||
..
|
||||
} => (*id, video, audio, Some(recv.clone()), observations),
|
||||
};
|
||||
|
||||
let receiver = Receiver(Arc::new(ReceiverInner {
|
||||
id,
|
||||
id: info.id,
|
||||
queue: ReceiverQueue(Arc::new((
|
||||
Mutex::new(ReceiverQueueInner {
|
||||
capturing: true,
|
||||
|
@ -422,13 +399,14 @@ impl<T: ReceiverType> Receiver<T> {
|
|||
}),
|
||||
Condvar::new(),
|
||||
))),
|
||||
recv: Mutex::new(recv),
|
||||
recv_cond: Condvar::new(),
|
||||
observations: observations.clone(),
|
||||
recv: Mutex::new(info.recv.clone()),
|
||||
observations: info.observations.clone(),
|
||||
cat,
|
||||
element: element.downgrade(),
|
||||
timestamp_mode,
|
||||
first_frame: AtomicBool::new(true),
|
||||
timeout,
|
||||
connect_timeout,
|
||||
thread: Mutex::new(None),
|
||||
}));
|
||||
|
||||
|
@ -459,7 +437,7 @@ impl<T: ReceiverType> Receiver<T> {
|
|||
});
|
||||
|
||||
let weak = Arc::downgrade(&receiver.0);
|
||||
Self::store_internal(storage_video, storage_audio, weak);
|
||||
Self::store_internal(info, weak);
|
||||
|
||||
*receiver.0.thread.lock().unwrap() = Some(thread);
|
||||
|
||||
|
@ -522,24 +500,12 @@ impl<T: ReceiverType> Drop for ReceiverInner<T> {
|
|||
|
||||
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
|
||||
{
|
||||
let val = receivers.get_mut(&self.id).unwrap();
|
||||
let (audio, video) = match val {
|
||||
ReceiverInfo::Connecting {
|
||||
ref mut audio,
|
||||
ref mut video,
|
||||
..
|
||||
} => (audio, video),
|
||||
ReceiverInfo::Connected {
|
||||
ref mut audio,
|
||||
ref mut video,
|
||||
..
|
||||
} => (audio, video),
|
||||
};
|
||||
if video.is_some() && audio.is_some() {
|
||||
let receiver = receivers.get_mut(&self.id).unwrap();
|
||||
if receiver.audio.is_some() && receiver.video.is_some() {
|
||||
if T::IS_VIDEO {
|
||||
*video = None;
|
||||
receiver.video = None;
|
||||
} else {
|
||||
*audio = None;
|
||||
receiver.audio = None;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -555,7 +521,8 @@ impl<T: ReceiverType> Drop for ReceiverInner<T> {
|
|||
pub fn connect_ndi<T: ReceiverType>(
|
||||
cat: gst::DebugCategory,
|
||||
element: &gst_base::BaseSrc,
|
||||
ndi_name: &str,
|
||||
ndi_name: Option<&str>,
|
||||
url_address: Option<&str>,
|
||||
receiver_ndi_name: &str,
|
||||
connect_timeout: u32,
|
||||
bandwidth: NDIlib_recv_bandwidth_e,
|
||||
|
@ -567,230 +534,77 @@ where
|
|||
{
|
||||
gst_debug!(cat, obj: element, "Starting NDI connection...");
|
||||
|
||||
assert!(ndi_name.is_some() || url_address.is_some());
|
||||
|
||||
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
|
||||
|
||||
// Check if we already have a receiver for this very stream
|
||||
for val in receivers.values_mut() {
|
||||
let (val_audio, val_video, val_ndi_name) = match val {
|
||||
ReceiverInfo::Connecting {
|
||||
ref mut audio,
|
||||
ref mut video,
|
||||
ref ndi_name,
|
||||
..
|
||||
} => (audio, video, ndi_name.as_str()),
|
||||
ReceiverInfo::Connected {
|
||||
ref mut audio,
|
||||
ref mut video,
|
||||
ref ndi_name,
|
||||
..
|
||||
} => (audio, video, ndi_name.as_str()),
|
||||
};
|
||||
|
||||
if val_ndi_name == ndi_name {
|
||||
if (val_video.is_some() || !T::IS_VIDEO) && (val_audio.is_some() || T::IS_VIDEO) {
|
||||
for receiver in receivers.values_mut() {
|
||||
// If both are provided they both must match, if only one is provided
|
||||
// then that one has to match and the other one does not matter
|
||||
if (ndi_name.is_some()
|
||||
&& url_address.is_some()
|
||||
&& receiver.ndi_name.as_ref().map(String::as_str) == ndi_name
|
||||
&& receiver.url_address.as_ref().map(String::as_str) == url_address)
|
||||
|| (ndi_name.is_some()
|
||||
&& url_address.is_none()
|
||||
&& receiver.ndi_name.as_ref().map(String::as_str) == ndi_name)
|
||||
|| (ndi_name.is_none()
|
||||
&& url_address.is_some()
|
||||
&& receiver.url_address.as_ref().map(String::as_str) == url_address)
|
||||
{
|
||||
if (receiver.video.is_some() || !T::IS_VIDEO)
|
||||
&& (receiver.audio.is_some() || T::IS_VIDEO)
|
||||
{
|
||||
gst_element_error!(
|
||||
element,
|
||||
gst::ResourceError::OpenRead,
|
||||
[
|
||||
"Source with ndi-name '{}' already in use for {}",
|
||||
val_ndi_name,
|
||||
"Source with NDI name '{:?}' / URL/address '{:?}' already in use for {}",
|
||||
receiver.ndi_name,
|
||||
receiver.url_address,
|
||||
if T::IS_VIDEO { "video" } else { "audio" }
|
||||
]
|
||||
);
|
||||
|
||||
return None;
|
||||
} else {
|
||||
return Some(Receiver::new(val, timestamp_mode, timeout, element, cat));
|
||||
return Some(Receiver::new(
|
||||
receiver,
|
||||
timestamp_mode,
|
||||
timeout,
|
||||
connect_timeout,
|
||||
element,
|
||||
cat,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise asynchronously search for it and return the receiver to the caller
|
||||
let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst);
|
||||
let mut info = ReceiverInfo::Connecting {
|
||||
id: id_receiver,
|
||||
ndi_name: String::from(ndi_name),
|
||||
video: None,
|
||||
audio: None,
|
||||
observations: Observations::new(),
|
||||
};
|
||||
|
||||
let receiver = Receiver::new(&mut info, timestamp_mode, timeout, element, cat);
|
||||
|
||||
receivers.insert(id_receiver, info);
|
||||
|
||||
let receiver_ndi_name = String::from(receiver_ndi_name);
|
||||
let element = element.clone();
|
||||
thread::spawn(move || {
|
||||
use std::panic;
|
||||
|
||||
let res = match panic::catch_unwind(move || {
|
||||
connect_ndi_async(
|
||||
cat,
|
||||
&element,
|
||||
id_receiver,
|
||||
receiver_ndi_name,
|
||||
connect_timeout,
|
||||
bandwidth,
|
||||
)
|
||||
}) {
|
||||
Ok(res) => res,
|
||||
Err(_) => Err(Some(gst_error_msg!(
|
||||
gst::LibraryError::Failed,
|
||||
["Panic while connecting to NDI source"]
|
||||
))),
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(_) => (),
|
||||
Err(None) => {
|
||||
gst_debug!(cat, "Shutting down while connecting");
|
||||
}
|
||||
Err(Some(err)) => {
|
||||
gst_error!(cat, "Error while connecting: {:?}", err);
|
||||
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
|
||||
let info = match receivers.get_mut(&id_receiver) {
|
||||
None => return,
|
||||
Some(val) => val,
|
||||
};
|
||||
|
||||
let (audio, video) = match info {
|
||||
ReceiverInfo::Connecting {
|
||||
ref audio,
|
||||
ref video,
|
||||
..
|
||||
} => (audio, video),
|
||||
ReceiverInfo::Connected { .. } => unreachable!(),
|
||||
};
|
||||
|
||||
assert!(audio.is_some() || video.is_some());
|
||||
|
||||
if let Some(audio) = audio.as_ref().and_then(|v| v.upgrade()).map(Receiver) {
|
||||
if let Some(element) = audio.0.element.upgrade() {
|
||||
element.post_error_message(&err);
|
||||
}
|
||||
let audio_recv = audio.0.recv.lock().unwrap();
|
||||
let mut queue = (audio.0.queue.0).0.lock().unwrap();
|
||||
assert!(audio_recv.is_none());
|
||||
queue.error = Some(gst::FlowError::Error);
|
||||
audio.0.recv_cond.notify_one();
|
||||
(audio.0.queue.0).1.notify_one();
|
||||
}
|
||||
|
||||
if let Some(video) = video.as_ref().and_then(|v| v.upgrade()).map(Receiver) {
|
||||
if let Some(element) = video.0.element.upgrade() {
|
||||
element.post_error_message(&err);
|
||||
}
|
||||
let video_recv = video.0.recv.lock().unwrap();
|
||||
let mut queue = (video.0.queue.0).0.lock().unwrap();
|
||||
assert!(video_recv.is_none());
|
||||
queue.error = Some(gst::FlowError::Error);
|
||||
video.0.recv_cond.notify_one();
|
||||
(video.0.queue.0).1.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Some(receiver)
|
||||
}
|
||||
|
||||
fn connect_ndi_async(
|
||||
cat: gst::DebugCategory,
|
||||
element: &gst_base::BaseSrc,
|
||||
id_receiver: usize,
|
||||
receiver_ndi_name: String,
|
||||
connect_timeout: u32,
|
||||
bandwidth: NDIlib_recv_bandwidth_e,
|
||||
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
let mut find = match FindInstance::builder().build() {
|
||||
None => {
|
||||
return Err(Some(gst_error_msg!(
|
||||
gst::CoreError::Negotiation,
|
||||
["Cannot run NDI: NDIlib_find_create_v2 error"]
|
||||
)));
|
||||
}
|
||||
Some(find) => find,
|
||||
};
|
||||
|
||||
let timer = time::Instant::now();
|
||||
let source = loop {
|
||||
let new_sources = find.wait_for_sources(100);
|
||||
let sources = find.get_current_sources();
|
||||
|
||||
gst_debug!(
|
||||
cat,
|
||||
obj: element,
|
||||
"Total sources found in network {}",
|
||||
sources.len(),
|
||||
);
|
||||
|
||||
if new_sources {
|
||||
for source in &sources {
|
||||
gst_debug!(
|
||||
cat,
|
||||
obj: element,
|
||||
"Found source '{}' with URL {}",
|
||||
source.ndi_name(),
|
||||
source.url_address(),
|
||||
);
|
||||
}
|
||||
|
||||
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
|
||||
let info = match receivers.get(&id_receiver) {
|
||||
None => return Err(None),
|
||||
Some(val) => val,
|
||||
};
|
||||
|
||||
let ndi_name = match info {
|
||||
ReceiverInfo::Connecting {
|
||||
ref ndi_name,
|
||||
ref audio,
|
||||
ref video,
|
||||
..
|
||||
} => {
|
||||
assert!(audio.is_some() || video.is_some());
|
||||
ndi_name
|
||||
}
|
||||
ReceiverInfo::Connected { .. } => unreachable!(),
|
||||
};
|
||||
|
||||
let source = sources.iter().find(|s| s.ndi_name() == ndi_name.as_str());
|
||||
|
||||
if let Some(source) = source {
|
||||
break source.to_owned();
|
||||
}
|
||||
}
|
||||
|
||||
if timer.elapsed().as_millis() >= connect_timeout as u128 {
|
||||
return Err(Some(gst_error_msg!(
|
||||
gst::ResourceError::NotFound,
|
||||
["Stream not found"]
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
// Otherwise create a new one and return it to the caller
|
||||
gst_debug!(
|
||||
cat,
|
||||
obj: element,
|
||||
"Connecting to NDI source with ndi-name '{}' and URL {}",
|
||||
source.ndi_name(),
|
||||
source.url_address(),
|
||||
"Connecting to NDI source with NDI name '{:?}' and URL/Address {:?}",
|
||||
ndi_name,
|
||||
url_address,
|
||||
);
|
||||
|
||||
// FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be
|
||||
// broken with interlaced content currently
|
||||
let recv = RecvInstance::builder(&source, &receiver_ndi_name)
|
||||
let recv = RecvInstance::builder(ndi_name, url_address, &receiver_ndi_name)
|
||||
.bandwidth(bandwidth)
|
||||
.color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA)
|
||||
.allow_video_fields(true)
|
||||
.build();
|
||||
let recv = match recv {
|
||||
None => {
|
||||
return Err(Some(gst_error_msg!(
|
||||
gst_element_error!(
|
||||
element,
|
||||
gst::CoreError::Negotiation,
|
||||
["Failed to connect to source"]
|
||||
)));
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Some(recv) => recv,
|
||||
};
|
||||
|
@ -800,96 +614,36 @@ fn connect_ndi_async(
|
|||
let enable_hw_accel = MetadataFrame::new(0, Some("<ndi_hwaccel enabled=\"true\"/>"));
|
||||
recv.send_metadata(&enable_hw_accel);
|
||||
|
||||
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
|
||||
let info = match receivers.get_mut(&id_receiver) {
|
||||
None => return Err(None),
|
||||
Some(val) => val,
|
||||
};
|
||||
|
||||
let (audio, video, observations) = match info {
|
||||
ReceiverInfo::Connecting {
|
||||
ref audio,
|
||||
ref video,
|
||||
ref observations,
|
||||
..
|
||||
} => (audio.clone(), video.clone(), observations),
|
||||
ReceiverInfo::Connected { .. } => unreachable!(),
|
||||
};
|
||||
|
||||
assert!(audio.is_some() || video.is_some());
|
||||
|
||||
*info = ReceiverInfo::Connected {
|
||||
let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst);
|
||||
let mut info = ReceiverInfo {
|
||||
id: id_receiver,
|
||||
ndi_name: source.ndi_name().to_owned(),
|
||||
recv: recv.clone(),
|
||||
video: video.clone(),
|
||||
audio: audio.clone(),
|
||||
observations: observations.clone(),
|
||||
ndi_name: ndi_name.map(String::from),
|
||||
url_address: url_address.map(String::from),
|
||||
recv,
|
||||
video: None,
|
||||
audio: None,
|
||||
observations: Observations::new(),
|
||||
};
|
||||
|
||||
gst_debug!(cat, obj: element, "Started NDI connection");
|
||||
// This will set info.audio/video accordingly
|
||||
let receiver = Receiver::new(
|
||||
&mut info,
|
||||
timestamp_mode,
|
||||
timeout,
|
||||
connect_timeout,
|
||||
element,
|
||||
cat,
|
||||
);
|
||||
|
||||
if let Some(audio) = audio.and_then(|v| v.upgrade()).map(Receiver) {
|
||||
let mut audio_recv = audio.0.recv.lock().unwrap();
|
||||
assert!(audio_recv.is_none());
|
||||
*audio_recv = Some(recv.clone());
|
||||
audio.0.recv_cond.notify_one();
|
||||
}
|
||||
receivers.insert(id_receiver, info);
|
||||
|
||||
if let Some(video) = video.and_then(|v| v.upgrade()).map(Receiver) {
|
||||
let mut video_recv = video.0.recv.lock().unwrap();
|
||||
assert!(video_recv.is_none());
|
||||
*video_recv = Some(recv);
|
||||
video.0.recv_cond.notify_one();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Some(receiver)
|
||||
}
|
||||
|
||||
fn receive_thread<T: ReceiverType>(receiver: &Weak<ReceiverInner<T>>)
|
||||
where
|
||||
Receiver<T>: ReceiverCapture<T>,
|
||||
{
|
||||
// First loop until we actually are connected, or an error happened
|
||||
let recv = {
|
||||
let receiver = match receiver.upgrade().map(Receiver) {
|
||||
None => return,
|
||||
Some(receiver) => receiver,
|
||||
};
|
||||
|
||||
let element = match receiver.0.element.upgrade() {
|
||||
None => return,
|
||||
Some(element) => element,
|
||||
};
|
||||
|
||||
let mut recv = receiver.0.recv.lock().unwrap();
|
||||
loop {
|
||||
{
|
||||
let queue = (receiver.0.queue.0).0.lock().unwrap();
|
||||
if !queue.capturing {
|
||||
gst_debug!(receiver.0.cat, obj: &element, "Shutting down");
|
||||
return;
|
||||
}
|
||||
|
||||
// If an error happened in the meantime, just go out of here
|
||||
if queue.error.is_some() {
|
||||
gst_error!(
|
||||
receiver.0.cat,
|
||||
obj: &element,
|
||||
"Error while waiting for connection"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref recv) = *recv {
|
||||
break recv.clone();
|
||||
}
|
||||
|
||||
recv = receiver.0.recv_cond.wait(recv).unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
// Now first capture frames until the queues are empty so that we're sure that we output only
|
||||
// the very latest frame that is available now
|
||||
loop {
|
||||
|
@ -921,6 +675,8 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
let recv = receiver.0.recv.lock().unwrap();
|
||||
|
||||
let queue = recv.get_queue();
|
||||
if (!T::IS_VIDEO && queue.audio_frames() <= 1) || (T::IS_VIDEO && queue.video_frames() <= 1)
|
||||
{
|
||||
|
@ -950,6 +706,8 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
let recv = receiver.0.recv.lock().unwrap();
|
||||
|
||||
let res = receiver.capture_internal(&element, &recv);
|
||||
|
||||
match res {
|
||||
|
@ -999,11 +757,7 @@ pub trait ReceiverCapture<T: ReceiverType> {
|
|||
recv: &RecvInstance,
|
||||
) -> Result<(gst::Buffer, T::InfoType), gst::FlowError>;
|
||||
|
||||
fn store_internal(
|
||||
storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
|
||||
storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
|
||||
weak: Weak<ReceiverInner<T>>,
|
||||
);
|
||||
fn store_internal(info: &mut ReceiverInfo, weak: Weak<ReceiverInner<T>>);
|
||||
}
|
||||
|
||||
impl ReceiverCapture<VideoReceiver> for Receiver<VideoReceiver> {
|
||||
|
@ -1015,13 +769,9 @@ impl ReceiverCapture<VideoReceiver> for Receiver<VideoReceiver> {
|
|||
self.capture_video(element, recv)
|
||||
}
|
||||
|
||||
fn store_internal(
|
||||
storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
|
||||
_storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
|
||||
weak: Weak<ReceiverInner<VideoReceiver>>,
|
||||
) {
|
||||
assert!(storage_video.is_none());
|
||||
*storage_video = Some(weak);
|
||||
fn store_internal(info: &mut ReceiverInfo, weak: Weak<ReceiverInner<VideoReceiver>>) {
|
||||
assert!(info.video.is_none());
|
||||
info.video = Some(weak);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1109,13 +859,9 @@ impl ReceiverCapture<AudioReceiver> for Receiver<AudioReceiver> {
|
|||
self.capture_audio(element, recv)
|
||||
}
|
||||
|
||||
fn store_internal(
|
||||
_storage_video: &mut Option<Weak<ReceiverInner<VideoReceiver>>>,
|
||||
storage_audio: &mut Option<Weak<ReceiverInner<AudioReceiver>>>,
|
||||
weak: Weak<ReceiverInner<AudioReceiver>>,
|
||||
) {
|
||||
assert!(storage_audio.is_none());
|
||||
*storage_audio = Some(weak);
|
||||
fn store_internal(info: &mut ReceiverInfo, weak: Weak<ReceiverInner<AudioReceiver>>) {
|
||||
assert!(info.audio.is_none());
|
||||
info.audio = Some(weak);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1125,7 +871,12 @@ impl Receiver<VideoReceiver> {
|
|||
element: &gst_base::BaseSrc,
|
||||
recv: &RecvInstance,
|
||||
) -> Result<(gst::Buffer, gst_video::VideoInfo), gst::FlowError> {
|
||||
let timeout = time::Instant::now();
|
||||
let timer = time::Instant::now();
|
||||
let timeout = if self.0.first_frame.load(Ordering::SeqCst) {
|
||||
self.0.connect_timeout
|
||||
} else {
|
||||
self.0.timeout
|
||||
};
|
||||
let mut flushing;
|
||||
let mut playing;
|
||||
|
||||
|
@ -1156,7 +907,7 @@ impl Receiver<VideoReceiver> {
|
|||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => {
|
||||
Ok(None) if timer.elapsed().as_millis() >= timeout as u128 => {
|
||||
gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",);
|
||||
return Err(gst::FlowError::Eos);
|
||||
}
|
||||
|
@ -1174,6 +925,8 @@ impl Receiver<VideoReceiver> {
|
|||
break video_frame;
|
||||
};
|
||||
|
||||
self.0.first_frame.store(false, Ordering::SeqCst);
|
||||
|
||||
gst_debug!(
|
||||
self.0.cat,
|
||||
obj: element,
|
||||
|
@ -1546,7 +1299,12 @@ impl Receiver<AudioReceiver> {
|
|||
element: &gst_base::BaseSrc,
|
||||
recv: &RecvInstance,
|
||||
) -> Result<(gst::Buffer, gst_audio::AudioInfo), gst::FlowError> {
|
||||
let timeout = time::Instant::now();
|
||||
let timer = time::Instant::now();
|
||||
let timeout = if self.0.first_frame.load(Ordering::SeqCst) {
|
||||
self.0.connect_timeout
|
||||
} else {
|
||||
self.0.timeout
|
||||
};
|
||||
let mut flushing;
|
||||
let mut playing;
|
||||
|
||||
|
@ -1577,7 +1335,7 @@ impl Receiver<AudioReceiver> {
|
|||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => {
|
||||
Ok(None) if timer.elapsed().as_millis() >= timeout as u128 => {
|
||||
gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",);
|
||||
return Err(gst::FlowError::Eos);
|
||||
}
|
||||
|
@ -1595,6 +1353,8 @@ impl Receiver<AudioReceiver> {
|
|||
break audio_frame;
|
||||
};
|
||||
|
||||
self.0.first_frame.store(false, Ordering::SeqCst);
|
||||
|
||||
gst_debug!(
|
||||
self.0.cat,
|
||||
obj: element,
|
||||
|
|
Loading…
Reference in a new issue