Simplify state handling

This is not too nice now as we have None values to unwrap, but the code
is at least less convoluted.
This commit is contained in:
Sebastian Dröge 2016-12-04 00:57:42 +02:00
parent 62d24efc04
commit 9e53c0a926
2 changed files with 170 additions and 209 deletions

View file

@ -11,7 +11,7 @@ libc = "0.2"
url = "1.1" url = "1.1"
bitflags = "0.7" bitflags = "0.7"
reqwest = "0.2" reqwest = "0.2"
nom = "1.2" nom = "2.0"
flavors = {git = "https://github.com/Geal/flavors.git"} flavors = {git = "https://github.com/Geal/flavors.git"}
[build-dependencies] [build-dependencies]

View file

@ -27,67 +27,65 @@ use rsdemuxer::*;
use buffer::*; use buffer::*;
use adapter::*; use adapter::*;
const AUDIO_STREAM_ID: u32 = 0;
const VIDEO_STREAM_ID: u32 = 1;
#[derive(Debug)] #[derive(Debug)]
enum StreamingState { enum State {
Stopped, Stopped,
Started {
adapter: Adapter,
stream_state: StreamState,
},
}
#[derive(Debug)]
enum StreamState {
NeedHeader, NeedHeader,
Initialized {
header: flavors::Header,
initialized_state: InitializedState,
},
}
#[derive(Debug)]
enum InitializedState {
Skipping { skip_left: u32 }, Skipping { skip_left: u32 },
Setup { setup_state: SetupState }, Streaming,
} }
#[derive(Debug)] #[derive(Debug)]
struct SetupState { struct StreamingState {
// TODO: Store in custom structs that just contain what we need
audio: Option<flavors::AudioDataHeader>, audio: Option<flavors::AudioDataHeader>,
video: Option<flavors::VideoDataHeader>, /* TODO: parse and store various audio/video metadata from ScriptDataObject */ video: Option<flavors::VideoDataHeader>,
got_all_streams: bool, got_all_streams: bool,
last_position: Option<u64>, last_position: Option<u64>, // TODO: parse and store various audio/video metadata from ScriptDataObject
} }
#[derive(Debug)] #[derive(Debug)]
pub struct FlvDemux { pub struct FlvDemux {
streaming_state: StreamingState, state: State,
adapter: Adapter,
// Only in >= State::Skipping
header: Option<flavors::Header>,
// Only in >= State::Streaming
streaming_state: Option<StreamingState>,
} }
impl FlvDemux { impl FlvDemux {
pub fn new() -> FlvDemux { pub fn new() -> FlvDemux {
FlvDemux { streaming_state: StreamingState::Stopped } FlvDemux {
state: State::Stopped,
adapter: Adapter::new(),
header: None,
streaming_state: None,
}
} }
pub fn new_boxed() -> Box<Demuxer> { pub fn new_boxed() -> Box<Demuxer> {
Box::new(FlvDemux::new()) Box::new(FlvDemux::new())
} }
fn handle_script_tag(adapter: &mut Adapter, fn handle_script_tag(&mut self,
header: &flavors::Header,
setup_state: &mut SetupState,
tag_header: &flavors::TagHeader) tag_header: &flavors::TagHeader)
-> Result<HandleBufferResult, FlowError> { -> Result<HandleBufferResult, FlowError> {
adapter.flush((15 + tag_header.data_size) as usize).unwrap(); self.adapter.flush((15 + tag_header.data_size) as usize).unwrap();
Ok(HandleBufferResult::Again) Ok(HandleBufferResult::Again)
} }
fn update_audio_stream(header: &flavors::Header, fn update_audio_stream(&mut self,
setup_state: &mut SetupState,
data_header: flavors::AudioDataHeader) data_header: flavors::AudioDataHeader)
-> Result<HandleBufferResult, FlowError> { -> Result<HandleBufferResult, FlowError> {
let stream_changed = match setup_state.audio { let header = self.header.as_ref().unwrap();
let streaming_state = self.streaming_state.as_mut().unwrap();
let stream_changed = match streaming_state.audio {
None => true, None => true,
Some(flavors::AudioDataHeader { sound_format, sound_rate, sound_size, sound_type }) Some(flavors::AudioDataHeader { sound_format, sound_rate, sound_size, sound_type })
if sound_format != data_header.sound_format || if sound_format != data_header.sound_format ||
@ -101,10 +99,10 @@ impl FlvDemux {
match data_header { match data_header {
flavors::AudioDataHeader { sound_format: flavors::SoundFormat::MP3, .. } => { flavors::AudioDataHeader { sound_format: flavors::SoundFormat::MP3, .. } => {
let format = String::from("audio/mpeg, mpegversion=1, layer=3"); let format = String::from("audio/mpeg, mpegversion=1, layer=3");
let new_stream = setup_state.audio == None; let new_stream = streaming_state.audio == None;
setup_state.audio = Some(data_header); streaming_state.audio = Some(data_header);
let stream = Stream::new(0, format, String::from("audio")); let stream = Stream::new(AUDIO_STREAM_ID, format, String::from("audio"));
if new_stream { if new_stream {
return Ok(HandleBufferResult::StreamAdded(stream)); return Ok(HandleBufferResult::StreamAdded(stream));
} else { } else {
@ -116,51 +114,49 @@ impl FlvDemux {
} }
} }
} }
setup_state.audio = Some(data_header); streaming_state.audio = Some(data_header);
if !setup_state.got_all_streams && if !streaming_state.got_all_streams &&
(header.video && setup_state.video != None || !header.video) { (header.video && streaming_state.video != None || !header.video) {
setup_state.got_all_streams = true; streaming_state.got_all_streams = true;
return Ok(HandleBufferResult::HaveAllStreams); return Ok(HandleBufferResult::HaveAllStreams);
} }
Ok(HandleBufferResult::Again) Ok(HandleBufferResult::Again)
} }
fn handle_audio_tag(adapter: &mut Adapter, fn handle_audio_tag(&mut self,
header: &flavors::Header,
setup_state: &mut SetupState,
tag_header: &flavors::TagHeader, tag_header: &flavors::TagHeader,
data_header: flavors::AudioDataHeader) data_header: flavors::AudioDataHeader)
-> Result<HandleBufferResult, FlowError> { -> Result<HandleBufferResult, FlowError> {
let res = Self::update_audio_stream(header, setup_state, data_header); let res = self.update_audio_stream(data_header);
match res { match res {
Ok(HandleBufferResult::Again) => (), Ok(HandleBufferResult::Again) => (),
_ => return res, _ => return res,
} }
if adapter.get_available() < (15 + tag_header.data_size) as usize { if self.adapter.get_available() < (15 + tag_header.data_size) as usize {
return Ok(HandleBufferResult::NeedMoreData); return Ok(HandleBufferResult::NeedMoreData);
} }
adapter.flush(16).unwrap(); self.adapter.flush(16).unwrap();
if tag_header.data_size == 0 { if tag_header.data_size == 0 {
return Ok(HandleBufferResult::Again); return Ok(HandleBufferResult::Again);
} }
let mut buffer = adapter.get_buffer((tag_header.data_size - 1) as usize) let mut buffer = self.adapter.get_buffer((tag_header.data_size - 1) as usize).unwrap();
.unwrap(); buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000)).unwrap();
buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000))
.unwrap();
Ok(HandleBufferResult::BufferForStream(0, buffer)) Ok(HandleBufferResult::BufferForStream(AUDIO_STREAM_ID, buffer))
} }
fn update_video_stream(header: &flavors::Header, fn update_video_stream(&mut self,
setup_state: &mut SetupState,
data_header: flavors::VideoDataHeader) data_header: flavors::VideoDataHeader)
-> Result<HandleBufferResult, FlowError> { -> Result<HandleBufferResult, FlowError> {
let stream_changed = match setup_state.video { let header = self.header.as_ref().unwrap();
let streaming_state = self.streaming_state.as_mut().unwrap();
let stream_changed = match streaming_state.video {
None => true, None => true,
Some(flavors::VideoDataHeader { codec_id, .. }) if codec_id != data_header.codec_id => { Some(flavors::VideoDataHeader { codec_id, .. }) if codec_id != data_header.codec_id => {
true true
@ -172,10 +168,10 @@ impl FlvDemux {
match data_header { match data_header {
flavors::VideoDataHeader { codec_id: flavors::CodecId::VP6, .. } => { flavors::VideoDataHeader { codec_id: flavors::CodecId::VP6, .. } => {
let format = String::from("video/x-vp6-flash"); let format = String::from("video/x-vp6-flash");
let new_stream = setup_state.video == None; let new_stream = streaming_state.video == None;
setup_state.video = Some(data_header); streaming_state.video = Some(data_header);
let stream = Stream::new(1, format, String::from("video")); let stream = Stream::new(VIDEO_STREAM_ID, format, String::from("video"));
if new_stream { if new_stream {
return Ok(HandleBufferResult::StreamAdded(stream)); return Ok(HandleBufferResult::StreamAdded(stream));
} else { } else {
@ -188,37 +184,36 @@ impl FlvDemux {
} }
} }
setup_state.video = Some(data_header); streaming_state.video = Some(data_header);
if !setup_state.got_all_streams && if !streaming_state.got_all_streams &&
(header.audio && setup_state.audio != None || !header.audio) { (header.audio && streaming_state.audio != None || !header.audio) {
setup_state.got_all_streams = true; streaming_state.got_all_streams = true;
return Ok(HandleBufferResult::HaveAllStreams); return Ok(HandleBufferResult::HaveAllStreams);
} }
Ok(HandleBufferResult::Again) Ok(HandleBufferResult::Again)
} }
fn handle_video_tag(adapter: &mut Adapter, fn handle_video_tag(&mut self,
header: &flavors::Header,
setup_state: &mut SetupState,
tag_header: &flavors::TagHeader, tag_header: &flavors::TagHeader,
data_header: flavors::VideoDataHeader) data_header: flavors::VideoDataHeader)
-> Result<HandleBufferResult, FlowError> { -> Result<HandleBufferResult, FlowError> {
let res = Self::update_video_stream(header, setup_state, data_header); let res = self.update_video_stream(data_header);
match res { match res {
Ok(HandleBufferResult::Again) => (), Ok(HandleBufferResult::Again) => (),
_ => return res, _ => return res,
} }
if adapter.get_available() < (15 + tag_header.data_size) as usize { if self.adapter.get_available() < (15 + tag_header.data_size) as usize {
return Ok(HandleBufferResult::NeedMoreData); return Ok(HandleBufferResult::NeedMoreData);
} }
let video = setup_state.video.as_ref().unwrap(); let streaming_state = self.streaming_state.as_ref().unwrap();
let video = streaming_state.video.as_ref().unwrap();
let is_keyframe = video.frame_type == flavors::FrameType::Key; let is_keyframe = video.frame_type == flavors::FrameType::Key;
adapter.flush(16).unwrap(); self.adapter.flush(16).unwrap();
let offset = if video.codec_id == flavors::CodecId::VP6 || let offset = if video.codec_id == flavors::CodecId::VP6 ||
video.codec_id == flavors::CodecId::VP6A { video.codec_id == flavors::CodecId::VP6A {
@ -232,15 +227,16 @@ impl FlvDemux {
} }
if tag_header.data_size < offset { if tag_header.data_size < offset {
adapter.flush((tag_header.data_size - 1) as usize).unwrap(); self.adapter.flush((tag_header.data_size - 1) as usize).unwrap();
return Ok(HandleBufferResult::Again); return Ok(HandleBufferResult::Again);
} }
if offset > 0 { if offset > 0 {
adapter.flush(offset as usize).unwrap(); self.adapter.flush(offset as usize).unwrap();
} }
let mut buffer = adapter.get_buffer((tag_header.data_size - 1 - offset) as usize) let mut buffer = self.adapter
.get_buffer((tag_header.data_size - 1 - offset) as usize)
.unwrap(); .unwrap();
if !is_keyframe { if !is_keyframe {
buffer.set_flags(BUFFER_FLAG_DELTA_UNIT).unwrap(); buffer.set_flags(BUFFER_FLAG_DELTA_UNIT).unwrap();
@ -248,116 +244,16 @@ impl FlvDemux {
buffer.set_dts(Some((tag_header.timestamp as u64) * 1000 * 1000)) buffer.set_dts(Some((tag_header.timestamp as u64) * 1000 * 1000))
.unwrap(); .unwrap();
Ok(HandleBufferResult::BufferForStream(1, buffer)) Ok(HandleBufferResult::BufferForStream(VIDEO_STREAM_ID, buffer))
} }
fn update_setup_state(adapter: &mut Adapter, fn update_state(&mut self) -> Result<HandleBufferResult, FlowError> {
header: &flavors::Header, match self.state {
setup_state: &mut SetupState) State::Stopped => unreachable!(),
-> Result<HandleBufferResult, FlowError> { State::NeedHeader => {
if adapter.get_available() < 16 { while self.adapter.get_available() >= 9 {
return Ok(HandleBufferResult::NeedMoreData);
}
let mut data = [0u8; 16];
adapter.peek_into(&mut data).unwrap();
match nom::be_u32(&data[0..4]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, previous_size) => {
();
}
}
let tag_header = match flavors::tag_header(&data[4..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, tag_header) => tag_header,
};
let res = match tag_header.tag_type {
flavors::TagType::Script => {
Self::handle_script_tag(adapter, header, setup_state, &tag_header)
}
flavors::TagType::Audio => {
let data_header = match flavors::audio_data_header(&data[15..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, data_header) => data_header,
};
Self::handle_audio_tag(adapter, header, setup_state, &tag_header, data_header)
}
flavors::TagType::Video => {
let data_header = match flavors::video_data_header(&data[15..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, data_header) => data_header,
};
Self::handle_video_tag(adapter, header, setup_state, &tag_header, data_header)
}
};
if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res {
if let Some(pts) = buffer.get_pts() {
setup_state.last_position =
setup_state.last_position.map(|last| cmp::max(last, pts)).or_else(|| Some(pts));
} else if let Some(dts) = buffer.get_dts() {
setup_state.last_position =
setup_state.last_position.map(|last| cmp::max(last, dts)).or_else(|| Some(dts));
}
}
res
}
fn update_initialized_state(adapter: &mut Adapter,
header: &flavors::Header,
initialized_state: &mut InitializedState)
-> Result<HandleBufferResult, FlowError> {
match *initialized_state {
InitializedState::Skipping { skip_left: 0 } => {
*initialized_state = InitializedState::Setup {
setup_state: SetupState {
audio: None,
video: None,
got_all_streams: false,
last_position: None,
},
};
Ok(HandleBufferResult::Again)
}
InitializedState::Skipping { ref mut skip_left } => {
let skip = cmp::min(adapter.get_available(), *skip_left as usize);
adapter.flush(skip).unwrap();
*skip_left -= skip as u32;
Ok(HandleBufferResult::Again)
}
InitializedState::Setup { ref mut setup_state } => {
Self::update_setup_state(adapter, header, setup_state)
}
}
}
fn update_state(adapter: &mut Adapter,
stream_state: &mut StreamState)
-> Result<HandleBufferResult, FlowError> {
match *stream_state {
StreamState::NeedHeader => {
while adapter.get_available() >= 9 {
let mut data = [0u8; 9]; let mut data = [0u8; 9];
adapter.peek_into(&mut data).unwrap(); self.adapter.peek_into(&mut data).unwrap();
match flavors::header(&data) { match flavors::header(&data) {
IResult::Error(_) | IResult::Error(_) |
@ -369,24 +265,106 @@ impl FlvDemux {
header.offset = 9; header.offset = 9;
} }
let skip = header.offset - 9; let skip = header.offset - 9;
adapter.flush(9).unwrap(); self.adapter.flush(9).unwrap();
*stream_state = StreamState::Initialized { self.header = Some(header);
header: header, self.state = State::Skipping { skip_left: skip };
initialized_state: InitializedState::Skipping { skip_left: skip },
};
return Ok(HandleBufferResult::Again); return Ok(HandleBufferResult::Again);
} }
} }
adapter.flush(1).unwrap(); self.adapter.flush(1).unwrap();
} }
Ok(HandleBufferResult::NeedMoreData) Ok(HandleBufferResult::NeedMoreData)
} }
StreamState::Initialized { ref header, ref mut initialized_state } => { State::Skipping { skip_left: 0 } => {
Self::update_initialized_state(adapter, header, initialized_state) self.state = State::Streaming;
self.streaming_state = Some(StreamingState {
audio: None,
video: None,
got_all_streams: false,
last_position: None,
});
Ok(HandleBufferResult::Again)
}
State::Skipping { ref mut skip_left } => {
let skip = cmp::min(self.adapter.get_available(), *skip_left as usize);
self.adapter.flush(skip).unwrap();
*skip_left -= skip as u32;
Ok(HandleBufferResult::Again)
}
State::Streaming => {
if self.adapter.get_available() < 16 {
return Ok(HandleBufferResult::NeedMoreData);
}
let mut data = [0u8; 16];
self.adapter.peek_into(&mut data).unwrap();
match nom::be_u32(&data[0..4]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, _previous_size) => {
// Nothing to do here, we just consume it for now
}
}
let tag_header = match flavors::tag_header(&data[4..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, tag_header) => tag_header,
};
let res = match tag_header.tag_type {
flavors::TagType::Script => self.handle_script_tag(&tag_header),
flavors::TagType::Audio => {
let data_header = match flavors::audio_data_header(&data[15..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, data_header) => data_header,
};
self.handle_audio_tag(&tag_header, data_header)
}
flavors::TagType::Video => {
let data_header = match flavors::video_data_header(&data[15..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, data_header) => data_header,
};
self.handle_video_tag(&tag_header, data_header)
}
};
if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res {
let streaming_state = self.streaming_state.as_mut().unwrap();
if let Some(pts) = buffer.get_pts() {
streaming_state.last_position = streaming_state.last_position
.map(|last| cmp::max(last, pts))
.or_else(|| Some(pts));
} else if let Some(dts) = buffer.get_dts() {
streaming_state.last_position = streaming_state.last_position
.map(|last| cmp::max(last, dts))
.or_else(|| Some(dts));
}
}
res
} }
} }
} }
@ -397,16 +375,16 @@ impl Demuxer for FlvDemux {
_upstream_size: Option<u64>, _upstream_size: Option<u64>,
_random_access: bool) _random_access: bool)
-> Result<(), ErrorMessage> { -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Started { self.state = State::NeedHeader;
adapter: Adapter::new(),
stream_state: StreamState::NeedHeader,
};
Ok(()) Ok(())
} }
fn stop(&mut self) -> Result<(), ErrorMessage> { fn stop(&mut self) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped; self.state = State::Stopped;
self.adapter.clear();
self.header = None;
self.streaming_state = None;
Ok(()) Ok(())
} }
@ -416,20 +394,11 @@ impl Demuxer for FlvDemux {
} }
fn handle_buffer(&mut self, buffer: Option<Buffer>) -> Result<HandleBufferResult, FlowError> { fn handle_buffer(&mut self, buffer: Option<Buffer>) -> Result<HandleBufferResult, FlowError> {
let (adapter, stream_state) = match self.streaming_state {
StreamingState::Started { ref mut adapter, ref mut stream_state } => {
(adapter, stream_state)
}
StreamingState::Stopped => {
unreachable!();
}
};
if let Some(buffer) = buffer { if let Some(buffer) = buffer {
adapter.push(buffer); self.adapter.push(buffer);
} }
Self::update_state(adapter, stream_state) self.update_state()
} }
fn end_of_stream(&mut self) -> Result<(), ErrorMessage> { fn end_of_stream(&mut self) -> Result<(), ErrorMessage> {
@ -442,15 +411,7 @@ impl Demuxer for FlvDemux {
} }
fn get_position(&self) -> Option<u64> { fn get_position(&self) -> Option<u64> {
if let StreamingState::Started { if let Some(StreamingState { last_position, .. }) = self.streaming_state {
stream_state: StreamState::Initialized {
initialized_state: InitializedState::Setup {
setup_state: SetupState {
last_position, ..
}, ..
}, ..
}, ..
} = self.streaming_state {
return last_position; return last_position;
} }