diff --git a/Cargo.toml b/Cargo.toml index 4be22cf8..ce76d10c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ libc = "0.2" url = "1.1" bitflags = "0.7" reqwest = "0.2" -nom = "1.2" +nom = "2.0" flavors = {git = "https://github.com/Geal/flavors.git"} [build-dependencies] diff --git a/src/flvdemux.rs b/src/flvdemux.rs index 8152c003..f7aefd6a 100644 --- a/src/flvdemux.rs +++ b/src/flvdemux.rs @@ -27,67 +27,65 @@ use rsdemuxer::*; use buffer::*; use adapter::*; +const AUDIO_STREAM_ID: u32 = 0; +const VIDEO_STREAM_ID: u32 = 1; + #[derive(Debug)] -enum StreamingState { +enum State { Stopped, - Started { - adapter: Adapter, - stream_state: StreamState, - }, -} - -#[derive(Debug)] -enum StreamState { NeedHeader, - Initialized { - header: flavors::Header, - initialized_state: InitializedState, - }, -} - -#[derive(Debug)] -enum InitializedState { Skipping { skip_left: u32 }, - Setup { setup_state: SetupState }, + Streaming, } #[derive(Debug)] -struct SetupState { +struct StreamingState { + // TODO: Store in custom structs that just contain what we need audio: Option, - video: Option, /* TODO: parse and store various audio/video metadata from ScriptDataObject */ + video: Option, got_all_streams: bool, - last_position: Option, + last_position: Option, // TODO: parse and store various audio/video metadata from ScriptDataObject } #[derive(Debug)] pub struct FlvDemux { - streaming_state: StreamingState, + state: State, + adapter: Adapter, + // Only in >= State::Skipping + header: Option, + // Only in >= State::Streaming + streaming_state: Option, } impl FlvDemux { pub fn new() -> FlvDemux { - FlvDemux { streaming_state: StreamingState::Stopped } + FlvDemux { + state: State::Stopped, + adapter: Adapter::new(), + header: None, + streaming_state: None, + } } pub fn new_boxed() -> Box { Box::new(FlvDemux::new()) } - fn handle_script_tag(adapter: &mut Adapter, - header: &flavors::Header, - setup_state: &mut SetupState, + fn handle_script_tag(&mut self, tag_header: &flavors::TagHeader) -> Result { - adapter.flush((15 + tag_header.data_size) as usize).unwrap(); + self.adapter.flush((15 + tag_header.data_size) as usize).unwrap(); Ok(HandleBufferResult::Again) } - fn update_audio_stream(header: &flavors::Header, - setup_state: &mut SetupState, + fn update_audio_stream(&mut self, data_header: flavors::AudioDataHeader) -> Result { - let stream_changed = match setup_state.audio { + let header = self.header.as_ref().unwrap(); + let streaming_state = self.streaming_state.as_mut().unwrap(); + + let stream_changed = match streaming_state.audio { None => true, Some(flavors::AudioDataHeader { sound_format, sound_rate, sound_size, sound_type }) if sound_format != data_header.sound_format || @@ -101,10 +99,10 @@ impl FlvDemux { match data_header { flavors::AudioDataHeader { sound_format: flavors::SoundFormat::MP3, .. } => { let format = String::from("audio/mpeg, mpegversion=1, layer=3"); - let new_stream = setup_state.audio == None; + let new_stream = streaming_state.audio == None; - setup_state.audio = Some(data_header); - let stream = Stream::new(0, format, String::from("audio")); + streaming_state.audio = Some(data_header); + let stream = Stream::new(AUDIO_STREAM_ID, format, String::from("audio")); if new_stream { return Ok(HandleBufferResult::StreamAdded(stream)); } else { @@ -116,51 +114,49 @@ impl FlvDemux { } } } - setup_state.audio = Some(data_header); + streaming_state.audio = Some(data_header); - if !setup_state.got_all_streams && - (header.video && setup_state.video != None || !header.video) { - setup_state.got_all_streams = true; + if !streaming_state.got_all_streams && + (header.video && streaming_state.video != None || !header.video) { + streaming_state.got_all_streams = true; return Ok(HandleBufferResult::HaveAllStreams); } Ok(HandleBufferResult::Again) } - fn handle_audio_tag(adapter: &mut Adapter, - header: &flavors::Header, - setup_state: &mut SetupState, + fn handle_audio_tag(&mut self, tag_header: &flavors::TagHeader, data_header: flavors::AudioDataHeader) -> Result { - let res = Self::update_audio_stream(header, setup_state, data_header); + let res = self.update_audio_stream(data_header); match res { Ok(HandleBufferResult::Again) => (), _ => return res, } - if adapter.get_available() < (15 + tag_header.data_size) as usize { + if self.adapter.get_available() < (15 + tag_header.data_size) as usize { return Ok(HandleBufferResult::NeedMoreData); } - adapter.flush(16).unwrap(); + self.adapter.flush(16).unwrap(); if tag_header.data_size == 0 { return Ok(HandleBufferResult::Again); } - let mut buffer = adapter.get_buffer((tag_header.data_size - 1) as usize) - .unwrap(); - buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000)) - .unwrap(); + let mut buffer = self.adapter.get_buffer((tag_header.data_size - 1) as usize).unwrap(); + buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000)).unwrap(); - Ok(HandleBufferResult::BufferForStream(0, buffer)) + Ok(HandleBufferResult::BufferForStream(AUDIO_STREAM_ID, buffer)) } - fn update_video_stream(header: &flavors::Header, - setup_state: &mut SetupState, + fn update_video_stream(&mut self, data_header: flavors::VideoDataHeader) -> Result { - let stream_changed = match setup_state.video { + let header = self.header.as_ref().unwrap(); + let streaming_state = self.streaming_state.as_mut().unwrap(); + + let stream_changed = match streaming_state.video { None => true, Some(flavors::VideoDataHeader { codec_id, .. }) if codec_id != data_header.codec_id => { true @@ -172,10 +168,10 @@ impl FlvDemux { match data_header { flavors::VideoDataHeader { codec_id: flavors::CodecId::VP6, .. } => { let format = String::from("video/x-vp6-flash"); - let new_stream = setup_state.video == None; - setup_state.video = Some(data_header); + let new_stream = streaming_state.video == None; + streaming_state.video = Some(data_header); - let stream = Stream::new(1, format, String::from("video")); + let stream = Stream::new(VIDEO_STREAM_ID, format, String::from("video")); if new_stream { return Ok(HandleBufferResult::StreamAdded(stream)); } else { @@ -188,37 +184,36 @@ impl FlvDemux { } } - setup_state.video = Some(data_header); + streaming_state.video = Some(data_header); - if !setup_state.got_all_streams && - (header.audio && setup_state.audio != None || !header.audio) { - setup_state.got_all_streams = true; + if !streaming_state.got_all_streams && + (header.audio && streaming_state.audio != None || !header.audio) { + streaming_state.got_all_streams = true; return Ok(HandleBufferResult::HaveAllStreams); } Ok(HandleBufferResult::Again) } - fn handle_video_tag(adapter: &mut Adapter, - header: &flavors::Header, - setup_state: &mut SetupState, + fn handle_video_tag(&mut self, tag_header: &flavors::TagHeader, data_header: flavors::VideoDataHeader) -> Result { - let res = Self::update_video_stream(header, setup_state, data_header); + let res = self.update_video_stream(data_header); match res { Ok(HandleBufferResult::Again) => (), _ => return res, } - if adapter.get_available() < (15 + tag_header.data_size) as usize { + if self.adapter.get_available() < (15 + tag_header.data_size) as usize { return Ok(HandleBufferResult::NeedMoreData); } - let video = setup_state.video.as_ref().unwrap(); + let streaming_state = self.streaming_state.as_ref().unwrap(); + let video = streaming_state.video.as_ref().unwrap(); let is_keyframe = video.frame_type == flavors::FrameType::Key; - adapter.flush(16).unwrap(); + self.adapter.flush(16).unwrap(); let offset = if video.codec_id == flavors::CodecId::VP6 || video.codec_id == flavors::CodecId::VP6A { @@ -232,15 +227,16 @@ impl FlvDemux { } if tag_header.data_size < offset { - adapter.flush((tag_header.data_size - 1) as usize).unwrap(); + self.adapter.flush((tag_header.data_size - 1) as usize).unwrap(); return Ok(HandleBufferResult::Again); } if offset > 0 { - adapter.flush(offset as usize).unwrap(); + self.adapter.flush(offset as usize).unwrap(); } - let mut buffer = adapter.get_buffer((tag_header.data_size - 1 - offset) as usize) + let mut buffer = self.adapter + .get_buffer((tag_header.data_size - 1 - offset) as usize) .unwrap(); if !is_keyframe { buffer.set_flags(BUFFER_FLAG_DELTA_UNIT).unwrap(); @@ -248,116 +244,16 @@ impl FlvDemux { buffer.set_dts(Some((tag_header.timestamp as u64) * 1000 * 1000)) .unwrap(); - Ok(HandleBufferResult::BufferForStream(1, buffer)) + Ok(HandleBufferResult::BufferForStream(VIDEO_STREAM_ID, buffer)) } - fn update_setup_state(adapter: &mut Adapter, - header: &flavors::Header, - setup_state: &mut SetupState) - -> Result { - if adapter.get_available() < 16 { - return Ok(HandleBufferResult::NeedMoreData); - } - - let mut data = [0u8; 16]; - adapter.peek_into(&mut data).unwrap(); - - match nom::be_u32(&data[0..4]) { - IResult::Error(_) | - IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, previous_size) => { - (); - } - } - - let tag_header = match flavors::tag_header(&data[4..]) { - IResult::Error(_) | - IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, tag_header) => tag_header, - }; - - let res = match tag_header.tag_type { - flavors::TagType::Script => { - Self::handle_script_tag(adapter, header, setup_state, &tag_header) - } - flavors::TagType::Audio => { - let data_header = match flavors::audio_data_header(&data[15..]) { - IResult::Error(_) | - IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, data_header) => data_header, - }; - - Self::handle_audio_tag(adapter, header, setup_state, &tag_header, data_header) - } - flavors::TagType::Video => { - let data_header = match flavors::video_data_header(&data[15..]) { - IResult::Error(_) | - IResult::Incomplete(_) => { - unimplemented!(); - } - IResult::Done(_, data_header) => data_header, - }; - - Self::handle_video_tag(adapter, header, setup_state, &tag_header, data_header) - } - }; - - if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res { - if let Some(pts) = buffer.get_pts() { - setup_state.last_position = - setup_state.last_position.map(|last| cmp::max(last, pts)).or_else(|| Some(pts)); - } else if let Some(dts) = buffer.get_dts() { - setup_state.last_position = - setup_state.last_position.map(|last| cmp::max(last, dts)).or_else(|| Some(dts)); - } - } - - res - } - - fn update_initialized_state(adapter: &mut Adapter, - header: &flavors::Header, - initialized_state: &mut InitializedState) - -> Result { - match *initialized_state { - InitializedState::Skipping { skip_left: 0 } => { - *initialized_state = InitializedState::Setup { - setup_state: SetupState { - audio: None, - video: None, - got_all_streams: false, - last_position: None, - }, - }; - Ok(HandleBufferResult::Again) - } - InitializedState::Skipping { ref mut skip_left } => { - let skip = cmp::min(adapter.get_available(), *skip_left as usize); - adapter.flush(skip).unwrap(); - *skip_left -= skip as u32; - - Ok(HandleBufferResult::Again) - } - InitializedState::Setup { ref mut setup_state } => { - Self::update_setup_state(adapter, header, setup_state) - } - } - } - - fn update_state(adapter: &mut Adapter, - stream_state: &mut StreamState) - -> Result { - match *stream_state { - StreamState::NeedHeader => { - while adapter.get_available() >= 9 { + fn update_state(&mut self) -> Result { + match self.state { + State::Stopped => unreachable!(), + State::NeedHeader => { + while self.adapter.get_available() >= 9 { let mut data = [0u8; 9]; - adapter.peek_into(&mut data).unwrap(); + self.adapter.peek_into(&mut data).unwrap(); match flavors::header(&data) { IResult::Error(_) | @@ -369,24 +265,106 @@ impl FlvDemux { header.offset = 9; } let skip = header.offset - 9; - adapter.flush(9).unwrap(); + self.adapter.flush(9).unwrap(); - *stream_state = StreamState::Initialized { - header: header, - initialized_state: InitializedState::Skipping { skip_left: skip }, - }; + self.header = Some(header); + self.state = State::Skipping { skip_left: skip }; return Ok(HandleBufferResult::Again); } } - adapter.flush(1).unwrap(); + self.adapter.flush(1).unwrap(); } Ok(HandleBufferResult::NeedMoreData) } - StreamState::Initialized { ref header, ref mut initialized_state } => { - Self::update_initialized_state(adapter, header, initialized_state) + State::Skipping { skip_left: 0 } => { + self.state = State::Streaming; + self.streaming_state = Some(StreamingState { + audio: None, + video: None, + got_all_streams: false, + last_position: None, + }); + + Ok(HandleBufferResult::Again) + } + State::Skipping { ref mut skip_left } => { + let skip = cmp::min(self.adapter.get_available(), *skip_left as usize); + self.adapter.flush(skip).unwrap(); + *skip_left -= skip as u32; + + Ok(HandleBufferResult::Again) + } + State::Streaming => { + if self.adapter.get_available() < 16 { + return Ok(HandleBufferResult::NeedMoreData); + } + + let mut data = [0u8; 16]; + self.adapter.peek_into(&mut data).unwrap(); + + match nom::be_u32(&data[0..4]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, _previous_size) => { + // Nothing to do here, we just consume it for now + } + } + + let tag_header = match flavors::tag_header(&data[4..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, tag_header) => tag_header, + }; + + let res = match tag_header.tag_type { + flavors::TagType::Script => self.handle_script_tag(&tag_header), + flavors::TagType::Audio => { + let data_header = match flavors::audio_data_header(&data[15..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, data_header) => data_header, + }; + + self.handle_audio_tag(&tag_header, data_header) + } + flavors::TagType::Video => { + let data_header = match flavors::video_data_header(&data[15..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, data_header) => data_header, + }; + + self.handle_video_tag(&tag_header, data_header) + } + }; + + if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res { + let streaming_state = self.streaming_state.as_mut().unwrap(); + + if let Some(pts) = buffer.get_pts() { + streaming_state.last_position = streaming_state.last_position + .map(|last| cmp::max(last, pts)) + .or_else(|| Some(pts)); + } else if let Some(dts) = buffer.get_dts() { + streaming_state.last_position = streaming_state.last_position + .map(|last| cmp::max(last, dts)) + .or_else(|| Some(dts)); + } + } + + res + } } } @@ -397,16 +375,16 @@ impl Demuxer for FlvDemux { _upstream_size: Option, _random_access: bool) -> Result<(), ErrorMessage> { - self.streaming_state = StreamingState::Started { - adapter: Adapter::new(), - stream_state: StreamState::NeedHeader, - }; + self.state = State::NeedHeader; Ok(()) } fn stop(&mut self) -> Result<(), ErrorMessage> { - self.streaming_state = StreamingState::Stopped; + self.state = State::Stopped; + self.adapter.clear(); + self.header = None; + self.streaming_state = None; Ok(()) } @@ -416,20 +394,11 @@ impl Demuxer for FlvDemux { } fn handle_buffer(&mut self, buffer: Option) -> Result { - let (adapter, stream_state) = match self.streaming_state { - StreamingState::Started { ref mut adapter, ref mut stream_state } => { - (adapter, stream_state) - } - StreamingState::Stopped => { - unreachable!(); - } - }; - if let Some(buffer) = buffer { - adapter.push(buffer); + self.adapter.push(buffer); } - Self::update_state(adapter, stream_state) + self.update_state() } fn end_of_stream(&mut self) -> Result<(), ErrorMessage> { @@ -442,15 +411,7 @@ impl Demuxer for FlvDemux { } fn get_position(&self) -> Option { - if let StreamingState::Started { - stream_state: StreamState::Initialized { - initialized_state: InitializedState::Setup { - setup_state: SetupState { - last_position, .. - }, .. - }, .. - }, .. - } = self.streaming_state { + if let Some(StreamingState { last_position, .. }) = self.streaming_state { return last_position; }