From 4b4ae6d52c2e9db3cd9941fb8d422bfc252ac184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 3 Jan 2019 23:53:06 +0200 Subject: [PATCH] flvdemux: Port to new subclassing API Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/issues/49 --- gst-plugin-flv/Cargo.toml | 9 +- gst-plugin-flv/src/bytes.rs | 140 +++ gst-plugin-flv/src/flvdemux.rs | 1889 +++++++++++++++++++------------- gst-plugin-flv/src/lib.rs | 51 +- 4 files changed, 1296 insertions(+), 793 deletions(-) create mode 100644 gst-plugin-flv/src/bytes.rs diff --git a/gst-plugin-flv/Cargo.toml b/gst-plugin-flv/Cargo.toml index fe8d8306..620ebe0d 100644 --- a/gst-plugin-flv/Cargo.toml +++ b/gst-plugin-flv/Cargo.toml @@ -4,17 +4,20 @@ version = "0.4.0" authors = ["Sebastian Dröge "] repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" license = "MIT/Apache-2.0" +edition = "2018" [dependencies] url = "1.1" glib = { git = "https://github.com/gtk-rs/glib" } -gst-plugin = { path="../gst-plugin" } -gst-plugin-simple = { path="../gst-plugin-simple" } -gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } +gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } num-rational = { version = "0.2", default-features = false, features = [] } nom = "3.0" flavors = {git = "https://github.com/rust-av/flavors.git"} muldiv = "0.2" +byteorder = "1.0" +lazy_static = "1.0" +smallvec = "0.6" [lib] name = "gstrsflv" diff --git a/gst-plugin-flv/src/bytes.rs b/gst-plugin-flv/src/bytes.rs new file mode 100644 index 00000000..d16be1d1 --- /dev/null +++ b/gst-plugin-flv/src/bytes.rs @@ -0,0 +1,140 @@ +// Copyright (C) 2016-2017 Sebastian Dröge +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +pub use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use std::io; + +pub trait ReadBytesExtShort: io::Read { + fn read_u16le(&mut self) -> io::Result { + self.read_u16::() + } + fn read_i16le(&mut self) -> io::Result { + self.read_i16::() + } + fn read_u32le(&mut self) -> io::Result { + self.read_u32::() + } + fn read_i32le(&mut self) -> io::Result { + self.read_i32::() + } + fn read_u64le(&mut self) -> io::Result { + self.read_u64::() + } + fn read_i64le(&mut self) -> io::Result { + self.read_i64::() + } + fn read_uintle(&mut self, nbytes: usize) -> io::Result { + self.read_uint::(nbytes) + } + fn read_intle(&mut self, nbytes: usize) -> io::Result { + self.read_int::(nbytes) + } + fn read_f32le(&mut self) -> io::Result { + self.read_f32::() + } + fn read_f64le(&mut self) -> io::Result { + self.read_f64::() + } + fn read_u16be(&mut self) -> io::Result { + self.read_u16::() + } + fn read_i16be(&mut self) -> io::Result { + self.read_i16::() + } + fn read_u32be(&mut self) -> io::Result { + self.read_u32::() + } + fn read_i32be(&mut self) -> io::Result { + self.read_i32::() + } + fn read_u64be(&mut self) -> io::Result { + self.read_u64::() + } + fn read_i64be(&mut self) -> io::Result { + self.read_i64::() + } + fn read_uintbe(&mut self, nbytes: usize) -> io::Result { + self.read_uint::(nbytes) + } + fn read_intbe(&mut self, nbytes: usize) -> io::Result { + self.read_int::(nbytes) + } + fn read_f32be(&mut self) -> io::Result { + self.read_f32::() + } + fn read_f64be(&mut self) -> io::Result { + self.read_f64::() + } +} + +impl ReadBytesExtShort for T where T: ReadBytesExt {} + +pub trait WriteBytesExtShort: WriteBytesExt { + fn write_u16le(&mut self, n: u16) -> io::Result<()> { + self.write_u16::(n) + } + fn write_i16le(&mut self, n: i16) -> io::Result<()> { + self.write_i16::(n) + } + fn write_u32le(&mut self, n: u32) -> io::Result<()> { + self.write_u32::(n) + } + fn write_i32le(&mut self, n: i32) -> io::Result<()> { + self.write_i32::(n) + } + fn write_u64le(&mut self, n: u64) -> io::Result<()> { + self.write_u64::(n) + } + fn write_i64le(&mut self, n: i64) -> io::Result<()> { + self.write_i64::(n) + } + fn write_uintle(&mut self, n: u64, nbytes: usize) -> io::Result<()> { + self.write_uint::(n, nbytes) + } + fn write_intle(&mut self, n: i64, nbytes: usize) -> io::Result<()> { + self.write_int::(n, nbytes) + } + fn write_f32le(&mut self, n: f32) -> io::Result<()> { + self.write_f32::(n) + } + fn write_f64le(&mut self, n: f64) -> io::Result<()> { + self.write_f64::(n) + } + fn write_u16be(&mut self, n: u16) -> io::Result<()> { + self.write_u16::(n) + } + fn write_i16be(&mut self, n: i16) -> io::Result<()> { + self.write_i16::(n) + } + fn write_u32be(&mut self, n: u32) -> io::Result<()> { + self.write_u32::(n) + } + fn write_i32be(&mut self, n: i32) -> io::Result<()> { + self.write_i32::(n) + } + fn write_u64be(&mut self, n: u64) -> io::Result<()> { + self.write_u64::(n) + } + fn write_i64be(&mut self, n: i64) -> io::Result<()> { + self.write_i64::(n) + } + fn write_uintbe(&mut self, n: u64, nbytes: usize) -> io::Result<()> { + self.write_uint::(n, nbytes) + } + fn write_intbe(&mut self, n: i64, nbytes: usize) -> io::Result<()> { + self.write_int::(n, nbytes) + } + fn write_f32be(&mut self, n: f32) -> io::Result<()> { + self.write_f32::(n) + } + fn write_f64be(&mut self, n: f64) -> io::Result<()> { + self.write_f64::(n) + } +} + +impl WriteBytesExtShort for T where T: WriteBytesExt {} diff --git a/gst-plugin-flv/src/flvdemux.rs b/gst-plugin-flv/src/flvdemux.rs index 3a783cb7..4c43f345 100644 --- a/gst-plugin-flv/src/flvdemux.rs +++ b/gst-plugin-flv/src/flvdemux.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2016-2017 Sebastian Dröge +// Copyright (C) 2016-2018 Sebastian Dröge // // Licensed under the Apache License, Version 2.0 or the MIT license @@ -7,25 +7,45 @@ // except according to those terms. use std::cmp; -use std::io::{Cursor, Write}; +use std::sync::Mutex; use nom; use nom::IResult; -use flavors::parser as flavors; +// FIXME: rustfmt removes the :: but they're required here +#[rustfmt::skip] +use ::flavors::parser as flavors; -use gst_plugin::adapter::*; -use gst_plugin::bytes::*; -use gst_plugin::element::*; -use gst_plugin::error::*; -use gst_plugin_simple::demuxer::*; - -use gst; +use crate::gst; +use crate::gst::prelude::*; +use crate::gst::subclass::prelude::*; +use crate::gst_base; +use glib; +use glib::subclass; use num_rational::Rational32; -const AUDIO_STREAM_ID: u32 = 0; -const VIDEO_STREAM_ID: u32 = 1; +use smallvec::SmallVec; + +lazy_static! { + static ref CAT: gst::DebugCategory = { + gst::DebugCategory::new( + "rsflvdemux", + gst::DebugColorFlags::empty(), + "Rust FLV demuxer", + ) + }; +} + +#[derive(Debug)] +struct FlvDemux { + sinkpad: gst::Pad, + audio_srcpad: Mutex>, + video_srcpad: Mutex>, + adapter: Mutex, + flow_combiner: Mutex, + state: Mutex, +} #[derive(Debug)] enum State { @@ -36,7 +56,20 @@ enum State { video: bool, skip_left: u32, }, - Streaming, + Streaming(StreamingState), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Stream { + Audio, + Video, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +enum Event { + StreamChanged(Stream, gst::Caps), + Buffer(Stream, gst::Buffer), + HaveAllStreams, } #[derive(Debug)] @@ -54,6 +87,593 @@ struct StreamingState { avc_sequence_header: Option, } +#[derive(Debug, Eq, Clone)] +struct AudioFormat { + format: flavors::SoundFormat, + rate: u16, + width: u8, + channels: u8, + bitrate: Option, + aac_sequence_header: Option, +} + +#[derive(Debug, Eq, Clone)] +struct VideoFormat { + format: flavors::CodecId, + width: Option, + height: Option, + pixel_aspect_ratio: Option, + framerate: Option, + bitrate: Option, + avc_sequence_header: Option, +} + +#[derive(Debug, PartialEq, Eq, Clone, Default)] +struct Metadata { + duration: gst::ClockTime, + + creation_date: Option, + creator: Option, + title: Option, + metadata_creator: Option, /* TODO: seek_table: _, + * filepositions / times metadata arrays */ + + audio_bitrate: Option, + + video_width: Option, + video_height: Option, + video_pixel_aspect_ratio: Option, + video_framerate: Option, + video_bitrate: Option, +} + +impl ObjectSubclass for FlvDemux { + const NAME: &'static str = "RsFlvDemux"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sinkpad = gst::Pad::new_from_template(&templ, "sink"); + + sinkpad.set_activate_function(|pad, parent| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux, element| demux.sink_activate(pad, element), + ) + }); + + sinkpad.set_activatemode_function(|pad, parent, mode, active| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux, element| demux.sink_activatemode(pad, element, mode, active), + ) + }); + + sinkpad.set_chain_function(|pad, parent, buffer| { + FlvDemux::catch_panic_pad_function( + parent, + || gst::FlowReturn::Error, + |demux, element| demux.sink_chain(pad, element, buffer), + ) + }); + sinkpad.set_event_function(|pad, parent, event| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux, element| demux.sink_event(pad, element, event), + ) + }); + + FlvDemux { + sinkpad, + audio_srcpad: Mutex::new(None), + video_srcpad: Mutex::new(None), + state: Mutex::new(State::Stopped), + adapter: Mutex::new(gst_base::UniqueAdapter::new()), + flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), + } + } + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "FLV Demuxer", + "Codec/Demuxer", + "Demuxes FLV Streams", + "Sebastian Dröge ", + ); + + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + + caps.append( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", &1i32) + .build(), + ); + caps.append( + gst::Caps::builder("audio/x-raw") + .field("layout", &"interleaved") + .field("format", &gst::List::new(&[&"U8", &"S16LE"])) + .build(), + ); + caps.append( + gst::Caps::builder("audio/x-adpcm") + .field("layout", &"swf") + .build(), + ); + caps.append(gst::Caps::builder("audio/x-nellymoser").build()); + caps.append(gst::Caps::builder("audio/x-alaw").build()); + caps.append(gst::Caps::builder("audio/x-mulaw").build()); + caps.append( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", &4i32) + .field("framed", &true) + .field("stream-format", &"raw") + .build(), + ); + caps.append(gst::Caps::builder("audio/x-speex").build()); + } + let audiosrc_pad_template = gst::PadTemplate::new( + "audio", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ); + klass.add_pad_template(audiosrc_pad_template); + + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + + caps.append( + gst::Caps::builder("video/x-flash-video") + .field("flvversion", &1i32) + .build(), + ); + caps.append(gst::Caps::builder("video/x-flash-screen").build()); + caps.append(gst::Caps::builder("video/x-vp6-flash").build()); + caps.append(gst::Caps::builder("video/x-vp6-flash-alpha").build()); + caps.append(gst::Caps::builder("video/x-flash-screen2").build()); + caps.append( + gst::Caps::builder("video/x-h264") + .field("stream-format", &"avc") + .build(), + ); + caps.append(gst::Caps::builder("video/x-h263").build()); + caps.append( + gst::Caps::builder("video/mpeg") + .field("mpegversion", &4i32) + .build(), + ); + } + let videosrc_pad_template = gst::PadTemplate::new( + "video", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ); + klass.add_pad_template(videosrc_pad_template); + + let caps = gst::Caps::builder("video/x-flv").build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(sink_pad_template); + } +} + +impl ObjectImpl for FlvDemux { + glib_object_impl!(); + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(&self.sinkpad).unwrap(); + } +} + +impl ElementImpl for FlvDemux {} + +impl FlvDemux { + fn sink_activate(&self, pad: &gst::Pad, _element: &gst::Element) -> bool { + let mode = { + let mut query = gst::Query::new_scheduling(); + if !pad.peer_query(&mut query) { + return false; + } + + // TODO: pull mode + if false + && query.has_scheduling_mode_with_flags( + gst::PadMode::Pull, + gst::SchedulingFlags::SEEKABLE, + ) + { + gst_debug!(CAT, obj: pad, "Activating in Pull mode"); + gst::PadMode::Pull + } else { + gst_debug!(CAT, obj: pad, "Activating in Push mode"); + gst::PadMode::Push + } + }; + + pad.activate_mode(mode, true).is_ok() + } + + fn sink_activatemode( + &self, + _pad: &gst::Pad, + element: &gst::Element, + mode: gst::PadMode, + active: bool, + ) -> bool { + if active { + if let Err(err) = self.start(element, mode) { + element.post_error_message(&err); + return false; + } + + if mode == gst::PadMode::Pull { + // TODO implement pull mode + // self.sinkpad.start_task(...) + unimplemented!(); + } + + true + } else { + if mode == gst::PadMode::Pull { + let _ = self.sinkpad.stop_task(); + } + + if let Err(err) = self.stop(element) { + element.post_error_message(&err); + return false; + } + + true + } + } + + fn start(&self, _element: &gst::Element, _mode: gst::PadMode) -> Result<(), gst::ErrorMessage> { + *self.state.lock().unwrap() = State::NeedHeader; + + Ok(()) + } + + fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + *self.state.lock().unwrap() = State::Stopped; + self.adapter.lock().unwrap().clear(); + + let mut flow_combiner = self.flow_combiner.lock().unwrap(); + if let Some(pad) = self.audio_srcpad.lock().unwrap().take() { + element.remove_pad(&pad).unwrap(); + flow_combiner.remove_pad(&pad); + } + + if let Some(pad) = self.video_srcpad.lock().unwrap().take() { + element.remove_pad(&pad).unwrap(); + flow_combiner.remove_pad(&pad); + } + + flow_combiner.reset(); + + Ok(()) + } + + fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { + use crate::gst::EventView; + + gst_log!(CAT, obj: pad, "Handling event {:?}", event); + match event.view() { + EventView::Eos(..) => { + // TODO implement + pad.event_default(element, event) + } + EventView::Segment(..) => { + // TODO implement + pad.event_default(element, event) + } + EventView::FlushStart(..) => { + // TODO implement + pad.event_default(element, event) + } + EventView::FlushStop(..) => { + // TODO implement + pad.event_default(element, event) + } + _ => pad.event_default(element, event), + } + } + + fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool { + use crate::gst::QueryView; + + match query.view_mut() { + QueryView::Position(ref mut q) => { + let fmt = q.get_format(); + if fmt == gst::Format::Time { + if self.sinkpad.peer_query(q.get_mut_query()) { + return true; + } + + if let State::Streaming(StreamingState { last_position, .. }) = + *self.state.lock().unwrap() + { + q.set(last_position); + return true; + } + + false + } else { + false + } + } + QueryView::Duration(ref mut q) => { + let fmt = q.get_format(); + if fmt == gst::Format::Time { + if self.sinkpad.peer_query(q.get_mut_query()) { + return true; + } + + if let State::Streaming(StreamingState { + metadata: Some(Metadata { duration, .. }), + .. + }) = *self.state.lock().unwrap() + { + q.set(duration); + return true; + } + + false + } else { + false + } + } + _ => pad.query_default(element, query), + } + } + + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { + use crate::gst::EventView; + + match event.view() { + EventView::Seek(..) => { + // TODO: Implement + false + } + _ => pad.event_default(element, event), + } + } + + fn sink_chain( + &self, + pad: &gst::Pad, + element: &gst::Element, + buffer: gst::Buffer, + ) -> gst::FlowReturn { + gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + + let mut adapter = self.adapter.lock().unwrap(); + adapter.push(buffer); + + let mut state = self.state.lock().unwrap(); + loop { + match *state { + State::Stopped => unreachable!(), + State::NeedHeader => { + let header = match self.find_header(element, &mut *adapter) { + Ok(header) => header, + Err(_) => { + gst_trace!(CAT, obj: element, "Need more data"); + return gst::FlowReturn::Ok; + } + }; + + let skip = if header.offset < 9 { + 0 + } else { + header.offset - 9 + }; + + *state = State::Skipping { + audio: header.audio, + video: header.video, + skip_left: skip, + }; + } + State::Skipping { + audio, + video, + skip_left: 0, + } => { + *state = State::Streaming(StreamingState::new(audio, video)); + } + State::Skipping { + ref mut skip_left, .. + } => { + let avail = adapter.available(); + if avail == 0 { + gst_trace!(CAT, obj: element, "Need more data"); + return gst::FlowReturn::Ok; + } + let skip = cmp::min(avail, *skip_left as usize); + adapter.flush(skip); + *skip_left -= skip as u32; + } + State::Streaming(ref mut sstate) => { + let res = sstate.handle_tag(element, &mut *adapter); + + match res { + Ok(None) => { + gst_trace!(CAT, obj: element, "Need more data"); + return gst::FlowReturn::Ok; + } + Ok(Some(events)) => { + drop(state); + drop(adapter); + + if let Err(err) = self.handle_events(element, events) { + return err.into(); + } + + adapter = self.adapter.lock().unwrap(); + state = self.state.lock().unwrap(); + } + Err(err) => { + element.post_error_message(&err); + return gst::FlowReturn::Error; + } + } + } + } + } + } + + fn find_header( + &self, + element: &gst::Element, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result { + while adapter.available() >= 9 { + let data = adapter.map(9).unwrap(); + + if let IResult::Done(_, header) = flavors::header(&*data) { + gst_debug!(CAT, obj: element, "Found FLV header: {:?}", header); + drop(data); + adapter.flush(9); + + return Ok(header); + } + + drop(data); + adapter.flush(1); + } + + Err(()) + } + + fn handle_events( + &self, + element: &gst::Element, + events: SmallVec<[Event; 4]>, + ) -> Result { + for event in events { + match event { + Event::StreamChanged(stream, caps) => { + let pad = match stream { + Stream::Audio => { + let mut audio_srcpad = self.audio_srcpad.lock().unwrap(); + if let Some(ref srcpad) = *audio_srcpad { + srcpad.clone() + } else { + let srcpad = self.create_srcpad(element, "audio", &caps); + *audio_srcpad = Some(srcpad.clone()); + + srcpad + } + } + Stream::Video => { + let mut video_srcpad = self.video_srcpad.lock().unwrap(); + if let Some(ref srcpad) = *video_srcpad { + srcpad.clone() + } else { + let srcpad = self.create_srcpad(element, "video", &caps); + + *video_srcpad = Some(srcpad.clone()); + + srcpad + } + } + }; + + pad.push_event(gst::Event::new_caps(&caps).build()); + } + Event::Buffer(stream, buffer) => { + let pad = match stream { + Stream::Audio => { + self.audio_srcpad.lock().unwrap().as_ref().map(Clone::clone) + } + Stream::Video => { + self.video_srcpad.lock().unwrap().as_ref().map(Clone::clone) + } + }; + + if let Some(pad) = pad { + let res = pad.push(buffer); + gst_trace!( + CAT, + obj: element, + "Pushing buffer for stream {:?} returned {:?}", + stream, + res + ); + + self.flow_combiner + .lock() + .unwrap() + .update_pad_flow(&pad, res) + .into_result()?; + } + } + Event::HaveAllStreams => { + element.no_more_pads(); + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn create_srcpad(&self, element: &gst::Element, name: &str, caps: &gst::Caps) -> gst::Pad { + let templ = element.get_element_class().get_pad_template(name).unwrap(); + let srcpad = gst::Pad::new_from_template(&templ, name); + + srcpad.set_event_function(|pad, parent, event| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux, element| demux.src_event(pad, element, event), + ) + }); + + srcpad.set_query_function(|pad, parent, query| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux, element| demux.src_query(pad, element, query), + ) + }); + + srcpad.set_active(true).unwrap(); + + let full_stream_id = srcpad.create_stream_id(element, name).unwrap(); + // FIXME group id + srcpad.push_event(gst::Event::new_stream_start(&full_stream_id).build()); + srcpad.push_event(gst::Event::new_caps(&caps).build()); + + // FIXME proper segment handling + let segment = gst::FormattedSegment::::default(); + srcpad.push_event(gst::Event::new_segment(&segment).build()); + + self.flow_combiner.lock().unwrap().add_pad(&srcpad); + + element.add_pad(&srcpad).unwrap(); + + srcpad + } +} + impl StreamingState { fn new(audio: bool, video: bool) -> StreamingState { StreamingState { @@ -68,16 +688,492 @@ impl StreamingState { avc_sequence_header: None, } } -} -#[derive(Debug, Eq, Clone)] -struct AudioFormat { - format: flavors::SoundFormat, - rate: u16, - width: u8, - channels: u8, - bitrate: Option, - aac_sequence_header: Option, + fn handle_tag( + &mut self, + element: &gst::Element, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result>, gst::ErrorMessage> { + if adapter.available() < 15 { + return Ok(None); + } + + let data = adapter.map(15).unwrap(); + + match nom::be_u32(&data[0..4]) { + IResult::Error(_) | IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, previous_size) => { + gst_trace!(CAT, obj: element, "Previous tag size {}", previous_size); + // Nothing to do here, we just consume it for now + } + } + + let tag_header = match flavors::tag_header(&data[4..]) { + IResult::Error(_) | IResult::Incomplete(_) => unimplemented!(), + IResult::Done(_, tag_header) => tag_header, + }; + + gst_trace!(CAT, obj: element, "Parsed tag header {:?}", tag_header); + + drop(data); + + if adapter.available() < (15 + tag_header.data_size) as usize { + return Ok(None); + } + + adapter.flush(15); + + match tag_header.tag_type { + flavors::TagType::Script => { + gst_trace!(CAT, obj: element, "Found script tag"); + + self.handle_script_tag(element, &tag_header, adapter) + } + flavors::TagType::Audio => { + gst_trace!(CAT, obj: element, "Found audio tag"); + + self.handle_audio_tag(element, &tag_header, adapter) + } + flavors::TagType::Video => { + gst_trace!(CAT, obj: element, "Found video tag"); + + self.handle_video_tag(element, &tag_header, adapter) + } + } + .map(Option::Some) + } + + fn handle_script_tag( + &mut self, + element: &gst::Element, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result, gst::ErrorMessage> { + assert!(adapter.available() >= tag_header.data_size as usize); + + let mut events = SmallVec::new(); + + let data = adapter.map(tag_header.data_size as usize).unwrap(); + + match flavors::script_data(&*data) { + IResult::Done(_, ref script_data) if script_data.name == "onMetaData" => { + gst_trace!(CAT, obj: element, "Got script tag: {:?}", script_data); + + let metadata = Metadata::new(script_data); + gst_debug!(CAT, obj: element, "Got metadata: {:?}", metadata); + + let audio_changed = self + .audio + .as_mut() + .map(|a| a.update_with_metadata(&metadata)) + .unwrap_or(false); + let video_changed = self + .video + .as_mut() + .map(|v| v.update_with_metadata(&metadata)) + .unwrap_or(false); + self.metadata = Some(metadata); + + if audio_changed || video_changed { + if audio_changed { + if let Some(caps) = self.audio.as_ref().and_then(|a| a.to_caps()) { + events.push(Event::StreamChanged(Stream::Audio, caps)); + } + } + if video_changed { + if let Some(caps) = self.video.as_ref().and_then(|v| v.to_caps()) { + events.push(Event::StreamChanged(Stream::Video, caps)); + } + } + } + } + IResult::Done(_, ref script_data) => { + gst_trace!(CAT, obj: element, "Got script tag: {:?}", script_data); + } + IResult::Error(err) => { + gst_error!(CAT, obj: element, "Error parsing script tag: {:?}", err); + } + IResult::Incomplete(_) => { + // ignore + } + } + + drop(data); + adapter.flush(tag_header.data_size as usize); + + Ok(events) + } + + fn update_audio_stream( + &mut self, + element: &gst::Element, + data_header: &flavors::AudioDataHeader, + ) -> Result, gst::ErrorMessage> { + let mut events = SmallVec::new(); + + gst_trace!( + CAT, + obj: element, + "Got audio data header: {:?}", + data_header + ); + + let new_audio_format = + AudioFormat::new(data_header, &self.metadata, &self.aac_sequence_header); + + if self.audio.as_ref() != Some(&new_audio_format) { + gst_debug!( + CAT, + obj: element, + "Got new audio format: {:?}", + new_audio_format + ); + + let caps = new_audio_format.to_caps(); + if let Some(caps) = caps { + self.audio = Some(new_audio_format); + events.push(Event::StreamChanged(Stream::Audio, caps)); + } else { + self.audio = None; + } + } + + if (!self.expect_video || self.video != None) && self.audio != None && !self.got_all_streams + { + gst_debug!(CAT, obj: element, "Have all expected streams now"); + self.got_all_streams = true; + events.push(Event::HaveAllStreams); + } + + Ok(events) + } + + fn handle_aac_audio_packet_header( + &mut self, + element: &gst::Element, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result { + // Not big enough for the AAC packet header, ship! + if tag_header.data_size < 1 + 1 { + adapter.flush((tag_header.data_size - 1) as usize); + gst_warning!( + CAT, + obj: element, + "Too small packet for AAC packet header {}", + tag_header.data_size + ); + return Ok(true); + } + + let data = adapter.map(1).unwrap(); + + match flavors::aac_audio_packet_header(&*data) { + IResult::Error(_) | IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, header) => { + gst_trace!(CAT, obj: element, "Got AAC packet header {:?}", header); + match header.packet_type { + flavors::AACPacketType::SequenceHeader => { + drop(data); + adapter.flush(1); + let buffer = adapter + .take_buffer((tag_header.data_size - 1 - 1) as usize) + .unwrap(); + gst_debug!(CAT, obj: element, "Got AAC sequence header {:?}", buffer,); + + self.aac_sequence_header = Some(buffer); + Ok(true) + } + flavors::AACPacketType::Raw => { + drop(data); + adapter.flush(1); + Ok(false) + } + } + } + } + } + + fn handle_audio_tag( + &mut self, + element: &gst::Element, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result, gst::ErrorMessage> { + assert!(adapter.available() >= tag_header.data_size as usize); + + let data = adapter.map(1).unwrap(); + let data_header = match flavors::audio_data_header(&*data) { + IResult::Error(_) | IResult::Incomplete(_) => unimplemented!(), + IResult::Done(_, data_header) => data_header, + }; + drop(data); + adapter.flush(1); + + let mut events = self.update_audio_stream(element, &data_header)?; + + // AAC special case + if data_header.sound_format == flavors::SoundFormat::AAC + && self.handle_aac_audio_packet_header(element, &tag_header, adapter)? + { + return Ok(events); + } + + let offset = match data_header.sound_format { + flavors::SoundFormat::AAC => 2, + _ => 1, + }; + + if tag_header.data_size == offset { + return Ok(events); + } + + if self.audio == None { + adapter.flush((tag_header.data_size - offset) as usize); + return Ok(events); + } + + let mut buffer = adapter + .take_buffer((tag_header.data_size - offset) as usize) + .unwrap(); + + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(tag_header.timestamp as u64)); + } + + gst_trace!( + CAT, + obj: element, + "Outputting audio buffer {:?} for tag {:?}", + buffer, + tag_header, + ); + + self.update_position(&buffer); + + events.push(Event::Buffer(Stream::Audio, buffer)); + + Ok(events) + } + + fn update_video_stream( + &mut self, + element: &gst::Element, + data_header: &flavors::VideoDataHeader, + ) -> Result, gst::ErrorMessage> { + let mut events = SmallVec::new(); + + gst_trace!( + CAT, + obj: element, + "Got video data header: {:?}", + data_header + ); + + let new_video_format = + VideoFormat::new(data_header, &self.metadata, &self.avc_sequence_header); + + if self.video.as_ref() != Some(&new_video_format) { + gst_debug!( + CAT, + obj: element, + "Got new video format: {:?}", + new_video_format + ); + + let caps = new_video_format.to_caps(); + if let Some(caps) = caps { + self.video = Some(new_video_format); + events.push(Event::StreamChanged(Stream::Video, caps)); + } else { + self.video = None; + } + } + + if (!self.expect_audio || self.audio != None) && self.video != None && !self.got_all_streams + { + gst_debug!(CAT, obj: element, "Have all expected streams now"); + self.got_all_streams = true; + events.push(Event::HaveAllStreams); + } + + Ok(events) + } + + fn handle_avc_video_packet_header( + &mut self, + element: &gst::Element, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result, gst::ErrorMessage> { + // Not big enough for the AVC packet header, skip! + if tag_header.data_size < 1 + 4 { + adapter.flush((tag_header.data_size - 1) as usize); + gst_warning!( + CAT, + obj: element, + "Too small packet for AVC packet header {}", + tag_header.data_size + ); + return Ok(None); + } + + let data = adapter.map(4).unwrap(); + match flavors::avc_video_packet_header(&*data) { + IResult::Error(_) | IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, header) => { + gst_trace!(CAT, obj: element, "Got AVC packet header {:?}", header); + match header.packet_type { + flavors::AVCPacketType::SequenceHeader => { + drop(data); + adapter.flush(4); + let buffer = adapter + .take_buffer((tag_header.data_size - 1 - 4) as usize) + .unwrap(); + gst_debug!( + CAT, + obj: element, + "Got AVC sequence header {:?} of size {}", + buffer, + tag_header.data_size - 1 - 4 + ); + + self.avc_sequence_header = Some(buffer); + Ok(None) + } + flavors::AVCPacketType::NALU => { + drop(data); + adapter.flush(4); + Ok(Some(header.composition_time)) + } + flavors::AVCPacketType::EndOfSequence => { + // Skip + drop(data); + adapter.flush((tag_header.data_size - 1) as usize); + Ok(None) + } + } + } + } + } + + fn handle_video_tag( + &mut self, + element: &gst::Element, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result, gst::ErrorMessage> { + assert!(adapter.available() >= tag_header.data_size as usize); + + let data = adapter.map(1).unwrap(); + let data_header = match flavors::video_data_header(&*data) { + IResult::Error(_) | IResult::Incomplete(_) => unimplemented!(), + IResult::Done(_, data_header) => data_header, + }; + drop(data); + adapter.flush(1); + + let mut events = self.update_video_stream(element, &data_header)?; + + // AVC/H264 special case + let cts = if data_header.codec_id == flavors::CodecId::H264 { + match self.handle_avc_video_packet_header(element, tag_header, adapter)? { + Some(cts) => cts, + None => { + return Ok(events); + } + } + } else { + 0 + }; + + let offset = match data_header.codec_id { + flavors::CodecId::H264 => 5, + _ => 1, + }; + + if tag_header.data_size == offset { + return Ok(events); + } + + if self.video == None { + adapter.flush((tag_header.data_size - offset) as usize); + return Ok(events); + } + + let is_keyframe = data_header.frame_type == flavors::FrameType::Key; + + let skip = match data_header.codec_id { + flavors::CodecId::VP6 | flavors::CodecId::VP6A => 1, + _ => 0, + }; + + if skip > 0 { + adapter.flush(skip as usize); + } + + if tag_header.data_size == offset + skip { + return Ok(events); + } + + let mut buffer = adapter + .take_buffer((tag_header.data_size - offset - skip) as usize) + .unwrap(); + + { + let buffer = buffer.get_mut().unwrap(); + if !is_keyframe { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + buffer.set_dts(gst::ClockTime::from_mseconds(tag_header.timestamp as u64)); + + // Prevent negative numbers + let pts = if cts < 0 && tag_header.timestamp < (-cts) as u32 { + 0 + } else { + ((tag_header.timestamp as i64) + (cts as i64)) as u64 + }; + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + } + + gst_trace!( + CAT, + obj: element, + "Outputting video buffer {:?} for tag {:?}, keyframe: {}", + buffer, + tag_header, + is_keyframe + ); + + self.update_position(&buffer); + + events.push(Event::Buffer(Stream::Video, buffer)); + + Ok(events) + } + + fn update_position(&mut self, buffer: &gst::Buffer) { + if buffer.get_pts() != gst::CLOCK_TIME_NONE { + let pts = buffer.get_pts(); + self.last_position = self + .last_position + .map(|last| cmp::max(last.into(), pts)) + .unwrap_or(pts); + } else if buffer.get_dts() != gst::CLOCK_TIME_NONE { + let dts = buffer.get_dts(); + self.last_position = self + .last_position + .map(|last| cmp::max(last.into(), dts)) + .unwrap_or(dts); + } + } } // Ignores bitrate @@ -180,6 +1276,9 @@ impl AudioFormat { ) }), flavors::SoundFormat::SPEEX => { + use crate::bytes::*; + use std::io::{Cursor, Write}; + let header = { let header_size = 80; let mut data = Cursor::new(Vec::with_capacity(header_size)); @@ -249,15 +1348,16 @@ impl AudioFormat { } } -#[derive(Debug, Eq, Clone)] -struct VideoFormat { - format: flavors::CodecId, - width: Option, - height: Option, - pixel_aspect_ratio: Option, - framerate: Option, - bitrate: Option, - avc_sequence_header: Option, +// Ignores bitrate +impl PartialEq for VideoFormat { + fn eq(&self, other: &Self) -> bool { + self.format.eq(&other.format) + && self.width.eq(&other.width) + && self.height.eq(&other.height) + && self.pixel_aspect_ratio.eq(&other.pixel_aspect_ratio) + && self.framerate.eq(&other.framerate) + && self.avc_sequence_header.eq(&other.avc_sequence_header) + } } impl VideoFormat { @@ -326,7 +1426,7 @@ impl VideoFormat { }), flavors::CodecId::H263 => Some(gst::Caps::new_simple("video/x-h263", &[])), flavors::CodecId::MPEG4Part2 => Some(gst::Caps::new_simple( - "video/x-h263", + "video/mpeg", &[("mpegversion", &4i32), ("systemstream", &false)], )), flavors::CodecId::JPEG => { @@ -369,54 +1469,11 @@ impl VideoFormat { } } -// Ignores bitrate -impl PartialEq for VideoFormat { - fn eq(&self, other: &Self) -> bool { - self.format.eq(&other.format) - && self.width.eq(&other.width) - && self.height.eq(&other.height) - && self.pixel_aspect_ratio.eq(&other.pixel_aspect_ratio) - && self.framerate.eq(&other.framerate) - && self.avc_sequence_header.eq(&other.avc_sequence_header) - } -} - -#[derive(Debug, PartialEq, Eq, Clone)] -struct Metadata { - duration: gst::ClockTime, - - creation_date: Option, - creator: Option, - title: Option, - metadata_creator: Option, /* TODO: seek_table: _, - * filepositions / times metadata arrays */ - - audio_bitrate: Option, - - video_width: Option, - video_height: Option, - video_pixel_aspect_ratio: Option, - video_framerate: Option, - video_bitrate: Option, -} - impl Metadata { fn new(script_data: &flavors::ScriptData) -> Metadata { assert_eq!(script_data.name, "onMetaData"); - let mut metadata = Metadata { - duration: gst::CLOCK_TIME_NONE, - creation_date: None, - creator: None, - title: None, - metadata_creator: None, - audio_bitrate: None, - video_width: None, - video_height: None, - video_pixel_aspect_ratio: None, - video_framerate: None, - video_bitrate: None, - }; + let mut metadata = Metadata::default(); let args = match script_data.arguments { flavors::ScriptDataValue::Object(ref objects) @@ -479,682 +1536,6 @@ impl Metadata { } } -pub struct FlvDemux { - cat: gst::DebugCategory, - state: State, - adapter: Adapter, - // Only in >= State::Streaming - streaming_state: Option, -} - -impl FlvDemux { - pub fn new(_demuxer: &Element) -> FlvDemux { - FlvDemux { - cat: gst::DebugCategory::new( - "rsflvdemux", - gst::DebugColorFlags::empty(), - "Rust FLV demuxer", - ), - state: State::Stopped, - adapter: Adapter::new(), - streaming_state: None, - } - } - - pub fn new_boxed(demuxer: &Element) -> Box { - Box::new(Self::new(demuxer)) - } - - fn handle_script_tag( - &mut self, - demuxer: &Element, - tag_header: &flavors::TagHeader, - ) -> Result { - if self.adapter.get_available() < (15 + tag_header.data_size) as usize { - return Ok(HandleBufferResult::NeedMoreData); - } - - self.adapter.flush(15).unwrap(); - - let buffer = self - .adapter - .get_buffer(tag_header.data_size as usize) - .unwrap(); - let map = buffer.map_readable().unwrap(); - let data = map.as_slice(); - - match flavors::script_data(data) { - IResult::Done(_, ref script_data) if script_data.name == "onMetaData" => { - gst_trace!(self.cat, obj: demuxer, "Got script tag: {:?}", script_data); - - let metadata = Metadata::new(script_data); - gst_debug!(self.cat, obj: demuxer, "Got metadata: {:?}", metadata); - - let streaming_state = self.streaming_state.as_mut().unwrap(); - - let audio_changed = streaming_state - .audio - .as_mut() - .map(|a| a.update_with_metadata(&metadata)) - .unwrap_or(false); - let video_changed = streaming_state - .video - .as_mut() - .map(|v| v.update_with_metadata(&metadata)) - .unwrap_or(false); - streaming_state.metadata = Some(metadata); - - if audio_changed || video_changed { - let mut streams = Vec::new(); - - if audio_changed { - if let Some(caps) = streaming_state.audio.as_ref().and_then(|a| a.to_caps()) - { - streams.push(Stream::new(AUDIO_STREAM_ID, caps, String::from("audio"))); - } - } - if video_changed { - if let Some(caps) = streaming_state.video.as_ref().and_then(|v| v.to_caps()) - { - streams.push(Stream::new(VIDEO_STREAM_ID, caps, String::from("video"))); - } - } - - return Ok(HandleBufferResult::StreamsChanged(streams)); - } - } - IResult::Done(_, ref script_data) => { - gst_trace!(self.cat, obj: demuxer, "Got script tag: {:?}", script_data); - } - IResult::Error(_) | IResult::Incomplete(_) => { - // ignore - } - } - - Ok(HandleBufferResult::Again) - } - - fn update_audio_stream( - &mut self, - demuxer: &Element, - data_header: &flavors::AudioDataHeader, - ) -> Result { - gst_trace!( - self.cat, - obj: demuxer, - "Got audio data header: {:?}", - data_header - ); - - let streaming_state = self.streaming_state.as_mut().unwrap(); - - let new_audio_format = AudioFormat::new( - data_header, - &streaming_state.metadata, - &streaming_state.aac_sequence_header, - ); - - if streaming_state.audio.as_ref() != Some(&new_audio_format) { - gst_debug!( - self.cat, - obj: demuxer, - "Got new audio format: {:?}", - new_audio_format - ); - let new_stream = streaming_state.audio == None; - - let caps = new_audio_format.to_caps(); - if let Some(caps) = caps { - streaming_state.audio = Some(new_audio_format); - let stream = Stream::new(AUDIO_STREAM_ID, caps, String::from("audio")); - if new_stream { - return Ok(HandleBufferResult::StreamAdded(stream)); - } else { - return Ok(HandleBufferResult::StreamChanged(stream)); - } - } else { - streaming_state.audio = None; - } - } - - if !streaming_state.got_all_streams - && streaming_state.audio != None - && (streaming_state.expect_video && streaming_state.video != None - || !streaming_state.expect_video) - { - streaming_state.got_all_streams = true; - return Ok(HandleBufferResult::HaveAllStreams); - } - - Ok(HandleBufferResult::Again) - } - - fn handle_audio_tag( - &mut self, - demuxer: &Element, - tag_header: &flavors::TagHeader, - data_header: &flavors::AudioDataHeader, - ) -> Result { - let res = self.update_audio_stream(demuxer, data_header); - match res { - Ok(HandleBufferResult::Again) => (), - _ => return res, - } - - if self.adapter.get_available() < (15 + tag_header.data_size) as usize { - return Ok(HandleBufferResult::NeedMoreData); - } - - // AAC special case - if data_header.sound_format == flavors::SoundFormat::AAC { - // Not big enough for the AAC packet header, ship! - if tag_header.data_size < 1 + 1 { - self.adapter - .flush(15 + tag_header.data_size as usize) - .unwrap(); - gst_warning!( - self.cat, - obj: demuxer, - "Too small packet for AAC packet header {}", - 15 + tag_header.data_size - ); - return Ok(HandleBufferResult::Again); - } - - let mut data = [0u8; 17]; - self.adapter.peek_into(&mut data).unwrap(); - match flavors::aac_audio_packet_header(&data[16..]) { - IResult::Error(_) | IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, header) => { - gst_trace!(self.cat, obj: demuxer, "Got AAC packet header {:?}", header); - match header.packet_type { - flavors::AACPacketType::SequenceHeader => { - self.adapter.flush(15 + 1 + 1).unwrap(); - let buffer = self - .adapter - .get_buffer((tag_header.data_size - 1 - 1) as usize) - .unwrap(); - gst_debug!( - self.cat, - obj: demuxer, - "Got AAC sequence header {:?} of size {}", - buffer, - tag_header.data_size - 1 - 1 - ); - - let streaming_state = self.streaming_state.as_mut().unwrap(); - streaming_state.aac_sequence_header = Some(buffer); - return Ok(HandleBufferResult::Again); - } - flavors::AACPacketType::Raw => { - // fall through - } - } - } - } - } - - let streaming_state = self.streaming_state.as_ref().unwrap(); - - if streaming_state.audio == None { - self.adapter - .flush((tag_header.data_size + 15) as usize) - .unwrap(); - return Ok(HandleBufferResult::Again); - } - - let audio = streaming_state.audio.as_ref().unwrap(); - self.adapter.flush(16).unwrap(); - - let offset = match audio.format { - flavors::SoundFormat::AAC => 1, - _ => 0, - }; - - if tag_header.data_size == 0 { - return Ok(HandleBufferResult::Again); - } - - if tag_header.data_size < offset { - self.adapter - .flush((tag_header.data_size - 1) as usize) - .unwrap(); - return Ok(HandleBufferResult::Again); - } - - if offset > 0 { - self.adapter.flush(offset as usize).unwrap(); - } - - let mut buffer = self - .adapter - .get_buffer((tag_header.data_size - 1 - offset) as usize) - .unwrap(); - - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(gst::ClockTime::from_mseconds(tag_header.timestamp as u64)); - } - - gst_trace!( - self.cat, - obj: demuxer, - "Outputting audio buffer {:?} for tag {:?} of size {}", - buffer, - tag_header, - tag_header.data_size - 1 - ); - - Ok(HandleBufferResult::BufferForStream(AUDIO_STREAM_ID, buffer)) - } - - fn update_video_stream( - &mut self, - demuxer: &Element, - data_header: &flavors::VideoDataHeader, - ) -> Result { - gst_trace!( - self.cat, - obj: demuxer, - "Got video data header: {:?}", - data_header - ); - - let streaming_state = self.streaming_state.as_mut().unwrap(); - - let new_video_format = VideoFormat::new( - data_header, - &streaming_state.metadata, - &streaming_state.avc_sequence_header, - ); - - if streaming_state.video.as_ref() != Some(&new_video_format) { - gst_debug!( - self.cat, - obj: demuxer, - "Got new video format: {:?}", - new_video_format - ); - - let new_stream = streaming_state.video == None; - - let caps = new_video_format.to_caps(); - if let Some(caps) = caps { - streaming_state.video = Some(new_video_format); - let stream = Stream::new(VIDEO_STREAM_ID, caps, String::from("video")); - if new_stream { - return Ok(HandleBufferResult::StreamAdded(stream)); - } else { - return Ok(HandleBufferResult::StreamChanged(stream)); - } - } else { - streaming_state.video = None; - } - } - - if !streaming_state.got_all_streams - && streaming_state.video != None - && (streaming_state.expect_audio && streaming_state.audio != None - || !streaming_state.expect_audio) - { - streaming_state.got_all_streams = true; - return Ok(HandleBufferResult::HaveAllStreams); - } - - Ok(HandleBufferResult::Again) - } - - fn handle_video_tag( - &mut self, - demuxer: &Element, - tag_header: &flavors::TagHeader, - data_header: &flavors::VideoDataHeader, - ) -> Result { - let res = self.update_video_stream(demuxer, data_header); - match res { - Ok(HandleBufferResult::Again) => (), - _ => return res, - } - - if self.adapter.get_available() < (15 + tag_header.data_size) as usize { - return Ok(HandleBufferResult::NeedMoreData); - } - - let mut cts = 0; - - // AVC/H264 special case - if data_header.codec_id == flavors::CodecId::H264 { - // Not big enough for the AVC packet header, ship! - if tag_header.data_size < 1 + 4 { - self.adapter - .flush(15 + tag_header.data_size as usize) - .unwrap(); - gst_warning!( - self.cat, - obj: demuxer, - "Too small packet for AVC packet header {}", - 15 + tag_header.data_size - ); - return Ok(HandleBufferResult::Again); - } - - let mut data = [0u8; 20]; - self.adapter.peek_into(&mut data).unwrap(); - match flavors::avc_video_packet_header(&data[16..]) { - IResult::Error(_) | IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, header) => { - gst_trace!(self.cat, obj: demuxer, "Got AVC packet header {:?}", header); - match header.packet_type { - flavors::AVCPacketType::SequenceHeader => { - self.adapter.flush(15 + 1 + 4).unwrap(); - let buffer = self - .adapter - .get_buffer((tag_header.data_size - 1 - 4) as usize) - .unwrap(); - gst_debug!( - self.cat, - obj: demuxer, - "Got AVC sequence header {:?} of size {}", - buffer, - tag_header.data_size - 1 - 4 - ); - - let streaming_state = self.streaming_state.as_mut().unwrap(); - streaming_state.avc_sequence_header = Some(buffer); - return Ok(HandleBufferResult::Again); - } - flavors::AVCPacketType::NALU => { - cts = header.composition_time; - } - flavors::AVCPacketType::EndOfSequence => { - // Skip - self.adapter - .flush(15 + tag_header.data_size as usize) - .unwrap(); - return Ok(HandleBufferResult::Again); - } - } - } - } - } - - let streaming_state = self.streaming_state.as_ref().unwrap(); - - if streaming_state.video == None { - self.adapter - .flush((tag_header.data_size + 15) as usize) - .unwrap(); - return Ok(HandleBufferResult::Again); - } - - let video = streaming_state.video.as_ref().unwrap(); - let is_keyframe = data_header.frame_type == flavors::FrameType::Key; - - self.adapter.flush(16).unwrap(); - - let offset = match video.format { - flavors::CodecId::VP6 | flavors::CodecId::VP6A => 1, - flavors::CodecId::H264 => 4, - _ => 0, - }; - - if tag_header.data_size == 0 { - return Ok(HandleBufferResult::Again); - } - - if tag_header.data_size < offset { - self.adapter - .flush((tag_header.data_size - 1) as usize) - .unwrap(); - return Ok(HandleBufferResult::Again); - } - - if offset > 0 { - self.adapter.flush(offset as usize).unwrap(); - } - - let mut buffer = self - .adapter - .get_buffer((tag_header.data_size - 1 - offset) as usize) - .unwrap(); - - { - let buffer = buffer.get_mut().unwrap(); - if !is_keyframe { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - buffer.set_dts(gst::ClockTime::from_mseconds(tag_header.timestamp as u64)); - - // Prevent negative numbers - let pts = if cts < 0 && tag_header.timestamp < (-cts) as u32 { - 0 - } else { - ((tag_header.timestamp as i64) + (cts as i64)) as u64 - }; - buffer.set_pts(gst::ClockTime::from_mseconds(pts)); - } - - gst_trace!( - self.cat, - obj: demuxer, - "Outputting video buffer {:?} for tag {:?} of size {}, keyframe: {}", - buffer, - tag_header, - tag_header.data_size - 1 - offset, - is_keyframe - ); - - Ok(HandleBufferResult::BufferForStream(VIDEO_STREAM_ID, buffer)) - } - - fn update_state(&mut self, demuxer: &Element) -> Result { - match self.state { - State::Stopped => unreachable!(), - State::NeedHeader => { - while self.adapter.get_available() >= 9 { - let mut data = [0u8; 9]; - self.adapter.peek_into(&mut data).unwrap(); - - match flavors::header(&data) { - IResult::Error(_) | IResult::Incomplete(_) => { - // fall through - } - IResult::Done(_, ref header) => { - gst_debug!(self.cat, obj: demuxer, "Found FLV header: {:?}", header); - - let skip = if header.offset < 9 { - 0 - } else { - header.offset - 9 - }; - - self.adapter.flush(9).unwrap(); - - self.state = State::Skipping { - audio: header.audio, - video: header.video, - skip_left: skip, - }; - - return Ok(HandleBufferResult::Again); - } - } - - self.adapter.flush(1).unwrap(); - } - - Ok(HandleBufferResult::NeedMoreData) - } - State::Skipping { - audio, - video, - skip_left: 0, - } => { - self.state = State::Streaming; - self.streaming_state = Some(StreamingState::new(audio, video)); - - Ok(HandleBufferResult::Again) - } - State::Skipping { - ref mut skip_left, .. - } => { - let skip = cmp::min(self.adapter.get_available(), *skip_left as usize); - self.adapter.flush(skip).unwrap(); - *skip_left -= skip as u32; - - Ok(HandleBufferResult::Again) - } - State::Streaming => { - if self.adapter.get_available() < 16 { - return Ok(HandleBufferResult::NeedMoreData); - } - - let mut data = [0u8; 16]; - self.adapter.peek_into(&mut data).unwrap(); - - match nom::be_u32(&data[0..4]) { - IResult::Error(_) | IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, previous_size) => { - gst_trace!( - self.cat, - obj: demuxer, - "Previous tag size {}", - previous_size - ); - // Nothing to do here, we just consume it for now - } - } - - let tag_header = match flavors::tag_header(&data[4..]) { - IResult::Error(_) | IResult::Incomplete(_) => unimplemented!(), - IResult::Done(_, tag_header) => tag_header, - }; - - let res = match tag_header.tag_type { - flavors::TagType::Script => { - gst_trace!(self.cat, obj: demuxer, "Found script tag"); - - self.handle_script_tag(demuxer, &tag_header) - } - flavors::TagType::Audio => { - gst_trace!(self.cat, obj: demuxer, "Found audio tag"); - - let data_header = match flavors::audio_data_header(&data[15..]) { - IResult::Error(_) | IResult::Incomplete(_) => unimplemented!(), - IResult::Done(_, data_header) => data_header, - }; - - self.handle_audio_tag(demuxer, &tag_header, &data_header) - } - flavors::TagType::Video => { - gst_trace!(self.cat, obj: demuxer, "Found video tag"); - - let data_header = match flavors::video_data_header(&data[15..]) { - IResult::Error(_) | IResult::Incomplete(_) => unimplemented!(), - IResult::Done(_, data_header) => data_header, - }; - - self.handle_video_tag(demuxer, &tag_header, &data_header) - } - }; - - if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res { - let streaming_state = self.streaming_state.as_mut().unwrap(); - - if buffer.get_pts() != gst::CLOCK_TIME_NONE { - let pts = buffer.get_pts(); - streaming_state.last_position = streaming_state - .last_position - .map(|last| cmp::max(last.into(), pts)) - .unwrap_or(pts); - } else if buffer.get_dts() != gst::CLOCK_TIME_NONE { - let dts = buffer.get_dts(); - streaming_state.last_position = streaming_state - .last_position - .map(|last| cmp::max(last.into(), dts)) - .unwrap_or(dts); - } - } - - res - } - } - } -} - -impl DemuxerImpl for FlvDemux { - fn start( - &mut self, - _demuxer: &Element, - _upstream_size: Option, - _random_access: bool, - ) -> Result<(), gst::ErrorMessage> { - self.state = State::NeedHeader; - - Ok(()) - } - - fn stop(&mut self, _demuxer: &Element) -> Result<(), gst::ErrorMessage> { - self.state = State::Stopped; - self.adapter.clear(); - self.streaming_state = None; - - Ok(()) - } - - fn seek( - &mut self, - _demuxer: &Element, - _start: gst::ClockTime, - _stop: gst::ClockTime, - ) -> Result { - unimplemented!(); - } - - fn handle_buffer( - &mut self, - demuxer: &Element, - buffer: Option, - ) -> Result { - if let Some(buffer) = buffer { - self.adapter.push(buffer); - } - - self.update_state(demuxer) - } - - fn end_of_stream(&mut self, _demuxer: &Element) -> Result<(), gst::ErrorMessage> { - // nothing to do here, all data we have left is incomplete - Ok(()) - } - - fn is_seekable(&self, _demuxer: &Element) -> bool { - false - } - - fn get_position(&self, _demuxer: &Element) -> gst::ClockTime { - if let Some(StreamingState { last_position, .. }) = self.streaming_state { - return last_position; - } - - gst::CLOCK_TIME_NONE - } - - fn get_duration(&self, _demuxer: &Element) -> gst::ClockTime { - if let Some(StreamingState { - metadata: Some(Metadata { duration, .. }), - .. - }) = self.streaming_state - { - return duration; - } - - gst::CLOCK_TIME_NONE - } +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register(plugin, "rsflvdemux", 256 + 100, FlvDemux::get_type()) } diff --git a/gst-plugin-flv/src/lib.rs b/gst-plugin-flv/src/lib.rs index e3656a86..40e8941f 100644 --- a/gst-plugin-flv/src/lib.rs +++ b/gst-plugin-flv/src/lib.rs @@ -8,51 +8,30 @@ #![crate_type = "cdylib"] -extern crate flavors; +#[macro_use] extern crate glib; #[macro_use] -extern crate gst_plugin; -extern crate gst_plugin_simple; -#[macro_use] extern crate gstreamer as gst; -extern crate muldiv; -extern crate nom; -extern crate num_rational; -extern crate url; +extern crate gstreamer_base as gst_base; -use gst_plugin_simple::demuxer::*; +#[macro_use] +extern crate lazy_static; +mod bytes; mod flvdemux; -use flvdemux::FlvDemux; - fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - demuxer_register( - plugin, - DemuxerInfo { - name: "rsflvdemux".into(), - long_name: "FLV Demuxer".into(), - description: "Demuxes FLV Streams".into(), - classification: "Codec/Demuxer".into(), - author: "Sebastian Dröge ".into(), - rank: 256 + 100, - create_instance: FlvDemux::new_boxed, - input_caps: gst::Caps::new_simple("video/x-flv", &[]), - output_caps: gst::Caps::new_any(), - }, - )?; - - Ok(()) + flvdemux::register(plugin) } -plugin_define!( - b"rsflv\0", - b"Rust FLV Plugin\0", +gst_plugin_define!( + "rsflv", + "Rust FLV Plugin", plugin_init, - b"1.0\0", - b"MIT/X11\0", - b"rsflv\0", - b"rsflv\0", - b"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs\0", - b"2016-12-08\0" + "1.0", + "MIT/X11", + "rsflv", + "rsflv", + "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs", + "2016-12-08" );