From 62d24efc0405c879a29f247e215e06f423cd4ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 24 Nov 2016 16:29:43 +0200 Subject: [PATCH] WIP demuxer --- Cargo.toml | 2 + build.rs | 2 +- src/adapter.rs | 15 ++ src/buffer.rs | 13 ++ src/flvdemux.rs | 463 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 66 ++++++ src/plugin.c | 8 +- src/rsdemuxer.c | 522 +++++++++++++++++++++++++++++++++++++++++++++++ src/rsdemuxer.h | 62 ++++++ src/rsdemuxer.rs | 433 +++++++++++++++++++++++++++++++++++++++ 10 files changed, 1583 insertions(+), 3 deletions(-) create mode 100644 src/flvdemux.rs create mode 100644 src/rsdemuxer.c create mode 100644 src/rsdemuxer.h create mode 100644 src/rsdemuxer.rs diff --git a/Cargo.toml b/Cargo.toml index 98920a40..4be22cf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ libc = "0.2" url = "1.1" bitflags = "0.7" reqwest = "0.2" +nom = "1.2" +flavors = {git = "https://github.com/Geal/flavors.git"} [build-dependencies] gcc = "0.3" diff --git a/build.rs b/build.rs index 56a93954..134f3111 100644 --- a/build.rs +++ b/build.rs @@ -23,7 +23,7 @@ fn main() { let gstbase = pkg_config::probe_library("gstreamer-base-1.0").unwrap(); let includes = [gstreamer.include_paths, gstbase.include_paths]; - let files = ["src/plugin.c", "src/rssource.c", "src/rssink.c"]; + let files = ["src/plugin.c", "src/rssource.c", "src/rssink.c", "src/rsdemuxer.c"]; let mut config = gcc::Config::new(); config.include("src"); diff --git a/src/adapter.rs b/src/adapter.rs index ba67d13e..dea087f7 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -81,6 +81,21 @@ impl Adapter { assert_eq!(left, 0); } + pub fn peek_into(&self, data: &mut [u8]) -> Result<(), AdapterError> { + let size = data.len(); + + if self.size < size { + return Err(AdapterError::NotEnoughData); + } + + if size == 0 { + return Ok(()); + } + + Self::copy_data(&self.deque, self.skip, data, size); + Ok(()) + } + pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> { if self.size < size { return Err(AdapterError::NotEnoughData); diff --git a/src/buffer.rs b/src/buffer.rs index d5cea2f7..678a9b14 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -241,6 +241,19 @@ impl Buffer { } } + pub unsafe fn into_ptr(mut self) -> *mut c_void { + extern "C" { + fn gst_mini_object_ref(obj: *mut c_void) -> *mut c_void; + }; + if !self.owned { + gst_mini_object_ref(self.raw); + } + + self.owned = false; + + self.raw + } + pub fn is_writable(&self) -> bool { extern "C" { fn gst_mini_object_is_writable(obj: *const c_void) -> GBoolean; diff --git a/src/flvdemux.rs b/src/flvdemux.rs new file mode 100644 index 00000000..8152c003 --- /dev/null +++ b/src/flvdemux.rs @@ -0,0 +1,463 @@ +// Copyright (C) 2016 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +// Boston, MA 02110-1301, USA. + +use std::cmp; + +use nom; +use nom::IResult; + +use flavors::parser as flavors; + +use error::*; +use rsdemuxer::*; +use buffer::*; +use adapter::*; + +#[derive(Debug)] +enum StreamingState { + Stopped, + Started { + adapter: Adapter, + stream_state: StreamState, + }, +} + +#[derive(Debug)] +enum StreamState { + NeedHeader, + Initialized { + header: flavors::Header, + initialized_state: InitializedState, + }, +} + +#[derive(Debug)] +enum InitializedState { + Skipping { skip_left: u32 }, + Setup { setup_state: SetupState }, +} + +#[derive(Debug)] +struct SetupState { + audio: Option, + video: Option, /* TODO: parse and store various audio/video metadata from ScriptDataObject */ + got_all_streams: bool, + last_position: Option, +} + +#[derive(Debug)] +pub struct FlvDemux { + streaming_state: StreamingState, +} + +impl FlvDemux { + pub fn new() -> FlvDemux { + FlvDemux { streaming_state: StreamingState::Stopped } + } + + pub fn new_boxed() -> Box { + Box::new(FlvDemux::new()) + } + + fn handle_script_tag(adapter: &mut Adapter, + header: &flavors::Header, + setup_state: &mut SetupState, + tag_header: &flavors::TagHeader) + -> Result { + adapter.flush((15 + tag_header.data_size) as usize).unwrap(); + + Ok(HandleBufferResult::Again) + } + + fn update_audio_stream(header: &flavors::Header, + setup_state: &mut SetupState, + data_header: flavors::AudioDataHeader) + -> Result { + let stream_changed = match setup_state.audio { + None => true, + Some(flavors::AudioDataHeader { sound_format, sound_rate, sound_size, sound_type }) + if sound_format != data_header.sound_format || + sound_rate != data_header.sound_rate || + sound_size != data_header.sound_size || + sound_type != data_header.sound_type => true, + _ => false, + }; + + if stream_changed { + match data_header { + flavors::AudioDataHeader { sound_format: flavors::SoundFormat::MP3, .. } => { + let format = String::from("audio/mpeg, mpegversion=1, layer=3"); + let new_stream = setup_state.audio == None; + + setup_state.audio = Some(data_header); + let stream = Stream::new(0, format, String::from("audio")); + if new_stream { + return Ok(HandleBufferResult::StreamAdded(stream)); + } else { + return Ok(HandleBufferResult::StreamChanged(stream)); + } + } + _ => { + unimplemented!(); + } + } + } + setup_state.audio = Some(data_header); + + if !setup_state.got_all_streams && + (header.video && setup_state.video != None || !header.video) { + setup_state.got_all_streams = true; + return Ok(HandleBufferResult::HaveAllStreams); + } + + Ok(HandleBufferResult::Again) + } + + fn handle_audio_tag(adapter: &mut Adapter, + header: &flavors::Header, + setup_state: &mut SetupState, + tag_header: &flavors::TagHeader, + data_header: flavors::AudioDataHeader) + -> Result { + let res = Self::update_audio_stream(header, setup_state, data_header); + match res { + Ok(HandleBufferResult::Again) => (), + _ => return res, + } + + if adapter.get_available() < (15 + tag_header.data_size) as usize { + return Ok(HandleBufferResult::NeedMoreData); + } + + adapter.flush(16).unwrap(); + if tag_header.data_size == 0 { + return Ok(HandleBufferResult::Again); + } + + let mut buffer = adapter.get_buffer((tag_header.data_size - 1) as usize) + .unwrap(); + buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000)) + .unwrap(); + + Ok(HandleBufferResult::BufferForStream(0, buffer)) + } + + fn update_video_stream(header: &flavors::Header, + setup_state: &mut SetupState, + data_header: flavors::VideoDataHeader) + -> Result { + let stream_changed = match setup_state.video { + None => true, + Some(flavors::VideoDataHeader { codec_id, .. }) if codec_id != data_header.codec_id => { + true + } + _ => false, + }; + + if stream_changed { + match data_header { + flavors::VideoDataHeader { codec_id: flavors::CodecId::VP6, .. } => { + let format = String::from("video/x-vp6-flash"); + let new_stream = setup_state.video == None; + setup_state.video = Some(data_header); + + let stream = Stream::new(1, format, String::from("video")); + if new_stream { + return Ok(HandleBufferResult::StreamAdded(stream)); + } else { + return Ok(HandleBufferResult::StreamChanged(stream)); + } + } + _ => { + unimplemented!(); + } + } + } + + setup_state.video = Some(data_header); + + if !setup_state.got_all_streams && + (header.audio && setup_state.audio != None || !header.audio) { + setup_state.got_all_streams = true; + return Ok(HandleBufferResult::HaveAllStreams); + } + + Ok(HandleBufferResult::Again) + } + + fn handle_video_tag(adapter: &mut Adapter, + header: &flavors::Header, + setup_state: &mut SetupState, + tag_header: &flavors::TagHeader, + data_header: flavors::VideoDataHeader) + -> Result { + let res = Self::update_video_stream(header, setup_state, data_header); + match res { + Ok(HandleBufferResult::Again) => (), + _ => return res, + } + + if adapter.get_available() < (15 + tag_header.data_size) as usize { + return Ok(HandleBufferResult::NeedMoreData); + } + + let video = setup_state.video.as_ref().unwrap(); + let is_keyframe = video.frame_type == flavors::FrameType::Key; + + adapter.flush(16).unwrap(); + + let offset = if video.codec_id == flavors::CodecId::VP6 || + video.codec_id == flavors::CodecId::VP6A { + 1 + } else { + 0 + }; + + if tag_header.data_size == 0 { + return Ok(HandleBufferResult::Again); + } + + if tag_header.data_size < offset { + adapter.flush((tag_header.data_size - 1) as usize).unwrap(); + return Ok(HandleBufferResult::Again); + } + + if offset > 0 { + adapter.flush(offset as usize).unwrap(); + } + + let mut buffer = adapter.get_buffer((tag_header.data_size - 1 - offset) as usize) + .unwrap(); + if !is_keyframe { + buffer.set_flags(BUFFER_FLAG_DELTA_UNIT).unwrap(); + } + buffer.set_dts(Some((tag_header.timestamp as u64) * 1000 * 1000)) + .unwrap(); + + Ok(HandleBufferResult::BufferForStream(1, buffer)) + } + + fn update_setup_state(adapter: &mut Adapter, + header: &flavors::Header, + setup_state: &mut SetupState) + -> Result { + if adapter.get_available() < 16 { + return Ok(HandleBufferResult::NeedMoreData); + } + + let mut data = [0u8; 16]; + adapter.peek_into(&mut data).unwrap(); + + match nom::be_u32(&data[0..4]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, previous_size) => { + (); + } + } + + let tag_header = match flavors::tag_header(&data[4..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, tag_header) => tag_header, + }; + + let res = match tag_header.tag_type { + flavors::TagType::Script => { + Self::handle_script_tag(adapter, header, setup_state, &tag_header) + } + flavors::TagType::Audio => { + let data_header = match flavors::audio_data_header(&data[15..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, data_header) => data_header, + }; + + Self::handle_audio_tag(adapter, header, setup_state, &tag_header, data_header) + } + flavors::TagType::Video => { + let data_header = match flavors::video_data_header(&data[15..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, data_header) => data_header, + }; + + Self::handle_video_tag(adapter, header, setup_state, &tag_header, data_header) + } + }; + + if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res { + if let Some(pts) = buffer.get_pts() { + setup_state.last_position = + setup_state.last_position.map(|last| cmp::max(last, pts)).or_else(|| Some(pts)); + } else if let Some(dts) = buffer.get_dts() { + setup_state.last_position = + setup_state.last_position.map(|last| cmp::max(last, dts)).or_else(|| Some(dts)); + } + } + + res + } + + fn update_initialized_state(adapter: &mut Adapter, + header: &flavors::Header, + initialized_state: &mut InitializedState) + -> Result { + match *initialized_state { + InitializedState::Skipping { skip_left: 0 } => { + *initialized_state = InitializedState::Setup { + setup_state: SetupState { + audio: None, + video: None, + got_all_streams: false, + last_position: None, + }, + }; + Ok(HandleBufferResult::Again) + } + InitializedState::Skipping { ref mut skip_left } => { + let skip = cmp::min(adapter.get_available(), *skip_left as usize); + adapter.flush(skip).unwrap(); + *skip_left -= skip as u32; + + Ok(HandleBufferResult::Again) + } + InitializedState::Setup { ref mut setup_state } => { + Self::update_setup_state(adapter, header, setup_state) + } + } + } + + fn update_state(adapter: &mut Adapter, + stream_state: &mut StreamState) + -> Result { + match *stream_state { + StreamState::NeedHeader => { + while adapter.get_available() >= 9 { + let mut data = [0u8; 9]; + adapter.peek_into(&mut data).unwrap(); + + match flavors::header(&data) { + IResult::Error(_) | + IResult::Incomplete(_) => { + // fall through + } + IResult::Done(_, mut header) => { + if header.offset < 9 { + header.offset = 9; + } + let skip = header.offset - 9; + adapter.flush(9).unwrap(); + + *stream_state = StreamState::Initialized { + header: header, + initialized_state: InitializedState::Skipping { skip_left: skip }, + }; + + return Ok(HandleBufferResult::Again); + } + } + + adapter.flush(1).unwrap(); + } + + Ok(HandleBufferResult::NeedMoreData) + } + StreamState::Initialized { ref header, ref mut initialized_state } => { + Self::update_initialized_state(adapter, header, initialized_state) + } + } + } +} + +impl Demuxer for FlvDemux { + fn start(&mut self, + _upstream_size: Option, + _random_access: bool) + -> Result<(), ErrorMessage> { + self.streaming_state = StreamingState::Started { + adapter: Adapter::new(), + stream_state: StreamState::NeedHeader, + }; + + Ok(()) + } + + fn stop(&mut self) -> Result<(), ErrorMessage> { + self.streaming_state = StreamingState::Stopped; + + Ok(()) + } + + fn seek(&mut self, start: u64, stop: Option) -> Result { + unimplemented!(); + } + + fn handle_buffer(&mut self, buffer: Option) -> Result { + let (adapter, stream_state) = match self.streaming_state { + StreamingState::Started { ref mut adapter, ref mut stream_state } => { + (adapter, stream_state) + } + StreamingState::Stopped => { + unreachable!(); + } + }; + + if let Some(buffer) = buffer { + adapter.push(buffer); + } + + Self::update_state(adapter, stream_state) + } + + fn end_of_stream(&mut self) -> Result<(), ErrorMessage> { + // nothing to do here, all data we have left is incomplete + Ok(()) + } + + fn is_seekable(&self) -> bool { + false + } + + fn get_position(&self) -> Option { + if let StreamingState::Started { + stream_state: StreamState::Initialized { + initialized_state: InitializedState::Setup { + setup_state: SetupState { + last_position, .. + }, .. + }, .. + }, .. + } = self.streaming_state { + return last_position; + } + + None + } + + fn get_duration(&self) -> Option { + None + } +} diff --git a/src/lib.rs b/src/lib.rs index e64da8c3..fa16dbc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,9 @@ extern crate url; extern crate reqwest; #[macro_use] extern crate bitflags; +#[macro_use] +extern crate nom; +extern crate flavors; #[macro_use] pub mod utils; @@ -35,11 +38,14 @@ pub mod rssink; pub mod rsfilesrc; pub mod rshttpsrc; pub mod rsfilesink; +pub mod rsdemuxer; +pub mod flvdemux; use utils::*; use rsfilesrc::FileSrc; use rshttpsrc::HttpSrc; use rsfilesink::FileSink; +use flvdemux::FlvDemux; use std::os::raw::c_void; use libc::c_char; @@ -171,3 +177,63 @@ pub unsafe extern "C" fn sinks_register(plugin: *const c_void) -> GBoolean { GBoolean::True } + +unsafe fn demuxer_register(plugin: *const c_void, + name: &str, + long_name: &str, + description: &str, + classification: &str, + author: &str, + rank: i32, + create_instance: *const c_void, + input_format: &str, + output_formats: &str) { + extern "C" { + fn gst_rs_demuxer_register(plugin: *const c_void, + name: *const c_char, + long_name: *const c_char, + description: *const c_char, + classification: *const c_char, + author: *const c_char, + rank: i32, + create_instance: *const c_void, + input_format: *const c_char, + output_formats: *const c_char) + -> GBoolean; + } + + let cname = CString::new(name).unwrap(); + let clong_name = CString::new(long_name).unwrap(); + let cdescription = CString::new(description).unwrap(); + let cclassification = CString::new(classification).unwrap(); + let cauthor = CString::new(author).unwrap(); + let cinput_format = CString::new(input_format).unwrap(); + let coutput_formats = CString::new(output_formats).unwrap(); + + gst_rs_demuxer_register(plugin, + cname.as_ptr(), + clong_name.as_ptr(), + cdescription.as_ptr(), + cclassification.as_ptr(), + cauthor.as_ptr(), + rank, + create_instance, + cinput_format.as_ptr(), + coutput_formats.as_ptr()); +} + +#[no_mangle] +pub unsafe extern "C" fn demuxers_register(plugin: *const c_void) -> GBoolean { + demuxer_register(plugin, + "rsflvdemux", + "FLV Demuxer", + "Demuxes FLV Streams", + "Codec/Demuxer", + "Sebastian Dröge ", + 256 + 100, + FlvDemux::new_boxed as *const c_void, + "video/x-flv", + "ANY"); + + GBoolean::True +} diff --git a/src/plugin.c b/src/plugin.c index 2a3f9e00..05382610 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -20,6 +20,7 @@ #include "rssource.h" #include "rssink.h" +#include "rsdemuxer.h" static gboolean plugin_init (GstPlugin * plugin) @@ -30,6 +31,9 @@ plugin_init (GstPlugin * plugin) if (!gst_rs_sink_plugin_init (plugin)) return FALSE; + if (!gst_rs_demuxer_plugin_init (plugin)) + return FALSE; + return TRUE; } @@ -102,13 +106,13 @@ gst_rs_buffer_set_offset_end (GstBuffer * buffer, guint64 offset_end) } GstBufferFlags -gst_rs_buffer_get_buffer_flags (GstBuffer * buffer) +gst_rs_buffer_get_flags (GstBuffer * buffer) { return GST_MINI_OBJECT_FLAGS (buffer); } void -gst_rs_buffer_set_buffer_flags (GstBuffer * buffer, GstBufferFlags flags) +gst_rs_buffer_set_flags (GstBuffer * buffer, GstBufferFlags flags) { GST_MINI_OBJECT_FLAGS (buffer) = flags; } diff --git a/src/rsdemuxer.c b/src/rsdemuxer.c new file mode 100644 index 00000000..e8fdb51d --- /dev/null +++ b/src/rsdemuxer.c @@ -0,0 +1,522 @@ +/* Copyright (C) 2016 Sebastian Dröge + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "rsdemuxer.h" + +#include +#include + +typedef struct +{ + gchar *long_name; + gchar *description; + gchar *classification; + gchar *author; + void *create_instance; + gchar *input_format; + gchar *output_formats; +} ElementData; +static GHashTable *demuxers; + +/* Declarations for Rust code */ +extern gboolean demuxers_register (void *plugin); +extern void *demuxer_new (GstRsDemuxer * demuxer, void *create_instance); +extern void demuxer_drop (void *rsdemuxer); + +extern gboolean demuxer_start (void *rsdemuxer, uint64_t upstream_size, + gboolean random_access); +extern gboolean demuxer_stop (void *rsdemuxer); + +extern gboolean demuxer_is_seekable (void *rsdemuxer); +extern gboolean demuxer_get_position (void *rsdemuxer, uint64_t * position); +extern gboolean demuxer_get_duration (void *rsdemuxer, uint64_t * duration); + +extern gboolean demuxer_seek (void *rsdemuxer, uint64_t start, uint64_t stop, + uint64_t * offset); +extern GstFlowReturn demuxer_handle_buffer (void *rsdemuxer, + GstBuffer * buffer); +extern void demuxer_end_of_stream (void *rsdemuxer); + +extern void cstring_drop (void *str); + +GST_DEBUG_CATEGORY_STATIC (gst_rs_demuxer_debug); +#define GST_CAT_DEFAULT gst_rs_demuxer_debug + +static void gst_rs_demuxer_finalize (GObject * object); +static gboolean gst_rs_demuxer_sink_activate (GstPad * pad, GstObject * parent); +static gboolean gst_rs_demuxer_sink_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); +static GstFlowReturn gst_rs_demuxer_sink_chain (GstPad * pad, + GstObject * parent, GstBuffer * buf); +static gboolean gst_rs_demuxer_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, + GstQuery * query); +static gboolean gst_rs_demuxer_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstStateChangeReturn gst_rs_demuxer_change_state (GstElement * element, + GstStateChange transition); +static void gst_rs_demuxer_loop (GstRsDemuxer * demuxer); + +static GObjectClass *parent_class; + +static void +gst_rs_demuxer_class_init (GstRsDemuxerClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + ElementData *data = g_hash_table_lookup (demuxers, + GSIZE_TO_POINTER (G_TYPE_FROM_CLASS (klass))); + GstCaps *caps; + GstPadTemplate *templ; + g_assert (data != NULL); + + gobject_class = G_OBJECT_CLASS (klass); + gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->finalize = gst_rs_demuxer_finalize; + + gstelement_class->change_state = gst_rs_demuxer_change_state; + + gst_element_class_set_static_metadata (gstelement_class, + data->long_name, data->classification, data->description, data->author); + + caps = gst_caps_from_string (data->input_format); + templ = gst_pad_template_new ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, caps); + gst_caps_unref (caps); + + gst_element_class_add_pad_template (gstelement_class, templ); + + caps = gst_caps_from_string (data->output_formats); + templ = gst_pad_template_new ("src_%u", GST_PAD_SRC, GST_PAD_SOMETIMES, caps); + gst_caps_unref (caps); + + gst_element_class_add_pad_template (gstelement_class, templ); +} + +static void +gst_rs_demuxer_init (GstRsDemuxer * demuxer, GstRsDemuxerClass * klass) +{ + ElementData *data = g_hash_table_lookup (demuxers, + GSIZE_TO_POINTER (G_TYPE_FROM_CLASS (klass))); + GstPadTemplate *templ; + g_assert (data != NULL); + + demuxer->instance = demuxer_new (demuxer, data->create_instance); + + templ = + gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink"); + demuxer->sinkpad = gst_pad_new_from_template (templ, "sink"); + + gst_pad_set_activate_function (demuxer->sinkpad, + gst_rs_demuxer_sink_activate); + gst_pad_set_activatemode_function (demuxer->sinkpad, + gst_rs_demuxer_sink_activate_mode); + gst_pad_set_chain_function (demuxer->sinkpad, gst_rs_demuxer_sink_chain); + gst_pad_set_event_function (demuxer->sinkpad, gst_rs_demuxer_sink_event); + gst_element_add_pad (GST_ELEMENT (demuxer), demuxer->sinkpad); + + demuxer->flow_combiner = gst_flow_combiner_new (); +} + +static void +gst_rs_demuxer_finalize (GObject * object) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (object); + + gst_flow_combiner_free (demuxer->flow_combiner); + demuxer_drop (demuxer->instance); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static gboolean +gst_rs_demuxer_sink_activate (GstPad * pad, GstObject * parent) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + GstQuery *query; + GstPadMode mode = GST_PAD_MODE_PUSH; + + query = gst_query_new_scheduling (); + if (!gst_pad_peer_query (pad, query)) { + gst_query_unref (query); + return FALSE; + } + // TODO + //if (gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE)) + // mode = GST_PAD_MODE_PULL; + gst_query_unref (query); + + demuxer->upstream_size = -1; + query = gst_query_new_duration (GST_FORMAT_BYTES); + if (gst_pad_peer_query (pad, query)) { + gst_query_parse_duration (query, NULL, &demuxer->upstream_size); + } + gst_query_unref (query); + + return gst_pad_activate_mode (pad, mode, TRUE); +} + +static gboolean +gst_rs_demuxer_sink_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + gboolean res = TRUE; + + if (active) { + if (!demuxer_start (demuxer->instance, demuxer->upstream_size, + mode == GST_PAD_MODE_PULL ? TRUE : FALSE)) { + res = FALSE; + goto out; + } + if (mode == GST_PAD_MODE_PULL) + gst_pad_start_task (pad, (GstTaskFunction) gst_rs_demuxer_loop, demuxer, + NULL); + } else { + if (mode == GST_PAD_MODE_PULL) + gst_pad_stop_task (pad); + } + +out: + + return res; +} + +static GstFlowReturn +gst_rs_demuxer_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + + return demuxer_handle_buffer (demuxer->instance, buf); +} + +static gboolean +gst_rs_demuxer_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + gboolean res = FALSE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT:{ + // TODO + res = TRUE; + gst_event_unref (event); + break; + } + case GST_EVENT_EOS: + demuxer_end_of_stream (demuxer->instance); + res = gst_pad_event_default (pad, parent, event); + break; + default: + res = gst_pad_event_default (pad, parent, event); + break; + } + return res; +} + +static gboolean +gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + gboolean res = FALSE; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_POSITION:{ + GstFormat format; + + gst_query_parse_position (query, &format, NULL); + if (format == GST_FORMAT_TIME) { + gint64 position; + + if (demuxer_get_position (demuxer->instance, &position)) { + gst_query_set_position (query, format, position); + res = TRUE; + } else { + res = FALSE; + } + } + break; + } + case GST_QUERY_DURATION:{ + GstFormat format; + + gst_query_parse_duration (query, &format, NULL); + if (format == GST_FORMAT_TIME) { + gint64 duration; + + if (demuxer_get_duration (demuxer->instance, &duration)) { + gst_query_set_duration (query, format, duration); + res = TRUE; + } else { + res = FALSE; + } + } + break; + } + default: + res = gst_pad_query_default (pad, parent, query); + break; + } + return res; +} + +static gboolean +gst_rs_demuxer_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent); + gboolean res = FALSE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK:{ + // TODO + g_assert_not_reached (); + gst_event_unref (event); + break; + } + default: + res = gst_pad_event_default (pad, parent, event); + break; + } + return res; +} + +static GstStateChangeReturn +gst_rs_demuxer_change_state (GstElement * element, GstStateChange transition) +{ + GstRsDemuxer *demuxer = GST_RS_DEMUXER (element); + GstStateChangeReturn result; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + demuxer->offset = 0; + gst_segment_init (&demuxer->segment, GST_FORMAT_TIME); + demuxer->group_id = gst_util_group_id_next (); + demuxer->segment_seqnum = gst_util_seqnum_next (); + + break; + default: + break; + } + + if (result == GST_STATE_CHANGE_FAILURE) + return result; + result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + if (result == GST_STATE_CHANGE_FAILURE) + return result; + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY:{ + guint i; + + /* Ignore stop failures */ + demuxer_stop (demuxer->instance); + + gst_flow_combiner_clear (demuxer->flow_combiner); + + for (i = 0; i < demuxer->n_srcpads; i++) + gst_element_remove_pad (GST_ELEMENT (demuxer), demuxer->srcpads[i]); + memset (demuxer->srcpads, 0, sizeof (demuxer->srcpads)); + break; + } + default: + break; + } + + return result; +} + +static void +gst_rs_demuxer_loop (GstRsDemuxer * demuxer) +{ + // TODO + g_assert_not_reached (); +} + +void +gst_rs_demuxer_add_stream (GstRsDemuxer * demuxer, guint32 index, + const gchar * format, const gchar * stream_id) +{ + GstPad *pad; + GstPadTemplate *templ; + GstCaps *caps; + GstEvent *event; + gchar *name, *full_stream_id; + + g_assert (demuxer->srcpads[index] == NULL); + g_assert (demuxer->n_srcpads == index); + + templ = + gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (demuxer), + "src_%u"); + + name = g_strdup_printf ("src_%u", index); + pad = demuxer->srcpads[index] = gst_pad_new_from_template (templ, name); + g_free (name); + + gst_pad_set_query_function (pad, gst_rs_demuxer_src_query); + gst_pad_set_event_function (pad, gst_rs_demuxer_src_event); + + gst_pad_set_active (pad, TRUE); + + full_stream_id = + gst_pad_create_stream_id (pad, GST_ELEMENT (demuxer), stream_id); + event = gst_event_new_stream_start (full_stream_id); + gst_event_set_group_id (event, demuxer->group_id); + g_free (full_stream_id); + gst_pad_push_event (pad, event); + + caps = gst_caps_from_string (format); + event = gst_event_new_caps (caps); + gst_caps_unref (caps); + gst_pad_push_event (pad, event); + + event = gst_event_new_segment (&demuxer->segment); + gst_event_set_seqnum (event, demuxer->segment_seqnum); + gst_pad_push_event (pad, event); + + gst_flow_combiner_add_pad (demuxer->flow_combiner, pad); + gst_element_add_pad (GST_ELEMENT (demuxer), pad); + demuxer->n_srcpads++; +} + +void +gst_rs_demuxer_added_all_streams (GstRsDemuxer * demuxer) +{ + gst_element_no_more_pads (GST_ELEMENT (demuxer)); + demuxer->group_id = gst_util_group_id_next (); +} + +void +gst_rs_demuxer_stream_format_changed (GstRsDemuxer * demuxer, guint32 index, + const gchar * format) +{ + GstCaps *caps; + GstEvent *event; + + g_assert (demuxer->srcpads[index] != NULL); + + caps = gst_caps_from_string (format); + event = gst_event_new_caps (caps); + gst_caps_unref (caps); + + gst_pad_push_event (demuxer->srcpads[index], event); +} + +void +gst_rs_demuxer_stream_eos (GstRsDemuxer * demuxer, guint32 index) +{ + GstCaps *caps; + GstEvent *event; + + g_assert (demuxer->srcpads[index] != NULL); + + event = gst_event_new_eos (); + if (index == -1) { + gint i; + + for (i = 0; i < demuxer->n_srcpads; i++) + gst_pad_push_event (demuxer->srcpads[i], gst_event_ref (event)); + + gst_event_unref (event); + } else { + gst_pad_push_event (demuxer->srcpads[index], event); + } +} + + +GstFlowReturn +gst_rs_demuxer_stream_push_buffer (GstRsDemuxer * demuxer, guint32 index, + GstBuffer * buffer) +{ + GstFlowReturn res = GST_FLOW_OK; + + g_assert (demuxer->srcpads[index] != NULL); + + res = gst_pad_push (demuxer->srcpads[index], buffer); + res = gst_flow_combiner_update_flow (demuxer->flow_combiner, res); + + return res; +} + +void +gst_rs_demuxer_remove_all_streams (GstRsDemuxer * demuxer) +{ + guint i; + + gst_flow_combiner_clear (demuxer->flow_combiner); + + for (i = 0; i < demuxer->n_srcpads; i++) + gst_element_remove_pad (GST_ELEMENT (demuxer), demuxer->srcpads[i]); + memset (demuxer->srcpads, 0, sizeof (demuxer->srcpads)); +} + +gboolean +gst_rs_demuxer_plugin_init (GstPlugin * plugin) +{ + demuxers = g_hash_table_new (g_direct_hash, g_direct_equal); + GST_DEBUG_CATEGORY_INIT (gst_rs_demuxer_debug, "rsdemux", 0, + "rsdemux element"); + + parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + + return demuxers_register (plugin); +} + +gboolean +gst_rs_demuxer_register (GstPlugin * plugin, const gchar * name, + const gchar * long_name, const gchar * description, + const gchar * classification, const gchar * author, GstRank rank, + void *create_instance, const gchar * input_format, + const gchar * output_formats) +{ + GTypeInfo type_info = { + sizeof (GstRsDemuxerClass), + NULL, + NULL, + (GClassInitFunc) gst_rs_demuxer_class_init, + NULL, + NULL, + sizeof (GstRsDemuxer), + 0, + (GInstanceInitFunc) gst_rs_demuxer_init + }; + GType type; + gchar *type_name; + ElementData *data; + + data = g_new0 (ElementData, 1); + data->long_name = g_strdup (long_name); + data->description = g_strdup (description); + data->classification = g_strdup (classification); + data->author = g_strdup (author); + data->create_instance = create_instance; + data->input_format = g_strdup (input_format); + data->output_formats = g_strdup (output_formats); + + type_name = g_strconcat ("RsDemuxer-", name, NULL); + type = g_type_register_static (GST_TYPE_ELEMENT, type_name, &type_info, 0); + g_free (type_name); + + g_hash_table_insert (demuxers, GSIZE_TO_POINTER (type), data); + + if (!gst_element_register (plugin, name, rank, type)) + return FALSE; + + return TRUE; +} diff --git a/src/rsdemuxer.h b/src/rsdemuxer.h new file mode 100644 index 00000000..329f48a6 --- /dev/null +++ b/src/rsdemuxer.h @@ -0,0 +1,62 @@ +/* Copyright (C) 2016 Sebastian Dröge + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RS_DEMUXER_H__ +#define __GST_RS_DEMUXER_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_RS_DEMUXER(obj) \ + ((GstRsDemuxer *)obj) +#define GST_RS_DEMUXER_CLASS(klass) \ + ((GstRsDemuxerKlass *)klass) + +typedef struct _GstRsDemuxer GstRsDemuxer; +typedef struct _GstRsDemuxerClass GstRsDemuxerClass; + +struct _GstRsDemuxer { + GstElement element; + + gpointer instance; + + GstPad *sinkpad; + guint64 offset; + guint64 upstream_size; + + GstPad *srcpads[32]; + guint n_srcpads; + guint32 group_id; + + GstSegment segment; + guint32 segment_seqnum; + + GstFlowCombiner *flow_combiner; +}; + +struct _GstRsDemuxerClass { + GstElementClass parent_class; +}; + +G_GNUC_INTERNAL gboolean gst_rs_demuxer_plugin_init (GstPlugin * plugin); + +G_END_DECLS + +#endif /* __GST_RS_DEMUXER_H__ */ diff --git a/src/rsdemuxer.rs b/src/rsdemuxer.rs new file mode 100644 index 00000000..c0f62743 --- /dev/null +++ b/src/rsdemuxer.rs @@ -0,0 +1,433 @@ +// Copyright (C) 2016 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +// Boston, MA 02110-1301, USA. + +use libc::c_char; +use std::os::raw::c_void; +use std::ffi::CString; + +use std::panic::{self, AssertUnwindSafe}; + +use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; + +use std::u32; +use std::u64; + +use utils::*; +use error::*; +use buffer::*; + +pub type StreamIndex = u32; + +#[derive(Debug)] +pub enum SeekResult { + TooEarly, + Ok(u64), + Eos, +} + +#[derive(Debug)] +pub enum HandleBufferResult { + NeedMoreData, + Again, + // NeedDataFromOffset(u64), + StreamAdded(Stream), + HaveAllStreams, + StreamChanged(Stream), + // StreamsAdded(Vec), // Implies HaveAllStreams + // StreamsChanged(Vec), + // TODO need something to replace/add new streams + // TODO should probably directly implement the GstStreams new world order + BufferForStream(StreamIndex, Buffer), + Eos(Option), +} + +pub trait Demuxer { + fn start(&mut self, + upstream_size: Option, + random_access: bool) + -> Result<(), ErrorMessage>; + fn stop(&mut self) -> Result<(), ErrorMessage>; + + fn seek(&mut self, start: u64, stop: Option) -> Result; + fn handle_buffer(&mut self, buffer: Option) -> Result; + fn end_of_stream(&mut self) -> Result<(), ErrorMessage>; + + fn is_seekable(&self) -> bool; + fn get_position(&self) -> Option; + fn get_duration(&self) -> Option; +} + +#[derive(Debug)] +pub struct Stream { + pub index: StreamIndex, + pub format: String, + pub stream_id: String, +} + +impl Stream { + pub fn new(index: StreamIndex, format: String, stream_id: String) -> Stream { + Stream { + index: index, + format: format, + stream_id: stream_id, + } + } +} + +pub struct DemuxerWrapper { + raw: *mut c_void, + demuxer: Mutex>, + panicked: AtomicBool, +} + +impl DemuxerWrapper { + fn new(raw: *mut c_void, demuxer: Box) -> DemuxerWrapper { + DemuxerWrapper { + raw: raw, + demuxer: Mutex::new(demuxer), + panicked: AtomicBool::new(false), + } + } + + fn start(&self, upstream_size: u64, random_access: bool) -> bool { + let demuxer = &mut self.demuxer.lock().unwrap(); + + let upstream_size = if upstream_size == u64::MAX { + None + } else { + Some(upstream_size) + }; + + match demuxer.start(upstream_size, random_access) { + Ok(..) => true, + Err(ref msg) => { + self.post_message(msg); + false + } + } + + } + fn stop(&self) -> bool { + let demuxer = &mut self.demuxer.lock().unwrap(); + + match demuxer.stop() { + Ok(..) => true, + Err(ref msg) => { + self.post_message(msg); + false + } + } + } + + fn is_seekable(&self) -> bool { + let demuxer = &self.demuxer.lock().unwrap(); + + demuxer.is_seekable() + } + + + fn get_position(&self, position: &mut u64) -> GBoolean { + let demuxer = &self.demuxer.lock().unwrap(); + + match demuxer.get_position() { + None => { + *position = u64::MAX; + GBoolean::False + } + Some(pos) => { + *position = pos; + GBoolean::True + } + } + + } + + fn get_duration(&self, position: &mut u64) -> GBoolean { + let demuxer = &self.demuxer.lock().unwrap(); + + match demuxer.get_duration() { + None => { + *position = u64::MAX; + GBoolean::False + } + Some(pos) => { + *position = pos; + GBoolean::True + } + } + } + + fn seek(&self, start: u64, stop: u64, offset: &mut u64) -> bool { + extern "C" { + fn gst_rs_demuxer_stream_eos(raw: *mut c_void, index: u32); + }; + + let stop = if stop == u64::MAX { None } else { Some(stop) }; + + let res = { + let mut demuxer = &mut self.demuxer.lock().unwrap(); + + match demuxer.seek(start, stop) { + Ok(res) => res, + Err(ref msg) => { + self.post_message(msg); + return false; + } + } + }; + + match res { + SeekResult::TooEarly => false, + SeekResult::Ok(off) => { + *offset = off; + true + } + SeekResult::Eos => { + *offset = u64::MAX; + + unsafe { + gst_rs_demuxer_stream_eos(self.raw, u32::MAX); + } + + true + } + } + } + + fn handle_buffer(&self, buffer: Buffer) -> GstFlowReturn { + extern "C" { + fn gst_rs_demuxer_stream_eos(raw: *mut c_void, index: u32); + fn gst_rs_demuxer_add_stream(raw: *mut c_void, + index: u32, + format: *const c_char, + stream_id: *const c_char); + fn gst_rs_demuxer_added_all_streams(raw: *mut c_void); + // fn gst_rs_demuxer_remove_all_streams(raw: *mut c_void); + fn gst_rs_demuxer_stream_format_changed(raw: *mut c_void, + index: u32, + format: *const c_char); + fn gst_rs_demuxer_stream_push_buffer(raw: *mut c_void, + index: u32, + buffer: *mut c_void) + -> GstFlowReturn; + }; + + let mut res = { + let mut demuxer = &mut self.demuxer.lock().unwrap(); + + match demuxer.handle_buffer(Some(buffer)) { + Ok(res) => res, + Err(flow_error) => { + match flow_error { + FlowError::NotNegotiated(ref msg) | + FlowError::Error(ref msg) => self.post_message(msg), + _ => (), + } + return flow_error.to_native(); + } + } + }; + + // Loop until AllEos, NeedMoreData or error when pushing downstream + loop { + match res { + HandleBufferResult::NeedMoreData => { + return GstFlowReturn::Ok; + } + HandleBufferResult::StreamAdded(stream) => { + let format_cstr = CString::new(stream.format.as_bytes()).unwrap(); + let stream_id_cstr = CString::new(stream.stream_id.as_bytes()).unwrap(); + + unsafe { + gst_rs_demuxer_add_stream(self.raw, + stream.index, + format_cstr.as_ptr(), + stream_id_cstr.as_ptr()); + } + } + HandleBufferResult::HaveAllStreams => unsafe { + gst_rs_demuxer_added_all_streams(self.raw); + }, + HandleBufferResult::StreamChanged(stream) => { + let format_cstr = CString::new(stream.format.as_bytes()).unwrap(); + + unsafe { + gst_rs_demuxer_stream_format_changed(self.raw, + stream.index, + format_cstr.as_ptr()); + } + } + HandleBufferResult::BufferForStream(index, buffer) => { + let flow_ret = unsafe { + gst_rs_demuxer_stream_push_buffer(self.raw, index, buffer.into_ptr()) + }; + if flow_ret != GstFlowReturn::Ok { + return flow_ret; + } + } + HandleBufferResult::Eos(index) => { + let index = index.unwrap_or(u32::MAX); + + unsafe { + gst_rs_demuxer_stream_eos(self.raw, index); + } + + return GstFlowReturn::Eos; + } + HandleBufferResult::Again => { + // nothing, just call again + } + }; + + res = { + let mut demuxer = &mut self.demuxer.lock().unwrap(); + match demuxer.handle_buffer(None) { + Ok(res) => res, + Err(flow_error) => { + match flow_error { + FlowError::NotNegotiated(ref msg) | + FlowError::Error(ref msg) => self.post_message(msg), + _ => (), + } + return flow_error.to_native(); + } + } + } + } + } + + fn end_of_stream(&self) { + let mut demuxer = &mut self.demuxer.lock().unwrap(); + + match demuxer.end_of_stream() { + Ok(_) => (), + Err(ref msg) => { + self.post_message(msg); + } + } + } + + fn post_message(&self, msg: &ErrorMessage) { + unsafe { + msg.post(self.raw); + } + } +} + +#[no_mangle] +pub extern "C" fn demuxer_new(demuxer: *mut c_void, + create_instance: fn() -> Box) + -> *mut DemuxerWrapper { + Box::into_raw(Box::new(DemuxerWrapper::new(demuxer, create_instance()))) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_drop(ptr: *mut DemuxerWrapper) { + let _ = Box::from_raw(ptr); +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_start(ptr: *const DemuxerWrapper, + upstream_size: u64, + random_access: GBoolean) + -> GBoolean { + let wrap: &DemuxerWrapper = &*ptr; + + panic_to_error!(wrap, GBoolean::False, { + GBoolean::from_bool(wrap.start(upstream_size, random_access.to_bool())) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_stop(ptr: *const DemuxerWrapper) -> GBoolean { + let wrap: &DemuxerWrapper = &*ptr; + + panic_to_error!(wrap, GBoolean::False, { + GBoolean::from_bool(wrap.stop()) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_is_seekable(ptr: *const DemuxerWrapper) -> GBoolean { + let wrap: &DemuxerWrapper = &*ptr; + + panic_to_error!(wrap, GBoolean::False, { + GBoolean::from_bool(wrap.is_seekable()) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_get_position(ptr: *const DemuxerWrapper, + position: *mut u64) + -> GBoolean { + let wrap: &DemuxerWrapper = &*ptr; + + panic_to_error!(wrap, GBoolean::False, { + let position = &mut *position; + wrap.get_position(position) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_get_duration(ptr: *const DemuxerWrapper, + duration: *mut u64) + -> GBoolean { + let wrap: &DemuxerWrapper = &*ptr; + + panic_to_error!(wrap, GBoolean::False, { + let duration = &mut *duration; + wrap.get_duration(duration) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_seek(ptr: *mut DemuxerWrapper, + start: u64, + stop: u64, + offset: *mut u64) + -> GBoolean { + + let wrap: &mut DemuxerWrapper = &mut *ptr; + + panic_to_error!(wrap, GBoolean::False, { + let offset = &mut *offset; + + GBoolean::from_bool(wrap.seek(start, stop, offset)) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_handle_buffer(ptr: *mut DemuxerWrapper, + buffer: *mut c_void) + -> GstFlowReturn { + let wrap: &mut DemuxerWrapper = &mut *ptr; + + panic_to_error!(wrap, GstFlowReturn::Error, { + let buffer = Buffer::new_from_ptr_owned(buffer); + wrap.handle_buffer(buffer) + }) +} + +#[no_mangle] +pub unsafe extern "C" fn demuxer_end_of_stream(ptr: *mut DemuxerWrapper) { + let wrap: &mut DemuxerWrapper = &mut *ptr; + + panic_to_error!(wrap, (), { + wrap.end_of_stream(); + }) +}