diff --git a/src/error.rs b/src/error.rs index 1c1f9f8a..f6e4e862 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,8 @@ use std::fmt::{Display, Formatter}; use std::fmt::Error as FmtError; use std::borrow::Cow; +use url::Url; + use utils::*; macro_rules! error_msg( @@ -115,6 +117,49 @@ impl ErrorMessage { line: line, } } + + + pub unsafe fn post(&self, element: *mut c_void) { + extern "C" { + fn gst_rs_element_error(sink: *mut c_void, + error_domain: u32, + error_code: i32, + message: *const c_char, + debug: *const c_char, + filename: *const c_char, + function: *const c_char, + line: u32); + } + + let ErrorMessage { error_domain, + error_code, + ref message, + ref debug, + filename, + function, + line } = *self; + + let message_cstr = message.as_ref().map(|m| CString::new(m.as_bytes()).unwrap()); + let message_ptr = message_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr()); + + let debug_cstr = debug.as_ref().map(|m| CString::new(m.as_bytes()).unwrap()); + let debug_ptr = debug_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr()); + + let file_cstr = CString::new(filename.as_bytes()).unwrap(); + let file_ptr = file_cstr.as_ptr(); + + let function_cstr = CString::new(function.as_bytes()).unwrap(); + let function_ptr = function_cstr.as_ptr(); + + gst_rs_element_error(element, + error_domain, + error_code, + message_ptr, + debug_ptr, + file_ptr, + function_ptr, + line); + } } #[derive(Debug)] @@ -246,3 +291,5 @@ impl Error for UriError { } } } + +pub type UriValidator = Fn(&Url) -> Result<(), UriError>; diff --git a/src/plugin.c b/src/plugin.c index b6fd1111..11115fb1 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -43,3 +43,12 @@ GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, rsplugin, "Rust Plugin", plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN); + +void +gst_rs_element_error (GstElement * element, GQuark error_domain, + gint error_code, const gchar * message, const gchar * debug, + const gchar * file, const gchar * function, guint line) +{ + gst_element_message_full (element, GST_MESSAGE_ERROR, error_domain, + error_code, g_strdup (message), g_strdup (debug), file, function, line); +} diff --git a/src/rsfilesink.rs b/src/rsfilesink.rs index d0acacb4..eded0b5b 100644 --- a/src/rsfilesink.rs +++ b/src/rsfilesink.rs @@ -17,21 +17,14 @@ // Boston, MA 02110-1301, USA. use std::fs::File; -use std::path::PathBuf; use url::Url; use std::io::Write; -use std::sync::Mutex; use std::convert::From; use error::*; use rssink::*; -#[derive(Debug)] -struct Settings { - location: Option, -} - #[derive(Debug)] enum StreamingState { Stopped, @@ -40,68 +33,44 @@ enum StreamingState { #[derive(Debug)] pub struct FileSink { - controller: SinkController, - settings: Mutex, - streaming_state: Mutex, + streaming_state: StreamingState, } -unsafe impl Sync for FileSink {} -unsafe impl Send for FileSink {} - impl FileSink { - pub fn new(controller: SinkController) -> FileSink { - FileSink { - controller: controller, - settings: Mutex::new(Settings { location: None }), - streaming_state: Mutex::new(StreamingState::Stopped), - } + pub fn new() -> FileSink { + FileSink { streaming_state: StreamingState::Stopped } } - pub fn new_boxed(controller: SinkController) -> Box { - Box::new(FileSink::new(controller)) + pub fn new_boxed() -> Box { + Box::new(FileSink::new()) } } +fn validate_uri(uri: &Url) -> Result<(), UriError> { + let _ = try!(uri.to_file_path() + .or_else(|_| { + Err(UriError::new(UriErrorKind::UnsupportedProtocol, + Some(format!("Unsupported file URI '{}'", uri.as_str())))) + })); + Ok(()) +} + impl Sink for FileSink { - fn get_controller(&self) -> &SinkController { - &self.controller + fn uri_validator(&self) -> Box { + Box::new(validate_uri) } - fn set_uri(&self, uri: Option) -> Result<(), UriError> { - let location = &mut self.settings.lock().unwrap().location; - - *location = None; - match uri { - None => Ok(()), - Some(ref uri) => { - *location = Some(try!(uri.to_file_path() - .or_else(|_| { - Err(UriError::new(UriErrorKind::UnsupportedProtocol, - Some(format!("Unsupported file URI '{}'", uri.as_str())))) - }))); - Ok(()) - } - } - } - - fn get_uri(&self) -> Option { - let location = &self.settings.lock().unwrap().location; - location.as_ref() - .map(|l| Url::from_file_path(l).ok()) - .and_then(|i| i) // join() - } - - fn start(&self) -> Result<(), ErrorMessage> { - let location = &self.settings.lock().unwrap().location; - let mut streaming_state = self.streaming_state.lock().unwrap(); - - if let StreamingState::Started { .. } = *streaming_state { + fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage> { + if let StreamingState::Started { .. } = self.streaming_state { return Err(error_msg!(SinkError::Failure, ["Sink already started"])); } - let location = &try!(location.as_ref().ok_or_else(|| { - error_msg!(SinkError::Failure, ["No URI provided"]) - })); + let location = try!(uri.to_file_path() + .or_else(|_| { + Err(error_msg!(SinkError::Failure, + ["Unsupported file URI '{}'", uri.as_str()])) + })); + let file = try!(File::create(location.as_path()).or_else(|err| { Err(error_msg!(SinkError::OpenFailed, @@ -110,7 +79,7 @@ impl Sink for FileSink { err.to_string()])) })); - *streaming_state = StreamingState::Started { + self.streaming_state = StreamingState::Started { file: file, position: 0, }; @@ -118,17 +87,14 @@ impl Sink for FileSink { Ok(()) } - fn stop(&self) -> Result<(), ErrorMessage> { - let mut streaming_state = self.streaming_state.lock().unwrap(); - *streaming_state = StreamingState::Stopped; + fn stop(&mut self) -> Result<(), ErrorMessage> { + self.streaming_state = StreamingState::Stopped; Ok(()) } - fn render(&self, data: &[u8]) -> Result<(), FlowError> { - let mut streaming_state = self.streaming_state.lock().unwrap(); - - let (file, position) = match *streaming_state { + fn render(&mut self, data: &[u8]) -> Result<(), FlowError> { + let (file, position) = match self.streaming_state { StreamingState::Started { ref mut file, ref mut position } => (file, position), StreamingState::Stopped => { return Err(FlowError::Error(error_msg!(SinkError::Failure, ["Not started yet"]))); @@ -140,6 +106,7 @@ impl Sink for FileSink { })); *position += data.len() as u64; + Ok(()) } } diff --git a/src/rsfilesrc.rs b/src/rsfilesrc.rs index d4bd9f48..33f4e0fd 100644 --- a/src/rsfilesrc.rs +++ b/src/rsfilesrc.rs @@ -18,18 +18,11 @@ use std::u64; use std::io::{Read, Seek, SeekFrom}; use std::fs::File; -use std::path::PathBuf; -use std::sync::Mutex; use url::Url; use error::*; use rssource::*; -#[derive(Debug)] -struct Settings { - location: Option, -} - #[derive(Debug)] enum StreamingState { Stopped, @@ -38,58 +31,31 @@ enum StreamingState { #[derive(Debug)] pub struct FileSrc { - controller: SourceController, - settings: Mutex, - streaming_state: Mutex, + streaming_state: StreamingState, } -unsafe impl Sync for FileSrc {} -unsafe impl Send for FileSrc {} - impl FileSrc { - pub fn new(controller: SourceController) -> FileSrc { - FileSrc { - controller: controller, - settings: Mutex::new(Settings { location: None }), - streaming_state: Mutex::new(StreamingState::Stopped), - } + pub fn new() -> FileSrc { + FileSrc { streaming_state: StreamingState::Stopped } } - pub fn new_boxed(controller: SourceController) -> Box { - Box::new(FileSrc::new(controller)) + pub fn new_boxed() -> Box { + Box::new(FileSrc::new()) } } +fn validate_uri(uri: &Url) -> Result<(), UriError> { + let _ = try!(uri.to_file_path() + .or_else(|_| { + Err(UriError::new(UriErrorKind::UnsupportedProtocol, + Some(format!("Unsupported file URI '{}'", uri.as_str())))) + })); + Ok(()) +} + impl Source for FileSrc { - fn get_controller(&self) -> &SourceController { - &self.controller - } - - fn set_uri(&self, uri: Option) -> Result<(), UriError> { - let location = &mut self.settings.lock().unwrap().location; - - match uri { - None => { - *location = None; - Ok(()) - } - Some(ref uri) => { - *location = Some(try!(uri.to_file_path() - .or_else(|_| { - Err(UriError::new(UriErrorKind::UnsupportedProtocol, - Some(format!("Unsupported file URI '{}'", uri.as_str())))) - }))); - Ok(()) - } - } - } - - fn get_uri(&self) -> Option { - let location = &self.settings.lock().unwrap().location; - - location.as_ref() - .map(|l| Url::from_file_path(l).ok()) - .and_then(|i| i) // join() + fn uri_validator(&self) -> Box { + Box::new(validate_uri) } fn is_seekable(&self) -> bool { @@ -97,9 +63,7 @@ impl Source for FileSrc { } fn get_size(&self) -> u64 { - let streaming_state = self.streaming_state.lock().unwrap(); - - if let StreamingState::Started { ref file, .. } = *streaming_state { + if let StreamingState::Started { ref file, .. } = self.streaming_state { file.metadata() .ok() .map_or(u64::MAX, |m| m.len()) @@ -108,16 +72,16 @@ impl Source for FileSrc { } } - fn start(&self) -> Result<(), ErrorMessage> { - let location = &self.settings.lock().unwrap().location; - let mut streaming_state = self.streaming_state.lock().unwrap(); - - if let StreamingState::Started { .. } = *streaming_state { + fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage> { + if let StreamingState::Started { .. } = self.streaming_state { return Err(error_msg!(SourceError::Failure, ["Source already started"])); } - let location = &try!(location.as_ref() - .ok_or_else(|| error_msg!(SourceError::Failure, ["No URI provided"]))); + let location = try!(uri.to_file_path() + .or_else(|_| { + Err(error_msg!(SourceError::Failure, + ["Unsupported file URI '{}'", uri.as_str()])) + })); let file = try!(File::open(location.as_path()).or_else(|err| { Err(error_msg!(SourceError::OpenFailed, @@ -126,7 +90,7 @@ impl Source for FileSrc { err.to_string()])) })); - *streaming_state = StreamingState::Started { + self.streaming_state = StreamingState::Started { file: file, position: 0, }; @@ -134,18 +98,14 @@ impl Source for FileSrc { Ok(()) } - fn stop(&self) -> Result<(), ErrorMessage> { - let mut streaming_state = self.streaming_state.lock().unwrap(); - *streaming_state = StreamingState::Stopped; + fn stop(&mut self) -> Result<(), ErrorMessage> { + self.streaming_state = StreamingState::Stopped; Ok(()) } - fn fill(&self, offset: u64, data: &mut [u8]) -> Result { - let mut streaming_state = self.streaming_state.lock().unwrap(); - - - let (file, position) = match *streaming_state { + fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result { + let (file, position) = match self.streaming_state { StreamingState::Started { ref mut file, ref mut position } => (file, position), StreamingState::Stopped => { return Err(FlowError::Error(error_msg!(SourceError::Failure, ["Not started yet"]))); @@ -172,7 +132,7 @@ impl Source for FileSrc { Ok(size) } - fn do_seek(&self, _: u64, _: u64) -> Result<(), ErrorMessage> { + fn seek(&mut self, _: u64, _: u64) -> Result<(), ErrorMessage> { Ok(()) } } diff --git a/src/rshttpsrc.rs b/src/rshttpsrc.rs index 0f6e7732..2b979af0 100644 --- a/src/rshttpsrc.rs +++ b/src/rshttpsrc.rs @@ -23,20 +23,14 @@ use hyper::header::{ContentLength, ContentRange, ContentRangeSpec, Range, ByteRa use hyper::client::Client; use hyper::client::response::Response; -use std::sync::Mutex; - use error::*; use rssource::*; -#[derive(Debug)] -struct Settings { - url: Option, -} - #[derive(Debug)] enum StreamingState { Stopped, Started { + uri: Url, response: Response, seekable: bool, position: u64, @@ -48,37 +42,24 @@ enum StreamingState { #[derive(Debug)] pub struct HttpSrc { - controller: SourceController, - settings: Mutex, - streaming_state: Mutex, + streaming_state: StreamingState, client: Client, } -unsafe impl Sync for HttpSrc {} -unsafe impl Send for HttpSrc {} - impl HttpSrc { - pub fn new(controller: SourceController) -> HttpSrc { + pub fn new() -> HttpSrc { HttpSrc { - controller: controller, - settings: Mutex::new(Settings { url: None }), - streaming_state: Mutex::new(StreamingState::Stopped), + streaming_state: StreamingState::Stopped, client: Client::new(), } } - pub fn new_boxed(controller: SourceController) -> Box { - Box::new(HttpSrc::new(controller)) + pub fn new_boxed() -> Box { + Box::new(HttpSrc::new()) } - fn do_request(&self, start: u64, stop: u64) -> Result { - let url = &self.settings.lock().unwrap().url; - - let url = try!(url.as_ref() - .ok_or_else(|| error_msg!(SourceError::Failure, ["No URI provided"]))); - - - let mut req = self.client.get(url.clone()); + fn do_request(&self, uri: Url, start: u64, stop: u64) -> Result { + let mut req = self.client.get(uri.clone()); if start != 0 || stop != u64::MAX { req = if stop == u64::MAX { @@ -90,12 +71,12 @@ impl HttpSrc { let response = try!(req.send().or_else(|err| { Err(error_msg!(SourceError::ReadFailed, - ["Failed to fetch {}: {}", url, err.to_string()])) + ["Failed to fetch {}: {}", uri, err.to_string()])) })); if !response.status.is_success() { return Err(error_msg!(SourceError::ReadFailed, - ["Failed to fetch {}: {}", url, response.status])); + ["Failed to fetch {}: {}", uri, response.status])); } let size = if let Some(&ContentLength(content_length)) = response.headers.get() { @@ -128,6 +109,7 @@ impl HttpSrc { } Ok(StreamingState::Started { + uri: uri, response: response, seekable: seekable, position: 0, @@ -138,95 +120,69 @@ impl HttpSrc { } } +fn validate_uri(uri: &Url) -> Result<(), UriError> { + if uri.scheme() != "http" && uri.scheme() != "https" { + return Err(UriError::new(UriErrorKind::UnsupportedProtocol, + Some(format!("Unsupported URI '{}'", uri.as_str())))); + } + + Ok(()) +} + impl Source for HttpSrc { - fn get_controller(&self) -> &SourceController { - &self.controller - } - - fn set_uri(&self, uri: Option) -> Result<(), UriError> { - let url = &mut self.settings.lock().unwrap().url; - - match uri { - None => { - *url = None; - Ok(()) - } - Some(uri) => { - if uri.scheme() != "http" && uri.scheme() != "https" { - *url = None; - return Err(UriError::new(UriErrorKind::UnsupportedProtocol, - Some(format!("Unsupported URI '{}'", uri.as_str())))); - } - - *url = Some(uri); - Ok(()) - } - } - } - - fn get_uri(&self) -> Option { - let url = &self.settings.lock().unwrap().url; - url.as_ref().cloned() + fn uri_validator(&self) -> Box { + Box::new(validate_uri) } fn is_seekable(&self) -> bool { - let streaming_state = self.streaming_state.lock().unwrap(); - - match *streaming_state { + match self.streaming_state { StreamingState::Started { seekable, .. } => seekable, _ => false, } } fn get_size(&self) -> u64 { - let streaming_state = self.streaming_state.lock().unwrap(); - match *streaming_state { + match self.streaming_state { StreamingState::Started { size, .. } => size, _ => u64::MAX, } } - fn start(&self) -> Result<(), ErrorMessage> { - let mut streaming_state = self.streaming_state.lock().unwrap(); - *streaming_state = StreamingState::Stopped; - - let new_state = try!(self.do_request(0, u64::MAX)); - - *streaming_state = new_state; - Ok(()) - } - - fn stop(&self) -> Result<(), ErrorMessage> { - let mut streaming_state = self.streaming_state.lock().unwrap(); - *streaming_state = StreamingState::Stopped; + fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage> { + self.streaming_state = StreamingState::Stopped; + self.streaming_state = try!(self.do_request(uri.clone(), 0, u64::MAX)); Ok(()) } - fn do_seek(&self, start: u64, stop: u64) -> Result<(), ErrorMessage> { - let mut streaming_state = self.streaming_state.lock().unwrap(); - *streaming_state = StreamingState::Stopped; + fn stop(&mut self) -> Result<(), ErrorMessage> { + self.streaming_state = StreamingState::Stopped; - let new_state = try!(self.do_request(start, stop)); - - *streaming_state = new_state; Ok(()) } - fn fill(&self, offset: u64, data: &mut [u8]) -> Result { - let mut streaming_state = self.streaming_state.lock().unwrap(); - - if let StreamingState::Started { position, stop, .. } = *streaming_state { - if position != offset { - *streaming_state = StreamingState::Stopped; - let new_state = try!(self.do_request(offset, stop) - .or_else(|err| Err(FlowError::Error(err)))); - - *streaming_state = new_state; + fn seek(&mut self, start: u64, stop: u64) -> Result<(), ErrorMessage> { + let (position, old_stop, uri) = match self.streaming_state { + StreamingState::Started { position, stop, ref uri, .. } => { + (position, stop, uri.clone()) } + StreamingState::Stopped => { + return Err(error_msg!(SourceError::Failure, ["Not started yet"])); + } + }; + + if position == start && old_stop == stop { + return Ok(()); } - let (response, position) = match *streaming_state { + self.streaming_state = StreamingState::Stopped; + self.streaming_state = try!(self.do_request(uri, start, stop)); + + Ok(()) + } + + fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result { + let (response, position) = match self.streaming_state { StreamingState::Started { ref mut response, ref mut position, .. } => { (response, position) } @@ -235,6 +191,13 @@ impl Source for HttpSrc { } }; + if *position != offset { + return Err(FlowError::Error(error_msg!(SourceError::SeekFailed, + ["Got unexpected offset {}, expected {}", + offset, + position]))); + } + let size = try!(response.read(data).or_else(|err| { Err(FlowError::Error(error_msg!(SourceError::ReadFailed, ["Failed to read at {}: {}", offset, err.to_string()]))) @@ -245,6 +208,7 @@ impl Source for HttpSrc { } *position += size as u64; + Ok(size) } } diff --git a/src/rssink.c b/src/rssink.c index 32fab4f2..dfa0962f 100644 --- a/src/rssink.c +++ b/src/rssink.c @@ -246,15 +246,6 @@ gst_rs_sink_uri_handler_init (gpointer g_iface, gpointer iface_data) iface->set_uri = gst_rs_sink_uri_set_uri; } -void -gst_rs_sink_error (GstRsSink * sink, GQuark error_domain, gint error_code, - const gchar * message, const gchar * debug, const gchar * file, - const gchar * function, guint line) -{ - gst_element_message_full (GST_ELEMENT (sink), GST_MESSAGE_ERROR, error_domain, - error_code, g_strdup (message), g_strdup (debug), file, function, line); -} - gboolean gst_rs_sink_plugin_init (GstPlugin * plugin) { diff --git a/src/rssink.rs b/src/rssink.rs index 001ba308..6a142bc7 100644 --- a/src/rssink.rs +++ b/src/rssink.rs @@ -22,6 +22,8 @@ use std::ffi::{CStr, CString}; use std::slice; use std::ptr; +use std::sync::Mutex; + use url::Url; use utils::*; @@ -48,117 +50,82 @@ impl ToGError for SinkError { } } -#[derive(Debug)] -pub struct SinkController { - sink: *mut c_void, +pub struct SinkWrapper { + sink_raw: *mut c_void, + uri: Mutex<(Option, bool)>, + uri_validator: Box, + sink: Mutex>, } -impl SinkController { - fn new(sink: *mut c_void) -> SinkController { - SinkController { sink: sink } - } +pub trait Sink { + fn uri_validator(&self) -> Box; - pub fn error(&self, error: &ErrorMessage) { - extern "C" { - fn gst_rs_sink_error(sink: *mut c_void, - error_domain: u32, - error_code: i32, - message: *const c_char, - debug: *const c_char, - filename: *const c_char, - function: *const c_char, - line: u32); - } + fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage>; + fn stop(&mut self) -> Result<(), ErrorMessage>; - let ErrorMessage { error_domain, - error_code, - ref message, - ref debug, - filename, - function, - line } = *error; - - let message_cstr = message.as_ref().map(|m| CString::new(m.as_bytes()).unwrap()); - let message_ptr = message_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr()); - - let debug_cstr = debug.as_ref().map(|m| CString::new(m.as_bytes()).unwrap()); - let debug_ptr = debug_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr()); - - let file_cstr = CString::new(filename.as_bytes()).unwrap(); - let file_ptr = file_cstr.as_ptr(); - - let function_cstr = CString::new(function.as_bytes()).unwrap(); - let function_ptr = function_cstr.as_ptr(); - - unsafe { - gst_rs_sink_error(self.sink, - error_domain, - error_code, - message_ptr, - debug_ptr, - file_ptr, - function_ptr, - line); - } - } + fn render(&mut self, data: &[u8]) -> Result<(), FlowError>; } -pub trait Sink: Sync + Send { - fn get_controller(&self) -> &SinkController; - - // Called from any thread at any time - fn set_uri(&self, uri: Option) -> Result<(), UriError>; - fn get_uri(&self) -> Option; - - // Called from the streaming thread only - fn start(&self) -> Result<(), ErrorMessage>; - fn stop(&self) -> Result<(), ErrorMessage>; - fn render(&self, data: &[u8]) -> Result<(), FlowError>; +impl SinkWrapper { + fn new(sink_raw: *mut c_void, sink: Box) -> SinkWrapper { + SinkWrapper { + sink_raw: sink_raw, + uri: Mutex::new((None, false)), + uri_validator: sink.uri_validator(), + sink: Mutex::new(sink), + } + } } #[no_mangle] pub extern "C" fn sink_new(sink: *mut c_void, - create_instance: fn(controller: SinkController) -> Box) - -> *mut Box { - Box::into_raw(Box::new(create_instance(SinkController::new(sink)))) + create_instance: fn() -> Box) + -> *mut SinkWrapper { + Box::into_raw(Box::new(SinkWrapper::new(sink, create_instance()))) } #[no_mangle] -pub unsafe extern "C" fn sink_drop(ptr: *mut Box) { +pub unsafe extern "C" fn sink_drop(ptr: *mut SinkWrapper) { Box::from_raw(ptr); } #[no_mangle] -pub unsafe extern "C" fn sink_set_uri(ptr: *mut Box, +pub unsafe extern "C" fn sink_set_uri(ptr: *mut SinkWrapper, uri_ptr: *const c_char, cerr: *mut c_void) -> GBoolean { - let sink: &mut Box = &mut *ptr; + let wrap: &mut SinkWrapper = &mut *ptr; + let uri_storage = &mut wrap.uri.lock().unwrap(); + if uri_storage.1 { + UriError::new(UriErrorKind::BadState, Some("Already started".to_string())) + .into_gerror(cerr); + return GBoolean::False; + } + + uri_storage.0 = None; if uri_ptr.is_null() { - if let Err(err) = sink.set_uri(None) { - err.into_gerror(cerr); - GBoolean::False - } else { - GBoolean::True - } + GBoolean::True } else { let uri_str = CStr::from_ptr(uri_ptr).to_str().unwrap(); match Url::parse(uri_str) { Ok(uri) => { - if let Err(err) = sink.set_uri(Some(uri)) { + if let Err(err) = (*wrap.uri_validator)(&uri) { err.into_gerror(cerr); + GBoolean::False } else { + uri_storage.0 = Some(uri); + GBoolean::True } } Err(err) => { - let _ = sink.set_uri(None); UriError::new(UriErrorKind::BadUri, Some(format!("Failed to parse URI '{}': {}", uri_str, err))) .into_gerror(cerr); + GBoolean::False } } @@ -166,21 +133,23 @@ pub unsafe extern "C" fn sink_set_uri(ptr: *mut Box, } #[no_mangle] -pub unsafe extern "C" fn sink_get_uri(ptr: *const Box) -> *mut c_char { - let sink: &Box = &*ptr; +pub unsafe extern "C" fn sink_get_uri(ptr: *const SinkWrapper) -> *mut c_char { + let wrap: &SinkWrapper = &*ptr; + let uri_storage = &mut wrap.uri.lock().unwrap(); - match sink.get_uri() { - Some(uri) => CString::new(uri.into_string().into_bytes()).unwrap().into_raw(), + match uri_storage.0 { + Some(ref uri) => CString::new(uri.as_ref().as_bytes()).unwrap().into_raw(), None => ptr::null_mut(), } } #[no_mangle] -pub unsafe extern "C" fn sink_render(ptr: *mut Box, +pub unsafe extern "C" fn sink_render(ptr: *mut SinkWrapper, data_ptr: *const u8, data_len: usize) -> GstFlowReturn { - let sink: &mut Box = &mut *ptr; + let wrap: &mut SinkWrapper = &mut *ptr; + let sink = &mut wrap.sink.lock().unwrap(); let data = slice::from_raw_parts(data_ptr, data_len); match sink.render(data) { @@ -188,7 +157,7 @@ pub unsafe extern "C" fn sink_render(ptr: *mut Box, Err(flow_error) => { match flow_error { FlowError::NotNegotiated(ref msg) | - FlowError::Error(ref msg) => sink.get_controller().error(msg), + FlowError::Error(ref msg) => msg.post(wrap.sink_raw), _ => (), } flow_error.to_native() @@ -197,26 +166,45 @@ pub unsafe extern "C" fn sink_render(ptr: *mut Box, } #[no_mangle] -pub unsafe extern "C" fn sink_start(ptr: *mut Box) -> GBoolean { - let sink: &mut Box = &mut *ptr; +pub unsafe extern "C" fn sink_start(ptr: *mut SinkWrapper) -> GBoolean { + let wrap: &mut SinkWrapper = &mut *ptr; + let sink = &mut wrap.sink.lock().unwrap(); + let uri_storage = &mut wrap.uri.lock().unwrap(); - match sink.start() { - Ok(..) => GBoolean::True, + let (uri, started) = match **uri_storage { + (Some(ref uri), ref mut started) => (uri, started), + (None, _) => { + error_msg!(SinkError::OpenFailed, ["No URI given"]).post(wrap.sink_raw); + return GBoolean::False; + } + }; + + match sink.start(uri) { + Ok(..) => { + *started = true; + + GBoolean::True + } Err(ref msg) => { - sink.get_controller().error(msg); + msg.post(wrap.sink_raw); GBoolean::False } } } #[no_mangle] -pub unsafe extern "C" fn sink_stop(ptr: *mut Box) -> GBoolean { - let sink: &mut Box = &mut *ptr; +pub unsafe extern "C" fn sink_stop(ptr: *mut SinkWrapper) -> GBoolean { + let wrap: &mut SinkWrapper = &mut *ptr; + let sink = &mut wrap.sink.lock().unwrap(); + let uri_storage = &mut wrap.uri.lock().unwrap(); match sink.stop() { - Ok(..) => GBoolean::True, + Ok(..) => { + uri_storage.1 = false; + GBoolean::True + } Err(ref msg) => { - sink.get_controller().error(msg); + msg.post(wrap.sink_raw); GBoolean::False } } diff --git a/src/rssource.c b/src/rssource.c index a0cc6b79..022675af 100644 --- a/src/rssource.c +++ b/src/rssource.c @@ -38,7 +38,7 @@ extern void *source_new (GstRsSrc * source, void *create_instance); extern void source_drop (void *rssource); extern GstFlowReturn source_fill (void *rssource, uint64_t offset, void *data, size_t * data_len); -extern gboolean source_do_seek (void *rssource, uint64_t start, uint64_t stop); +extern gboolean source_seek (void *rssource, uint64_t start, uint64_t stop); extern gboolean source_set_uri (void *rssource, const char *uri, GError ** err); extern char *source_get_uri (void *rssource); extern uint64_t source_get_size (void *rssource); @@ -238,7 +238,7 @@ gst_rs_src_do_seek (GstBaseSrc * basesrc, GstSegment * segment) GstRsSrc *src = GST_RS_SRC (basesrc); gboolean ret; - ret = source_do_seek (src->instance, segment->start, segment->stop); + ret = source_seek (src->instance, segment->start, segment->stop); if (!ret) return FALSE; @@ -291,15 +291,6 @@ gst_rs_src_uri_handler_init (gpointer g_iface, gpointer iface_data) iface->set_uri = gst_rs_src_uri_set_uri; } -void -gst_rs_source_error (GstRsSrc * src, GQuark error_domain, gint error_code, - const gchar * message, const gchar * debug, const gchar * file, - const gchar * function, guint line) -{ - gst_element_message_full (GST_ELEMENT (src), GST_MESSAGE_ERROR, error_domain, - error_code, g_strdup (message), g_strdup (debug), file, function, line); -} - gboolean gst_rs_source_plugin_init (GstPlugin * plugin) { diff --git a/src/rssource.rs b/src/rssource.rs index 40423378..4ed473c2 100644 --- a/src/rssource.rs +++ b/src/rssource.rs @@ -21,6 +21,8 @@ use std::ffi::{CStr, CString}; use std::slice; use std::ptr; +use std::sync::Mutex; + use url::Url; use utils::*; @@ -47,122 +49,85 @@ impl ToGError for SourceError { } } -#[derive(Debug)] -pub struct SourceController { - source: *mut c_void, +pub struct SourceWrapper { + source_raw: *mut c_void, + uri: Mutex<(Option, bool)>, + uri_validator: Box, + source: Mutex>, } -impl SourceController { - fn new(source: *mut c_void) -> SourceController { - SourceController { source: source } - } +pub trait Source { + fn uri_validator(&self) -> Box; - pub fn error(&self, error: &ErrorMessage) { - extern "C" { - fn gst_rs_source_error(source: *mut c_void, - error_domain: u32, - error_code: i32, - message: *const c_char, - debug: *const c_char, - filename: *const c_char, - function: *const c_char, - line: u32); - } - - let ErrorMessage { error_domain, - error_code, - ref message, - ref debug, - filename, - function, - line } = *error; - - let message_cstr = message.as_ref().map(|m| CString::new(m.as_bytes()).unwrap()); - let message_ptr = message_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr()); - - let debug_cstr = debug.as_ref().map(|m| CString::new(m.as_bytes()).unwrap()); - let debug_ptr = debug_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr()); - - let file_cstr = CString::new(filename.as_bytes()).unwrap(); - let file_ptr = file_cstr.as_ptr(); - - let function_cstr = CString::new(function.as_bytes()).unwrap(); - let function_ptr = function_cstr.as_ptr(); - - unsafe { - gst_rs_source_error(self.source, - error_domain, - error_code, - message_ptr, - debug_ptr, - file_ptr, - function_ptr, - line); - } - } -} - -pub trait Source: Sync + Send { - fn get_controller(&self) -> &SourceController; - - // Called from any thread at any time - fn set_uri(&self, uri: Option) -> Result<(), UriError>; - fn get_uri(&self) -> Option; - - // Called from any thread between start/stop fn is_seekable(&self) -> bool; - - // Called from the streaming thread only - fn start(&self) -> Result<(), ErrorMessage>; - fn stop(&self) -> Result<(), ErrorMessage>; - fn fill(&self, offset: u64, data: &mut [u8]) -> Result; - fn do_seek(&self, start: u64, stop: u64) -> Result<(), ErrorMessage>; fn get_size(&self) -> u64; + + fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage>; + fn stop(&mut self) -> Result<(), ErrorMessage>; + fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result; + fn seek(&mut self, start: u64, stop: u64) -> Result<(), ErrorMessage>; +} + +impl SourceWrapper { + fn new(source_raw: *mut c_void, source: Box) -> SourceWrapper { + SourceWrapper { + source_raw: source_raw, + uri: Mutex::new((None, false)), + uri_validator: source.uri_validator(), + source: Mutex::new(source), + } + } } #[no_mangle] pub extern "C" fn source_new(source: *mut c_void, - create_instance: fn(controller: SourceController) -> Box) - -> *mut Box { - Box::into_raw(Box::new(create_instance(SourceController::new(source)))) + create_instance: fn() -> Box) + -> *mut SourceWrapper { + Box::into_raw(Box::new(SourceWrapper::new(source, create_instance()))) } #[no_mangle] -pub unsafe extern "C" fn source_drop(ptr: *mut Box) { +pub unsafe extern "C" fn source_drop(ptr: *mut SourceWrapper) { Box::from_raw(ptr); } #[no_mangle] -pub unsafe extern "C" fn source_set_uri(ptr: *mut Box, +pub unsafe extern "C" fn source_set_uri(ptr: *mut SourceWrapper, uri_ptr: *const c_char, cerr: *mut c_void) -> GBoolean { - let source: &mut Box = &mut *ptr; + let wrap: &mut SourceWrapper = &mut *ptr; + let uri_storage = &mut wrap.uri.lock().unwrap(); + if uri_storage.1 { + UriError::new(UriErrorKind::BadState, Some("Already started".to_string())) + .into_gerror(cerr); + return GBoolean::False; + } + + uri_storage.0 = None; if uri_ptr.is_null() { - if let Err(err) = source.set_uri(None) { - err.into_gerror(cerr); - GBoolean::False - } else { - GBoolean::True - } + GBoolean::True } else { let uri_str = CStr::from_ptr(uri_ptr).to_str().unwrap(); match Url::parse(uri_str) { Ok(uri) => { - if let Err(err) = source.set_uri(Some(uri)) { + if let Err(err) = (*wrap.uri_validator)(&uri) { err.into_gerror(cerr); + GBoolean::False } else { + uri_storage.0 = Some(uri); + GBoolean::True } } Err(err) => { - let _ = source.set_uri(None); UriError::new(UriErrorKind::BadUri, Some(format!("Failed to parse URI '{}': {}", uri_str, err))) .into_gerror(cerr); + GBoolean::False } } @@ -170,23 +135,84 @@ pub unsafe extern "C" fn source_set_uri(ptr: *mut Box, } #[no_mangle] -pub unsafe extern "C" fn source_get_uri(ptr: *mut Box) -> *mut c_char { - let source: &mut Box = &mut *ptr; +pub unsafe extern "C" fn source_get_uri(ptr: *mut SourceWrapper) -> *mut c_char { + let wrap: &SourceWrapper = &*ptr; + let uri_storage = &mut wrap.uri.lock().unwrap(); - match source.get_uri() { - Some(uri) => CString::new(uri.into_string().into_bytes()).unwrap().into_raw(), + match uri_storage.0 { + Some(ref uri) => CString::new(uri.as_ref().as_bytes()).unwrap().into_raw(), None => ptr::null_mut(), } } #[no_mangle] -pub unsafe extern "C" fn source_fill(ptr: *mut Box, +pub unsafe extern "C" fn source_is_seekable(ptr: *const SourceWrapper) -> GBoolean { + let wrap: &SourceWrapper = &*ptr; + let source = &wrap.source.lock().unwrap(); + + GBoolean::from_bool(source.is_seekable()) +} + +#[no_mangle] +pub unsafe extern "C" fn source_get_size(ptr: *const SourceWrapper) -> u64 { + let wrap: &SourceWrapper = &*ptr; + let source = &wrap.source.lock().unwrap(); + + source.get_size() +} + +#[no_mangle] +pub unsafe extern "C" fn source_start(ptr: *mut SourceWrapper) -> GBoolean { + let wrap: &mut SourceWrapper = &mut *ptr; + let source = &mut wrap.source.lock().unwrap(); + let uri_storage = &mut wrap.uri.lock().unwrap(); + + let (uri, started) = match **uri_storage { + (Some(ref uri), ref mut started) => (uri, started), + (None, _) => { + error_msg!(SourceError::OpenFailed, ["No URI given"]).post(wrap.source_raw); + return GBoolean::False; + } + }; + + match source.start(uri) { + Ok(..) => { + *started = true; + GBoolean::True + } + Err(ref msg) => { + msg.post(wrap.source_raw); + GBoolean::False + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn source_stop(ptr: *mut SourceWrapper) -> GBoolean { + let wrap: &mut SourceWrapper = &mut *ptr; + let source = &mut wrap.source.lock().unwrap(); + let uri_storage = &mut wrap.uri.lock().unwrap(); + + match source.stop() { + Ok(..) => { + uri_storage.1 = false; + GBoolean::True + } + Err(ref msg) => { + msg.post(wrap.source_raw); + GBoolean::False + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn source_fill(ptr: *mut SourceWrapper, offset: u64, data_ptr: *mut u8, data_len_ptr: *mut usize) -> GstFlowReturn { - let source: &mut Box = &mut *ptr; - + let wrap: &mut SourceWrapper = &mut *ptr; + let source = &mut wrap.source.lock().unwrap(); let mut data_len: &mut usize = &mut *data_len_ptr; let mut data = slice::from_raw_parts_mut(data_ptr, *data_len); @@ -198,7 +224,7 @@ pub unsafe extern "C" fn source_fill(ptr: *mut Box, Err(flow_error) => { match flow_error { FlowError::NotNegotiated(ref msg) | - FlowError::Error(ref msg) => source.get_controller().error(msg), + FlowError::Error(ref msg) => msg.post(wrap.source_raw), _ => (), } flow_error.to_native() @@ -207,53 +233,14 @@ pub unsafe extern "C" fn source_fill(ptr: *mut Box, } #[no_mangle] -pub unsafe extern "C" fn source_get_size(ptr: *const Box) -> u64 { - let source: &Box = &*ptr; +pub unsafe extern "C" fn source_seek(ptr: *mut SourceWrapper, start: u64, stop: u64) -> GBoolean { + let wrap: &mut SourceWrapper = &mut *ptr; + let source = &mut wrap.source.lock().unwrap(); - source.get_size() -} - -#[no_mangle] -pub unsafe extern "C" fn source_start(ptr: *mut Box) -> GBoolean { - let source: &mut Box = &mut *ptr; - - match source.start() { + match source.seek(start, stop) { Ok(..) => GBoolean::True, Err(ref msg) => { - source.get_controller().error(msg); - GBoolean::False - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn source_stop(ptr: *mut Box) -> GBoolean { - let source: &mut Box = &mut *ptr; - - match source.stop() { - Ok(..) => GBoolean::True, - Err(ref msg) => { - source.get_controller().error(msg); - GBoolean::False - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn source_is_seekable(ptr: *const Box) -> GBoolean { - let source: &Box = &*ptr; - - GBoolean::from_bool(source.is_seekable()) -} - -#[no_mangle] -pub unsafe extern "C" fn source_do_seek(ptr: *mut Box, start: u64, stop: u64) -> GBoolean { - let source: &mut Box = &mut *ptr; - - match source.do_seek(start, stop) { - Ok(..) => GBoolean::True, - Err(ref msg) => { - source.get_controller().error(msg); + msg.post(wrap.source_raw); GBoolean::False } }