diff --git a/gst-plugin-file/src/filesink.rs b/gst-plugin-file/src/filesink.rs index 2690b719..84524d92 100644 --- a/gst-plugin-file/src/filesink.rs +++ b/gst-plugin-file/src/filesink.rs @@ -80,18 +80,24 @@ impl Sink for FileSink { let location = try!(uri.to_file_path() .or_else(|_| { + error!(self.logger, "Unsupported file URI '{}'", uri.as_str()); Err(error_msg!(SinkError::Failure, ["Unsupported file URI '{}'", uri.as_str()])) })); let file = try!(File::create(location.as_path()).or_else(|err| { + error!(self.logger, + "Could not open file for writing: {}", + err.to_string()); Err(error_msg!(SinkError::OpenFailed, ["Could not open file for writing '{}': {}", location.to_str().unwrap_or("Non-UTF8 path"), err.to_string()])) })); + debug!(self.logger, "Opened file {:?}", file); + self.streaming_state = StreamingState::Started { file: file, position: 0, @@ -107,6 +113,11 @@ impl Sink for FileSink { } fn render(&mut self, buffer: &Buffer) -> Result<(), FlowError> { + // FIXME: Because we borrow streaming state mutably below + let logger = self.logger.clone(); + + trace!(logger, "Rendering {:?}", buffer); + let (file, position) = match self.streaming_state { StreamingState::Started { ref mut file, ref mut position } => (file, position), StreamingState::Stopped => { @@ -124,6 +135,7 @@ impl Sink for FileSink { let data = map.as_slice(); try!(file.write_all(data).or_else(|err| { + error!(logger, "Failed to write: {}", err); Err(FlowError::Error(error_msg!(SinkError::WriteFailed, ["Failed to write: {}", err]))) })); diff --git a/gst-plugin-file/src/filesrc.rs b/gst-plugin-file/src/filesrc.rs index 1e37a901..0e95f4e2 100644 --- a/gst-plugin-file/src/filesrc.rs +++ b/gst-plugin-file/src/filesrc.rs @@ -92,17 +92,23 @@ impl Source for FileSrc { let location = try!(uri.to_file_path() .or_else(|_| { + error!(self.logger, "Unsupported file URI '{}'", uri.as_str()); Err(error_msg!(SourceError::Failure, ["Unsupported file URI '{}'", uri.as_str()])) })); let file = try!(File::open(location.as_path()).or_else(|err| { + error!(self.logger, + "Could not open file for reading: {}", + err.to_string()); Err(error_msg!(SourceError::OpenFailed, ["Could not open file for reading '{}': {}", location.to_str().unwrap_or("Non-UTF8 path"), err.to_string()])) })); + debug!(self.logger, "Opened file {:?}", file); + self.streaming_state = StreamingState::Started { file: file, position: 0, @@ -118,6 +124,9 @@ impl Source for FileSrc { } fn fill(&mut self, offset: u64, _: u32, buffer: &mut Buffer) -> Result<(), FlowError> { + // FIXME: Because we borrow streaming state mutably below + let logger = self.logger.clone(); + let (file, position) = match self.streaming_state { StreamingState::Started { ref mut file, ref mut position } => (file, position), StreamingState::Stopped => { @@ -127,6 +136,7 @@ impl Source for FileSrc { if *position != offset { try!(file.seek(SeekFrom::Start(offset)).or_else(|err| { + error!(logger, "Failed to seek to {}: {:?}", offset, err); Err(FlowError::Error(error_msg!(SourceError::SeekFailed, ["Failed to seek to {}: {}", offset, @@ -147,6 +157,7 @@ impl Source for FileSrc { let data = map.as_mut_slice(); try!(file.read(data).or_else(|err| { + error!(logger, "Failed to read: {:?}", err); Err(FlowError::Error(error_msg!(SourceError::ReadFailed, ["Failed to read at {}: {}", offset, diff --git a/gst-plugin-http/src/httpsrc.rs b/gst-plugin-http/src/httpsrc.rs index fc1de1f9..5d747a19 100644 --- a/gst-plugin-http/src/httpsrc.rs +++ b/gst-plugin-http/src/httpsrc.rs @@ -83,12 +83,16 @@ impl HttpSrc { } } + debug!(self.logger, "Doing new request {:?}", req); + let response = try!(req.send().or_else(|err| { + error!(self.logger, "Request failed: {:?}", err); Err(error_msg!(SourceError::ReadFailed, ["Failed to fetch {}: {}", uri, err.to_string()])) })); if !response.status().is_success() { + error!(self.logger, "Request status failed: {:?}", response); return Err(error_msg!(SourceError::ReadFailed, ["Failed to fetch {}: {}", uri, response.status()])); } @@ -119,6 +123,8 @@ impl HttpSrc { ["Failed to seek to {}: Got {}", start, position])); } + debug!(self.logger, "Request successful: {:?}", response); + Ok(StreamingState::Started { uri: uri, response: response, @@ -193,6 +199,8 @@ impl Source for HttpSrc { } fn fill(&mut self, offset: u64, _: u32, buffer: &mut Buffer) -> Result<(), FlowError> { + let logger = self.logger.clone(); + let (response, position) = match self.streaming_state { StreamingState::Started { ref mut response, ref mut position, .. } => { (response, position) @@ -221,6 +229,7 @@ impl Source for HttpSrc { let data = map.as_mut_slice(); try!(response.read(data).or_else(|err| { + error!(logger, "Failed to read: {:?}", err); Err(FlowError::Error(error_msg!(SourceError::ReadFailed, ["Failed to read at {}: {}", offset, diff --git a/gst-plugin/Cargo.toml b/gst-plugin/Cargo.toml index 36c48929..f25849f0 100644 --- a/gst-plugin/Cargo.toml +++ b/gst-plugin/Cargo.toml @@ -10,7 +10,8 @@ license = "LGPL-2.1+" libc = "0.2" url = "1.1" bitflags = "0.7" -slog = "1.3" +slog = { version = "1.3", features = ["max_level_trace"] } +lazy_static = "0.2" [build-dependencies] gcc = "0.3" diff --git a/gst-plugin/src/adapter.rs b/gst-plugin/src/adapter.rs index dea087f7..2ae4c992 100644 --- a/gst-plugin/src/adapter.rs +++ b/gst-plugin/src/adapter.rs @@ -17,8 +17,20 @@ // use buffer::*; +use log::*; use std::collections::VecDeque; use std::cmp; +use slog::*; + +lazy_static! { + static ref LOGGER: Logger = { + Logger::root(GstDebugDrain::new(None, + "rsadapter", + 0, + "Rust buffer adapter"), + None) + }; +} #[derive(Debug)] pub struct Adapter { @@ -47,6 +59,11 @@ impl Adapter { let size = buffer.get_size(); self.size += size; + trace!(LOGGER, + "Storing {:?} of size {}, now have size {}", + buffer, + size, + self.size); self.deque.push_back(buffer.into_read_mapped_buffer().unwrap()); } @@ -55,6 +72,7 @@ impl Adapter { self.size = 0; self.skip = 0; self.scratch.clear(); + trace!(LOGGER, "Cleared adapter"); } pub fn get_available(&self) -> usize { @@ -66,10 +84,18 @@ impl Adapter { let mut left = size; let mut idx = 0; + trace!(LOGGER, "Copying {} bytes", size); + for item in deque { let data_item = item.as_slice(); let to_copy = cmp::min(left, data_item.len() - skip); + trace!(LOGGER, + "Copying {} bytes from {:?}, {} more to go", + to_copy, + item, + left - to_copy); + data[idx..idx + to_copy].copy_from_slice(&data_item[skip..skip + to_copy]); skip = 0; idx += to_copy; @@ -85,9 +111,14 @@ impl Adapter { let size = data.len(); if self.size < size { + debug!(LOGGER, + "Peeking {} bytes into, not enough data: have {}", + size, + self.size); return Err(AdapterError::NotEnoughData); } + trace!(LOGGER, "Peeking {} bytes into", size); if size == 0 { return Ok(()); } @@ -98,6 +129,10 @@ impl Adapter { pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> { if self.size < size { + debug!(LOGGER, + "Peeking {} bytes, not enough data: have {}", + size, + self.size); return Err(AdapterError::NotEnoughData); } @@ -106,11 +141,14 @@ impl Adapter { } if let Some(front) = self.deque.front() { + trace!(LOGGER, "Peeking {} bytes, subbuffer of first", size); if front.get_size() - self.skip >= size { return Ok(&front.as_slice()[self.skip..self.skip + size]); } } + trace!(LOGGER, "Peeking {} bytes, copy to scratch", size); + self.scratch.truncate(0); self.scratch.reserve(size); { @@ -123,6 +161,10 @@ impl Adapter { pub fn get_buffer(&mut self, size: usize) -> Result { if self.size < size { + debug!(LOGGER, + "Get buffer of {} bytes, not enough data: have {}", + size, + self.size); return Err(AdapterError::NotEnoughData); } @@ -132,6 +174,7 @@ impl Adapter { let sub = self.deque.front().and_then(|front| { if front.get_size() - self.skip >= size { + trace!(LOGGER, "Get buffer of {} bytes, subbuffer of first", size); let new = front.get_buffer().copy_region(self.skip, Some(size)).unwrap(); Some(new) } else { @@ -144,6 +187,7 @@ impl Adapter { return Ok(s); } + trace!(LOGGER, "Get buffer of {} bytes, copy into new buffer", size); let mut new = Buffer::new_with_size(size).unwrap(); { let mut map = new.map_readwrite().unwrap(); @@ -156,6 +200,10 @@ impl Adapter { pub fn flush(&mut self, size: usize) -> Result<(), AdapterError> { if self.size < size { + debug!(LOGGER, + "Flush {} bytes, not enough data: have {}", + size, + self.size); return Err(AdapterError::NotEnoughData); } @@ -163,16 +211,26 @@ impl Adapter { return Ok(()); } + trace!(LOGGER, "Flushing {} bytes, have {}", size, self.size); + let mut left = size; while left > 0 { let front_size = self.deque.front().unwrap().get_size() - self.skip; if front_size <= left { + trace!(LOGGER, + "Flushing whole {:?}, {} more to go", + self.deque.front(), + left - front_size); self.deque.pop_front(); self.size -= front_size; self.skip = 0; left -= front_size; } else { + trace!(LOGGER, + "Flushing partial {:?}, {} more left", + self.deque.front(), + front_size - left); self.skip += left; self.size -= left; left = 0; diff --git a/gst-plugin/src/buffer.rs b/gst-plugin/src/buffer.rs index 678a9b14..965b0699 100644 --- a/gst-plugin/src/buffer.rs +++ b/gst-plugin/src/buffer.rs @@ -581,7 +581,7 @@ impl Drop for Buffer { fn gst_mini_object_unref(obj: *mut c_void); } - if self.owned { + if self.owned && !self.raw.is_null() { unsafe { gst_mini_object_unref(self.raw) } } } @@ -673,8 +673,10 @@ impl Drop for ReadMappedBuffer { fn gst_buffer_unmap(buffer: *mut c_void, map: *mut GstMapInfo); }; - unsafe { - gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo); + if !self.buffer.raw.is_null() { + unsafe { + gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo); + } } } } @@ -706,8 +708,10 @@ impl Drop for ReadWriteMappedBuffer { fn gst_buffer_unmap(buffer: *mut c_void, map: *mut GstMapInfo); }; - unsafe { - gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo); + if !self.buffer.raw.is_null() { + unsafe { + gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo); + } } } } diff --git a/gst-plugin/src/demuxer.c b/gst-plugin/src/demuxer.c index cf8bca12..e5047cd4 100644 --- a/gst-plugin/src/demuxer.c +++ b/gst-plugin/src/demuxer.c @@ -132,6 +132,8 @@ gst_rs_demuxer_init (GstRsDemuxer * demuxer, GstRsDemuxerClass * klass) gst_element_add_pad (GST_ELEMENT (demuxer), demuxer->sinkpad); demuxer->flow_combiner = gst_flow_combiner_new (); + + GST_DEBUG_OBJECT (demuxer, "Instantiating"); } static void @@ -139,6 +141,7 @@ gst_rs_demuxer_finalize (GObject * object) { GstRsDemuxer *demuxer = GST_RS_DEMUXER (object); + GST_DEBUG_OBJECT (demuxer, "Finalizing"); gst_flow_combiner_free (demuxer->flow_combiner); demuxer_drop (demuxer->instance); @@ -158,8 +161,12 @@ gst_rs_demuxer_sink_activate (GstPad * pad, GstObject * parent) return FALSE; } // TODO - //if (gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE)) + //if (gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE)) { + // GST_DEBUG_OBJECT (demuxer, "Activating in PULL mode"); // mode = GST_PAD_MODE_PULL; + //} else { + //GST_DEBUG_OBJECT (demuxer, "Activating in PUSH mode"); + //} gst_query_unref (query); demuxer->upstream_size = -1; @@ -179,7 +186,11 @@ gst_rs_demuxer_sink_activate_mode (GstPad * pad, GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); gboolean res = TRUE; + GST_DEBUG_OBJECT (demuxer, "%s pad in %s mode", + (active ? "Activating" : "Deactivating"), gst_pad_mode_get_name (mode)); + if (active) { + GST_DEBUG_OBJECT (demuxer, "Starting"); if (!demuxer_start (demuxer->instance, demuxer->upstream_size, mode == GST_PAD_MODE_PULL ? TRUE : FALSE)) { res = FALSE; @@ -202,8 +213,16 @@ static GstFlowReturn gst_rs_demuxer_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) { GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + GstFlowReturn res; - return demuxer_handle_buffer (demuxer->instance, buf); + GST_TRACE_OBJECT (demuxer, "Handling buffer %p", buf); + + res = demuxer_handle_buffer (demuxer->instance, buf); + + GST_TRACE_OBJECT (demuxer, "Handling buffer returned %s", + gst_flow_get_name (res)); + + return res; } static gboolean @@ -220,6 +239,7 @@ gst_rs_demuxer_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) break; } case GST_EVENT_EOS: + GST_DEBUG_OBJECT (demuxer, "Got EOS"); demuxer_end_of_stream (demuxer->instance); res = gst_pad_event_default (pad, parent, event); break; @@ -245,6 +265,8 @@ gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, GstQuery * query) gint64 position; if (demuxer_get_position (demuxer->instance, &position)) { + GST_DEBUG_OBJECT (demuxer, "Returning position %" GST_TIME_FORMAT, + GST_TIME_ARGS (position)); gst_query_set_position (query, format, position); res = TRUE; } else { @@ -261,6 +283,8 @@ gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, GstQuery * query) gint64 duration; if (demuxer_get_duration (demuxer->instance, &duration)) { + GST_DEBUG_OBJECT (demuxer, "Returning duration %" GST_TIME_FORMAT, + GST_TIME_ARGS (duration)); gst_query_set_duration (query, format, duration); res = TRUE; } else { @@ -302,6 +326,10 @@ gst_rs_demuxer_change_state (GstElement * element, GstStateChange transition) GstRsDemuxer *demuxer = GST_RS_DEMUXER (element); GstStateChangeReturn result; + GST_DEBUG_OBJECT (demuxer, "Change state %s to %s", + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + switch (transition) { case GST_STATE_CHANGE_READY_TO_PAUSED: demuxer->offset = 0; @@ -325,6 +353,7 @@ gst_rs_demuxer_change_state (GstElement * element, GstStateChange transition) guint i; /* Ignore stop failures */ + GST_DEBUG_OBJECT (demuxer, "Stopping"); demuxer_stop (demuxer->instance); gst_flow_combiner_clear (demuxer->flow_combiner); @@ -362,6 +391,9 @@ gst_rs_demuxer_add_stream (GstRsDemuxer * demuxer, guint32 index, g_assert (demuxer->srcpads[index] == NULL); + GST_DEBUG_OBJECT (demuxer, "Adding stream %u with format %s and stream id %s", + index, format, stream_id); + templ = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (demuxer), "src_%u"); @@ -398,6 +430,8 @@ gst_rs_demuxer_add_stream (GstRsDemuxer * demuxer, guint32 index, void gst_rs_demuxer_added_all_streams (GstRsDemuxer * demuxer) { + GST_DEBUG_OBJECT (demuxer, "No more pads"); + gst_element_no_more_pads (GST_ELEMENT (demuxer)); demuxer->group_id = gst_util_group_id_next (); } @@ -411,6 +445,8 @@ gst_rs_demuxer_stream_format_changed (GstRsDemuxer * demuxer, guint32 index, g_assert (demuxer->srcpads[index] != NULL); + GST_DEBUG_OBJECT (demuxer, "Format changed for stream %u: %s", index, format); + caps = gst_caps_from_string (format); event = gst_event_new_caps (caps); gst_caps_unref (caps); @@ -426,6 +462,8 @@ gst_rs_demuxer_stream_eos (GstRsDemuxer * demuxer, guint32 index) g_assert (index == -1 || demuxer->srcpads[index] != NULL); + GST_DEBUG_OBJECT (demuxer, "EOS for stream %u", index); + event = gst_event_new_eos (); if (index == -1) { gint i; @@ -450,8 +488,12 @@ gst_rs_demuxer_stream_push_buffer (GstRsDemuxer * demuxer, guint32 index, g_assert (demuxer->srcpads[index] != NULL); + GST_DEBUG_OBJECT (demuxer, "Pushing buffer %p for pad %u", buffer, index); res = gst_pad_push (demuxer->srcpads[index], buffer); + GST_DEBUG_OBJECT (demuxer, "Pushed buffer returned: %s", + gst_flow_get_name (res)); res = gst_flow_combiner_update_flow (demuxer->flow_combiner, res); + GST_DEBUG_OBJECT (demuxer, "Combined return: %s", gst_flow_get_name (res)); return res; } @@ -461,6 +503,7 @@ gst_rs_demuxer_remove_all_streams (GstRsDemuxer * demuxer) { guint i; + GST_DEBUG_OBJECT (demuxer, "Removing all streams"); gst_flow_combiner_clear (demuxer->flow_combiner); for (i = 0; i < G_N_ELEMENTS (demuxer->srcpads); i++) { @@ -475,7 +518,7 @@ gst_rs_demuxer_init_class (gpointer data) { demuxers = g_hash_table_new (g_direct_hash, g_direct_equal); GST_DEBUG_CATEGORY_INIT (gst_rs_demuxer_debug, "rsdemux", 0, - "rsdemux element"); + "Rust demuxer base class"); parent_class = g_type_class_ref (GST_TYPE_ELEMENT); } @@ -505,6 +548,15 @@ gst_rs_demuxer_register (GstPlugin * plugin, const gchar * name, g_once (&gonce, gst_rs_demuxer_init_class, NULL); + GST_DEBUG ("Registering for %" GST_PTR_FORMAT ": %s", plugin, name); + GST_DEBUG (" long name: %s", long_name); + GST_DEBUG (" description: %s", description); + GST_DEBUG (" classification: %s", classification); + GST_DEBUG (" author: %s", author); + GST_DEBUG (" rank: %d", rank); + GST_DEBUG (" input formats: %s", input_format); + GST_DEBUG (" output formats: %s", output_formats); + data = g_new0 (ElementData, 1); data->long_name = g_strdup (long_name); data->description = g_strdup (description); diff --git a/gst-plugin/src/demuxer.rs b/gst-plugin/src/demuxer.rs index 0078053d..90d1e21f 100644 --- a/gst-plugin/src/demuxer.rs +++ b/gst-plugin/src/demuxer.rs @@ -27,9 +27,12 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::u32; use std::u64; +use slog::*; + use utils::*; use error::*; use buffer::*; +use log::*; use plugin::Plugin; pub type StreamIndex = u32; @@ -92,6 +95,7 @@ impl Stream { pub struct DemuxerWrapper { raw: *mut c_void, + logger: Logger, demuxer: Mutex>, panicked: AtomicBool, } @@ -100,6 +104,11 @@ impl DemuxerWrapper { fn new(raw: *mut c_void, demuxer: Box) -> DemuxerWrapper { DemuxerWrapper { raw: raw, + logger: Logger::root(GstDebugDrain::new(Some(unsafe { &Element::new(raw) }), + "rsdemux", + 0, + "Rust demuxer base class"), + None), demuxer: Mutex::new(demuxer), panicked: AtomicBool::new(false), } @@ -108,6 +117,11 @@ impl DemuxerWrapper { fn start(&self, upstream_size: u64, random_access: bool) -> bool { let demuxer = &mut self.demuxer.lock().unwrap(); + debug!(self.logger, + "Starting with upstream size {} and random access {}", + upstream_size, + random_access); + let upstream_size = if upstream_size == u64::MAX { None } else { @@ -115,8 +129,12 @@ impl DemuxerWrapper { }; match demuxer.start(upstream_size, random_access) { - Ok(..) => true, + Ok(..) => { + trace!(self.logger, "Successfully started"); + true + } Err(ref msg) => { + error!(self.logger, "Failed to start: {:?}", msg); self.post_message(msg); false } @@ -126,9 +144,15 @@ impl DemuxerWrapper { fn stop(&self) -> bool { let demuxer = &mut self.demuxer.lock().unwrap(); + debug!(self.logger, "Stopping"); + match demuxer.stop() { - Ok(..) => true, + Ok(..) => { + trace!(self.logger, "Successfully stop"); + true + } Err(ref msg) => { + error!(self.logger, "Failed to stop: {:?}", msg); self.post_message(msg); false } @@ -138,7 +162,10 @@ impl DemuxerWrapper { fn is_seekable(&self) -> bool { let demuxer = &self.demuxer.lock().unwrap(); - demuxer.is_seekable() + let seekable = demuxer.is_seekable(); + debug!(self.logger, "Seekable {}", seekable); + + seekable } @@ -147,10 +174,12 @@ impl DemuxerWrapper { match demuxer.get_position() { None => { + trace!(self.logger, "Got no position"); *position = u64::MAX; GBoolean::False } Some(pos) => { + trace!(self.logger, "Returning position {}", pos); *position = pos; GBoolean::True } @@ -158,16 +187,18 @@ impl DemuxerWrapper { } - fn get_duration(&self, position: &mut u64) -> GBoolean { + fn get_duration(&self, duration: &mut u64) -> GBoolean { let demuxer = &self.demuxer.lock().unwrap(); match demuxer.get_duration() { None => { - *position = u64::MAX; + trace!(self.logger, "Got no duration"); + *duration = u64::MAX; GBoolean::False } - Some(pos) => { - *position = pos; + Some(dur) => { + trace!(self.logger, "Returning duration {}", dur); + *duration = dur; GBoolean::True } } @@ -180,12 +211,15 @@ impl DemuxerWrapper { let stop = if stop == u64::MAX { None } else { Some(stop) }; + debug!(self.logger, "Seeking to {:?}-{:?}", start, stop); + let res = { let mut demuxer = &mut self.demuxer.lock().unwrap(); match demuxer.seek(start, stop) { Ok(res) => res, Err(ref msg) => { + error!(self.logger, "Failed to seek: {:?}", msg); self.post_message(msg); return false; } @@ -193,12 +227,17 @@ impl DemuxerWrapper { }; match res { - SeekResult::TooEarly => false, + SeekResult::TooEarly => { + debug!(self.logger, "Seeked too early"); + false + } SeekResult::Ok(off) => { + trace!(self.logger, "Seeked successfully"); *offset = off; true } SeekResult::Eos => { + debug!(self.logger, "Seeked after EOS"); *offset = u64::MAX; unsafe { @@ -231,9 +270,12 @@ impl DemuxerWrapper { let mut res = { let mut demuxer = &mut self.demuxer.lock().unwrap(); + trace!(self.logger, "Handling buffer {:?}", buffer); + match demuxer.handle_buffer(Some(buffer)) { Ok(res) => res, Err(flow_error) => { + error!(self.logger, "Failed handling buffer: {:?}", flow_error); match flow_error { FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => self.post_message(msg), @@ -246,6 +288,8 @@ impl DemuxerWrapper { // Loop until AllEos, NeedMoreData or error when pushing downstream loop { + trace!(self.logger, "Handled {:?}", res); + match res { HandleBufferResult::NeedMoreData => { return GstFlowReturn::Ok; @@ -306,11 +350,14 @@ impl DemuxerWrapper { } }; + trace!(self.logger, "Calling again"); + res = { let mut demuxer = &mut self.demuxer.lock().unwrap(); match demuxer.handle_buffer(None) { Ok(res) => res, Err(flow_error) => { + error!(self.logger, "Failed calling again: {:?}", flow_error); match flow_error { FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => self.post_message(msg), @@ -326,9 +373,11 @@ impl DemuxerWrapper { fn end_of_stream(&self) { let mut demuxer = &mut self.demuxer.lock().unwrap(); + debug!(self.logger, "End of stream"); match demuxer.end_of_stream() { Ok(_) => (), Err(ref msg) => { + error!(self.logger, "Failed end of stream: {:?}", msg); self.post_message(msg); } } diff --git a/gst-plugin/src/lib.rs b/gst-plugin/src/lib.rs index 4e3367aa..adc84534 100644 --- a/gst-plugin/src/lib.rs +++ b/gst-plugin/src/lib.rs @@ -22,6 +22,8 @@ extern crate url; extern crate bitflags; #[macro_use] extern crate slog; +#[macro_use] +extern crate lazy_static; #[macro_use] pub mod utils; diff --git a/gst-plugin/src/sink.c b/gst-plugin/src/sink.c index 13f52fb3..fd205849 100644 --- a/gst-plugin/src/sink.c +++ b/gst-plugin/src/sink.c @@ -119,6 +119,8 @@ gst_rs_sink_init (GstRsSink * sink, GstRsSinkClass * klass) gst_base_sink_set_sync (GST_BASE_SINK (sink), FALSE); + GST_DEBUG_OBJECT (sink, "Instantiating"); + sink->instance = sink_new (sink, data->create_instance); } @@ -127,6 +129,8 @@ gst_rs_sink_finalize (GObject * object) { GstRsSink *sink = GST_RS_SINK (object); + GST_DEBUG_OBJECT (sink, "Finalizing"); + sink_drop (sink->instance); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -174,8 +178,12 @@ gst_rs_sink_render (GstBaseSink * basesink, GstBuffer * buffer) GstRsSink *sink = GST_RS_SINK (basesink); GstFlowReturn ret; + GST_TRACE_OBJECT (sink, "Rendering buffer %p", buffer); + ret = sink_render (sink->instance, buffer); + GST_TRACE_OBJECT (sink, "Rendered buffer: %s", gst_flow_get_name (ret)); + return ret; } @@ -185,6 +193,8 @@ gst_rs_sink_start (GstBaseSink * basesink) { GstRsSink *sink = GST_RS_SINK (basesink); + GST_DEBUG_OBJECT (sink, "Starting"); + return sink_start (sink->instance); } @@ -194,6 +204,8 @@ gst_rs_sink_stop (GstBaseSink * basesink) { GstRsSink *sink = GST_RS_SINK (basesink); + GST_DEBUG_OBJECT (sink, "Stopping"); + /* Ignore stop failures */ sink_stop (sink->instance); @@ -219,8 +231,13 @@ static gchar * gst_rs_sink_uri_get_uri (GstURIHandler * handler) { GstRsSink *sink = GST_RS_SINK (handler); + gchar *res; - return sink_get_uri (sink->instance); + res = sink_get_uri (sink->instance); + + GST_DEBUG_OBJECT (sink, "Returning URI %s", res); + + return res; } static gboolean @@ -229,8 +246,12 @@ gst_rs_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri, { GstRsSink *sink = GST_RS_SINK (handler); - if (!sink_set_uri (sink->instance, uri, err)) + GST_DEBUG_OBJECT (sink, "Setting URI %s", uri); + + if (!sink_set_uri (sink->instance, uri, err)) { + GST_ERROR_OBJECT (sink, "Failed to set URI: %s", (*err)->message); return FALSE; + } return TRUE; } @@ -250,7 +271,8 @@ static gpointer gst_rs_sink_init_class (gpointer data) { sinks = g_hash_table_new (g_direct_hash, g_direct_equal); - GST_DEBUG_CATEGORY_INIT (gst_rs_sink_debug, "rssink", 0, "rssink element"); + GST_DEBUG_CATEGORY_INIT (gst_rs_sink_debug, "rssink", 0, + "Rust sink base class"); parent_class = g_type_class_ref (GST_TYPE_BASE_SINK); } @@ -284,6 +306,14 @@ gst_rs_sink_register (GstPlugin * plugin, const gchar * name, g_once (&gonce, gst_rs_sink_init_class, NULL); + GST_DEBUG ("Registering for %" GST_PTR_FORMAT ": %s", plugin, name); + GST_DEBUG (" long name: %s", long_name); + GST_DEBUG (" description: %s", description); + GST_DEBUG (" classification: %s", classification); + GST_DEBUG (" author: %s", author); + GST_DEBUG (" rank: %d", rank); + GST_DEBUG (" protocols: %s", protocols); + data = g_new0 (ElementData, 1); data->name = g_strdup (name); data->long_name = g_strdup (long_name); diff --git a/gst-plugin/src/sink.rs b/gst-plugin/src/sink.rs index 5c9928f3..645666e2 100644 --- a/gst-plugin/src/sink.rs +++ b/gst-plugin/src/sink.rs @@ -28,9 +28,12 @@ use std::sync::atomic::{AtomicBool, Ordering}; use url::Url; +use slog::*; + use utils::*; use error::*; use buffer::*; +use log::*; use plugin::Plugin; #[derive(Debug)] @@ -56,6 +59,7 @@ impl ToGError for SinkError { pub struct SinkWrapper { raw: *mut c_void, + logger: Logger, uri: Mutex<(Option, bool)>, uri_validator: Box, sink: Mutex>, @@ -75,6 +79,11 @@ impl SinkWrapper { fn new(raw: *mut c_void, sink: Box) -> SinkWrapper { SinkWrapper { raw: raw, + logger: Logger::root(GstDebugDrain::new(Some(unsafe { &Element::new(raw) }), + "rssink", + 0, + "Rust sink base class"), + None), uri: Mutex::new((None, false)), uri_validator: sink.uri_validator(), sink: Mutex::new(sink), @@ -85,6 +94,8 @@ impl SinkWrapper { fn set_uri(&self, uri_str: Option<&str>) -> Result<(), UriError> { let uri_storage = &mut self.uri.lock().unwrap(); + debug!(self.logger, "Setting URI {:?}", uri_str); + if uri_storage.1 { return Err(UriError::new(UriErrorKind::BadState, Some("Already started".to_string()))); } @@ -114,6 +125,8 @@ impl SinkWrapper { } fn start(&self) -> bool { + debug!(self.logger, "Starting"); + // Don't keep the URI locked while we call start later let uri = match *self.uri.lock().unwrap() { (Some(ref uri), ref mut started) => { @@ -121,6 +134,7 @@ impl SinkWrapper { uri.clone() } (None, _) => { + error!(self.logger, "No URI given"); self.post_message(&error_msg!(SinkError::OpenFailed, ["No URI given"])); return false; } @@ -128,8 +142,13 @@ impl SinkWrapper { let sink = &mut self.sink.lock().unwrap(); match sink.start(uri) { - Ok(..) => true, + Ok(..) => { + trace!(self.logger, "Started successfully"); + true + } Err(ref msg) => { + error!(self.logger, "Failed to start: {:?}", msg); + self.uri.lock().unwrap().1 = false; self.post_message(msg); false @@ -140,12 +159,17 @@ impl SinkWrapper { fn stop(&self) -> bool { let sink = &mut self.sink.lock().unwrap(); + debug!(self.logger, "Stopping"); + match sink.stop() { Ok(..) => { + trace!(self.logger, "Stopped successfully"); self.uri.lock().unwrap().1 = false; true } Err(ref msg) => { + error!(self.logger, "Failed to stop: {:?}", msg); + self.post_message(msg); false } @@ -155,9 +179,12 @@ impl SinkWrapper { fn render(&self, buffer: &Buffer) -> GstFlowReturn { let sink = &mut self.sink.lock().unwrap(); + trace!(self.logger, "Rendering buffer {:?}", buffer); + match sink.render(buffer) { Ok(..) => GstFlowReturn::Ok, Err(flow_error) => { + error!(self.logger, "Failed to render: {:?}", flow_error); match flow_error { FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => self.post_message(msg), @@ -204,6 +231,7 @@ pub unsafe extern "C" fn sink_set_uri(ptr: *const SinkWrapper, match wrap.set_uri(uri_str) { Err(err) => { + error!(wrap.logger, "Failed to set URI {:?}", err); err.into_gerror(cerr); GBoolean::False } diff --git a/gst-plugin/src/source.c b/gst-plugin/src/source.c index 29983d13..e6721418 100644 --- a/gst-plugin/src/source.c +++ b/gst-plugin/src/source.c @@ -128,6 +128,8 @@ gst_rs_src_init (GstRsSrc * src, GstRsSrcClass * klass) gst_base_src_set_blocksize (GST_BASE_SRC (src), 4096); + GST_DEBUG_OBJECT (src, "Instantiating"); + src->instance = source_new (src, data->create_instance); } @@ -136,6 +138,8 @@ gst_rs_src_finalize (GObject * object) { GstRsSrc *src = GST_RS_SRC (object); + GST_DEBUG_OBJECT (src, "Finalizing"); + source_drop (src->instance); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -182,16 +186,28 @@ gst_rs_src_fill (GstBaseSrc * basesrc, guint64 offset, guint length, GstBuffer * buf) { GstRsSrc *src = GST_RS_SRC (basesrc); + GstFlowReturn ret; - return source_fill (src->instance, offset, length, buf); + GST_TRACE_OBJECT (src, + "Filling buffer %p, offset %" G_GUINT64_FORMAT " and length %" + G_GUINT64_FORMAT, *buf, offset, length); + + ret = source_fill (src->instance, offset, length, buf); + + GST_TRACE_OBJECT (src, "Filled buffer: %s", gst_flow_get_name (ret)); + + return ret; } static gboolean gst_rs_src_is_seekable (GstBaseSrc * basesrc) { GstRsSrc *src = GST_RS_SRC (basesrc); + gboolean res; - return source_is_seekable (src->instance); + res = source_is_seekable (src->instance); + + GST_DEBUG_OBJECT (src, "Returning seekable %d", res); } static gboolean @@ -201,6 +217,8 @@ gst_rs_src_get_size (GstBaseSrc * basesrc, guint64 * size) *size = source_get_size (src->instance); + GST_DEBUG_OBJECT (src, "Returning size %" G_GUINT64_FORMAT, *size); + return TRUE; } @@ -210,6 +228,8 @@ gst_rs_src_start (GstBaseSrc * basesrc) { GstRsSrc *src = GST_RS_SRC (basesrc); + GST_DEBUG_OBJECT (src, "Starting"); + return source_start (src->instance); } @@ -218,6 +238,8 @@ gst_rs_src_stop (GstBaseSrc * basesrc) { GstRsSrc *src = GST_RS_SRC (basesrc); + GST_DEBUG_OBJECT (src, "Stopping"); + /* Ignore stop failures */ source_stop (src->instance); @@ -230,9 +252,14 @@ gst_rs_src_do_seek (GstBaseSrc * basesrc, GstSegment * segment) GstRsSrc *src = GST_RS_SRC (basesrc); gboolean ret; + GST_DEBUG_OBJECT (src, "Seeking to %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT, + GST_TIME_ARGS (segment->start), GST_TIME_ARGS (segment->stop)); + ret = source_seek (src->instance, segment->start, segment->stop); - if (!ret) + if (!ret) { + GST_DEBUG_OBJECT (src, "Failed to seek"); return FALSE; + } return GST_BASE_SRC_CLASS (parent_class)->do_seek (basesrc, segment); } @@ -256,8 +283,13 @@ static gchar * gst_rs_src_uri_get_uri (GstURIHandler * handler) { GstRsSrc *src = GST_RS_SRC (handler); + gchar *res; - return source_get_uri (src->instance); + res = source_get_uri (src->instance); + + GST_DEBUG_OBJECT (src, "Returning URI %s", res); + + return res; } static gboolean @@ -266,8 +298,12 @@ gst_rs_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, { GstRsSrc *src = GST_RS_SRC (handler); - if (!source_set_uri (src->instance, uri, err)) + GST_DEBUG_OBJECT (src, "Setting URI %s", uri); + + if (!source_set_uri (src->instance, uri, err)) { + GST_ERROR_OBJECT (src, "Failed to set URI: %s", (*err)->message); return FALSE; + } return TRUE; } @@ -287,7 +323,8 @@ static gpointer gst_rs_source_init_class (gpointer data) { sources = g_hash_table_new (g_direct_hash, g_direct_equal); - GST_DEBUG_CATEGORY_INIT (gst_rs_src_debug, "rssrc", 0, "rssrc element"); + GST_DEBUG_CATEGORY_INIT (gst_rs_src_debug, "rssrc", 0, + "Rust source base class"); parent_class = g_type_class_ref (GST_TYPE_BASE_SRC); @@ -323,6 +360,15 @@ gst_rs_source_register (GstPlugin * plugin, const gchar * name, g_once (&gonce, gst_rs_source_init_class, NULL); + GST_DEBUG ("Registering for %" GST_PTR_FORMAT ": %s", plugin, name); + GST_DEBUG (" long name: %s", long_name); + GST_DEBUG (" description: %s", description); + GST_DEBUG (" classification: %s", classification); + GST_DEBUG (" author: %s", author); + GST_DEBUG (" rank: %d", rank); + GST_DEBUG (" protocols: %s", protocols); + GST_DEBUG (" push only: %d", push_only); + data = g_new0 (ElementData, 1); data->long_name = g_strdup (long_name); data->description = g_strdup (description); diff --git a/gst-plugin/src/source.rs b/gst-plugin/src/source.rs index 2338b15d..d632f6b0 100644 --- a/gst-plugin/src/source.rs +++ b/gst-plugin/src/source.rs @@ -28,10 +28,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use url::Url; +use slog::*; + use plugin::Plugin; use utils::*; use error::*; use buffer::*; +use log::*; #[derive(Debug)] pub enum SourceError { @@ -56,6 +59,7 @@ impl ToGError for SourceError { pub struct SourceWrapper { raw: *mut c_void, + logger: Logger, uri: Mutex<(Option, bool)>, uri_validator: Box, source: Mutex>, @@ -78,6 +82,11 @@ impl SourceWrapper { fn new(raw: *mut c_void, source: Box) -> SourceWrapper { SourceWrapper { raw: raw, + logger: Logger::root(GstDebugDrain::new(Some(unsafe { &Element::new(raw) }), + "rssrc", + 0, + "Rust source base class"), + None), uri: Mutex::new((None, false)), uri_validator: source.uri_validator(), source: Mutex::new(source), @@ -88,6 +97,8 @@ impl SourceWrapper { fn set_uri(&self, uri_str: Option<&str>) -> Result<(), UriError> { let uri_storage = &mut self.uri.lock().unwrap(); + debug!(self.logger, "Setting URI {:?}", uri_str); + if uri_storage.1 { return Err(UriError::new(UriErrorKind::BadState, Some("Already started".to_string()))); } @@ -127,6 +138,8 @@ impl SourceWrapper { } fn start(&self) -> bool { + debug!(self.logger, "Starting"); + // Don't keep the URI locked while we call start later let uri = match *self.uri.lock().unwrap() { (Some(ref uri), ref mut started) => { @@ -134,6 +147,7 @@ impl SourceWrapper { uri.clone() } (None, _) => { + error!(self.logger, "No URI given"); self.post_message(&error_msg!(SourceError::OpenFailed, ["No URI given"])); return false; } @@ -141,8 +155,13 @@ impl SourceWrapper { let source = &mut self.source.lock().unwrap(); match source.start(uri) { - Ok(..) => true, + Ok(..) => { + trace!(self.logger, "Started successfully"); + true + } Err(ref msg) => { + error!(self.logger, "Failed to start: {:?}", msg); + self.uri.lock().unwrap().1 = false; self.post_message(msg); false @@ -153,12 +172,17 @@ impl SourceWrapper { fn stop(&self) -> bool { let source = &mut self.source.lock().unwrap(); + debug!(self.logger, "Stopping"); + match source.stop() { Ok(..) => { + trace!(self.logger, "Stopped successfully"); self.uri.lock().unwrap().1 = false; true } Err(ref msg) => { + error!(self.logger, "Failed to stop: {:?}", msg); + self.post_message(msg); false } @@ -167,9 +191,17 @@ impl SourceWrapper { fn fill(&self, offset: u64, length: u32, buffer: &mut Buffer) -> GstFlowReturn { let source = &mut self.source.lock().unwrap(); + + trace!(self.logger, + "Filling buffer {:?} with offset {} and length {}", + buffer, + offset, + length); + match source.fill(offset, length, buffer) { Ok(()) => GstFlowReturn::Ok, Err(flow_error) => { + error!(self.logger, "Failed to fill: {:?}", flow_error); match flow_error { FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => self.post_message(msg), @@ -183,9 +215,12 @@ impl SourceWrapper { fn seek(&self, start: u64, stop: Option) -> bool { let source = &mut self.source.lock().unwrap(); + debug!(self.logger, "Seeking to {:?}-{:?}", start, stop); + match source.seek(start, stop) { Ok(..) => true, Err(ref msg) => { + error!(self.logger, "Failed to seek {:?}", msg); self.post_message(msg); false } @@ -229,6 +264,7 @@ pub unsafe extern "C" fn source_set_uri(ptr: *const SourceWrapper, match wrap.set_uri(uri_str) { Err(err) => { + error!(wrap.logger, "Failed to set URI {:?}", err); err.into_gerror(cerr); GBoolean::False }