Get rid of demuxer wrapper

It was just an unneeded layer of indirection now after all the
refactoring
This commit is contained in:
Sebastian Dröge 2017-09-22 13:49:19 +03:00
parent cda23d5c41
commit 0d22579297

View file

@ -93,244 +93,6 @@ impl Stream {
} }
} }
// TODO: Get rid of this wrapper
pub struct DemuxerWrapper {
cat: gst::DebugCategory,
demuxer: Mutex<Box<DemuxerImpl>>,
}
impl DemuxerWrapper {
fn new(demuxer_impl: Box<DemuxerImpl>) -> DemuxerWrapper {
DemuxerWrapper {
cat: gst::DebugCategory::new(
"rsdemux",
gst::DebugColorFlags::empty(),
"Rust demuxer base class",
),
demuxer: Mutex::new(demuxer_impl),
}
}
fn start(&self, element: &RsDemuxer, upstream_size: Option<u64>, random_access: bool) -> bool {
let demuxer_impl = &mut self.demuxer.lock().unwrap();
gst_debug!(
self.cat,
obj: element,
"Starting with upstream size {:?} and random access {}",
upstream_size,
random_access
);
match demuxer_impl.start(element, upstream_size, random_access) {
Ok(..) => {
gst_trace!(self.cat, obj: element, "Successfully started",);
true
}
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed to start: {:?}", msg);
self.post_message(element, msg);
false
}
}
}
fn stop(&self, element: &RsDemuxer) -> bool {
let demuxer_impl = &mut self.demuxer.lock().unwrap();
gst_debug!(self.cat, obj: element, "Stopping");
match demuxer_impl.stop(element) {
Ok(..) => {
gst_trace!(self.cat, obj: element, "Successfully stop");
true
}
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed to stop: {:?}", msg);
self.post_message(element, msg);
false
}
}
}
fn is_seekable(&self, element: &RsDemuxer) -> bool {
let demuxer_impl = &self.demuxer.lock().unwrap();
let seekable = demuxer_impl.is_seekable(element);
gst_debug!(self.cat, obj: element, "Seekable {}", seekable);
seekable
}
fn get_position(&self, element: &RsDemuxer) -> Option<u64> {
let demuxer_impl = &self.demuxer.lock().unwrap();
demuxer_impl.get_position(element)
}
fn get_duration(&self, element: &RsDemuxer) -> Option<u64> {
let demuxer_impl = &self.demuxer.lock().unwrap();
demuxer_impl.get_duration(element)
}
fn seek(
&self,
demuxer: &Demuxer,
element: &RsDemuxer,
start: u64,
stop: u64,
offset: &mut u64,
) -> bool {
let stop = if stop == u64::MAX { None } else { Some(stop) };
gst_debug!(self.cat, obj: element, "Seeking to {:?}-{:?}", start, stop);
let res = {
let mut demuxer_impl = &mut self.demuxer.lock().unwrap();
match demuxer_impl.seek(element, start, stop) {
Ok(res) => res,
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed to seek: {:?}", msg);
self.post_message(element, msg);
return false;
}
}
};
match res {
SeekResult::TooEarly => {
gst_debug!(self.cat, obj: element, "Seeked too early");
false
}
SeekResult::Ok(off) => {
gst_trace!(self.cat, obj: element, "Seeked successfully");
*offset = off;
true
}
SeekResult::Eos => {
gst_debug!(self.cat, obj: element, "Seeked after EOS");
*offset = u64::MAX;
demuxer.stream_eos(element, None);
true
}
}
}
fn handle_buffer(
&self,
demuxer: &Demuxer,
element: &RsDemuxer,
buffer: gst::Buffer,
) -> gst::FlowReturn {
let mut res = {
let mut demuxer_impl = &mut self.demuxer.lock().unwrap();
gst_trace!(self.cat, obj: element, "Handling buffer {:?}", buffer);
match demuxer_impl.handle_buffer(element, Some(buffer)) {
Ok(res) => res,
Err(flow_error) => {
gst_error!(
self.cat,
obj: element,
"Failed handling buffer: {:?}",
flow_error
);
match flow_error {
FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => {
self.post_message(element, msg)
}
_ => (),
}
return flow_error.to_native();
}
}
};
// Loop until AllEos, NeedMoreData or error when pushing downstream
loop {
gst_trace!(self.cat, obj: element, "Handled {:?}", res);
match res {
HandleBufferResult::NeedMoreData => {
return gst::FlowReturn::Ok;
}
HandleBufferResult::StreamAdded(stream) => {
demuxer.add_stream(element, stream.index, stream.caps, &stream.stream_id);
}
HandleBufferResult::HaveAllStreams => {
demuxer.added_all_streams(element);
}
HandleBufferResult::StreamChanged(stream) => {
demuxer.stream_format_changed(element, stream.index, stream.caps);
}
HandleBufferResult::StreamsChanged(streams) => for stream in streams {
demuxer.stream_format_changed(element, stream.index, stream.caps);
},
HandleBufferResult::BufferForStream(index, buffer) => {
let flow_ret = demuxer.stream_push_buffer(element, index, buffer);
if flow_ret != gst::FlowReturn::Ok {
return flow_ret;
}
}
HandleBufferResult::Eos(index) => {
demuxer.stream_eos(element, index);
return gst::FlowReturn::Eos;
}
HandleBufferResult::Again => {
// nothing, just call again
}
};
gst_trace!(self.cat, obj: element, "Calling again");
res = {
let mut demuxer_impl = &mut self.demuxer.lock().unwrap();
match demuxer_impl.handle_buffer(element, None) {
Ok(res) => res,
Err(flow_error) => {
gst_error!(
self.cat,
obj: element,
"Failed calling again: {:?}",
flow_error
);
match flow_error {
FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => {
self.post_message(element, msg)
}
_ => (),
}
return flow_error.to_native();
}
}
}
}
}
fn end_of_stream(&self, element: &RsDemuxer) {
let mut demuxer_impl = &mut self.demuxer.lock().unwrap();
gst_debug!(self.cat, obj: element, "End of stream");
match demuxer_impl.end_of_stream(element) {
Ok(_) => (),
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed end of stream: {:?}", msg);
self.post_message(element, msg);
}
}
}
fn post_message(&self, element: &RsDemuxer, msg: &ErrorMessage) {
msg.post(element);
}
}
pub struct DemuxerInfo { pub struct DemuxerInfo {
pub name: String, pub name: String,
pub long_name: String, pub long_name: String,
@ -344,11 +106,12 @@ pub struct DemuxerInfo {
} }
pub struct Demuxer { pub struct Demuxer {
cat: gst::DebugCategory,
sinkpad: gst::Pad, sinkpad: gst::Pad,
flow_combiner: Mutex<UniqueFlowCombiner>, flow_combiner: Mutex<UniqueFlowCombiner>,
group_id: Mutex<u32>, group_id: Mutex<u32>,
srcpads: Mutex<BTreeMap<u32, gst::Pad>>, srcpads: Mutex<BTreeMap<u32, gst::Pad>>,
wrap: Box<DemuxerWrapper>, imp: Mutex<Box<DemuxerImpl>>,
} }
#[derive(Default)] #[derive(Default)]
@ -372,17 +135,18 @@ unsafe impl Send for UniqueFlowCombiner {}
unsafe impl Sync for UniqueFlowCombiner {} unsafe impl Sync for UniqueFlowCombiner {}
impl Demuxer { impl Demuxer {
fn get_wrap(&self) -> &DemuxerWrapper {
&self.wrap
}
fn new(demuxer: &RsDemuxer, sinkpad: gst::Pad, demuxer_info: &DemuxerInfo) -> Self { fn new(demuxer: &RsDemuxer, sinkpad: gst::Pad, demuxer_info: &DemuxerInfo) -> Self {
Self { Self {
cat: gst::DebugCategory::new(
"rsdemux",
gst::DebugColorFlags::empty(),
"Rust demuxer base class",
),
sinkpad: sinkpad, sinkpad: sinkpad,
flow_combiner: Mutex::new(Default::default()), flow_combiner: Mutex::new(Default::default()),
group_id: Mutex::new(gst::util_group_id_next()), group_id: Mutex::new(gst::util_group_id_next()),
srcpads: Mutex::new(BTreeMap::new()), srcpads: Mutex::new(BTreeMap::new()),
wrap: Box::new(DemuxerWrapper::new((demuxer_info.create_instance)(demuxer))), imp: Mutex::new(((demuxer_info.create_instance)(demuxer))),
} }
} }
@ -531,6 +295,48 @@ impl Demuxer {
} }
} }
fn start(&self, element: &RsDemuxer, upstream_size: Option<u64>, random_access: bool) -> bool {
let demuxer_impl = &mut self.imp.lock().unwrap();
gst_debug!(
self.cat,
obj: element,
"Starting with upstream size {:?} and random access {}",
upstream_size,
random_access
);
match demuxer_impl.start(element, upstream_size, random_access) {
Ok(..) => {
gst_trace!(self.cat, obj: element, "Successfully started",);
true
}
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed to start: {:?}", msg);
msg.post(element);
false
}
}
}
fn stop(&self, element: &RsDemuxer) -> bool {
let demuxer_impl = &mut self.imp.lock().unwrap();
gst_debug!(self.cat, obj: element, "Stopping");
match demuxer_impl.stop(element) {
Ok(..) => {
gst_trace!(self.cat, obj: element, "Successfully stop");
true
}
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed to stop: {:?}", msg);
msg.post(element);
false
}
}
}
fn sink_activatemode( fn sink_activatemode(
_pad: &gst::Pad, _pad: &gst::Pad,
parent: &Option<gst::Object>, parent: &Option<gst::Object>,
@ -544,7 +350,6 @@ impl Demuxer {
.downcast::<RsElement>() .downcast::<RsElement>()
.unwrap(); .unwrap();
let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap(); let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap();
let wrap = demuxer.get_wrap();
if active { if active {
let mut query = gst::Query::new_duration(gst::Format::Bytes); let mut query = gst::Query::new_duration(gst::Format::Bytes);
@ -559,7 +364,7 @@ impl Demuxer {
None None
}; };
if !wrap.start(&element, upstream_size, mode == gst::PadMode::Pull) { if !demuxer.start(&element, upstream_size, mode == gst::PadMode::Pull) {
return false; return false;
} }
@ -574,7 +379,7 @@ impl Demuxer {
let _ = demuxer.sinkpad.stop_task(); let _ = demuxer.sinkpad.stop_task();
} }
wrap.stop(&element) demuxer.stop(&element)
} }
} }
@ -590,8 +395,92 @@ impl Demuxer {
.downcast::<RsElement>() .downcast::<RsElement>()
.unwrap(); .unwrap();
let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap(); let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap();
let wrap = demuxer.get_wrap();
wrap.handle_buffer(demuxer, &element, buffer) let mut res = {
let mut demuxer_impl = &mut demuxer.imp.lock().unwrap();
gst_trace!(demuxer.cat, obj: &element, "Handling buffer {:?}", buffer);
match demuxer_impl.handle_buffer(&element, Some(buffer)) {
Ok(res) => res,
Err(flow_error) => {
gst_error!(
demuxer.cat,
obj: &element,
"Failed handling buffer: {:?}",
flow_error
);
match flow_error {
FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => {
msg.post(&element)
}
_ => (),
}
return flow_error.to_native();
}
}
};
// Loop until AllEos, NeedMoreData or error when pushing downstream
loop {
gst_trace!(demuxer.cat, obj: &element, "Handled {:?}", res);
match res {
HandleBufferResult::NeedMoreData => {
return gst::FlowReturn::Ok;
}
HandleBufferResult::StreamAdded(stream) => {
demuxer.add_stream(&element, stream.index, stream.caps, &stream.stream_id);
}
HandleBufferResult::HaveAllStreams => {
demuxer.added_all_streams(&element);
}
HandleBufferResult::StreamChanged(stream) => {
demuxer.stream_format_changed(&element, stream.index, stream.caps);
}
HandleBufferResult::StreamsChanged(streams) => for stream in streams {
demuxer.stream_format_changed(&element, stream.index, stream.caps);
},
HandleBufferResult::BufferForStream(index, buffer) => {
let flow_ret = demuxer.stream_push_buffer(&element, index, buffer);
if flow_ret != gst::FlowReturn::Ok {
return flow_ret;
}
}
HandleBufferResult::Eos(index) => {
demuxer.stream_eos(&element, index);
return gst::FlowReturn::Eos;
}
HandleBufferResult::Again => {
// nothing, just call again
}
};
gst_trace!(demuxer.cat, obj: &element, "Calling again");
res = {
let mut demuxer_impl = &mut demuxer.imp.lock().unwrap();
match demuxer_impl.handle_buffer(&element, None) {
Ok(res) => res,
Err(flow_error) => {
gst_error!(
demuxer.cat,
obj: &element,
"Failed calling again: {:?}",
flow_error
);
match flow_error {
FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => {
msg.post(&element)
}
_ => (),
}
return flow_error.to_native();
}
}
}
}
} }
fn sink_event(pad: &gst::Pad, parent: &Option<gst::Object>, event: gst::Event) -> bool { fn sink_event(pad: &gst::Pad, parent: &Option<gst::Object>, event: gst::Event) -> bool {
@ -604,11 +493,24 @@ impl Demuxer {
.downcast::<RsElement>() .downcast::<RsElement>()
.unwrap(); .unwrap();
let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap(); let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap();
let wrap = demuxer.get_wrap();
match event.view() { match event.view() {
EventView::Eos(..) => { EventView::Eos(..) => {
wrap.end_of_stream(&element); let mut demuxer_impl = &mut demuxer.imp.lock().unwrap();
gst_debug!(demuxer.cat, obj: &element, "End of stream");
match demuxer_impl.end_of_stream(&element) {
Ok(_) => (),
Err(ref msg) => {
gst_error!(
demuxer.cat,
obj: &element,
"Failed end of stream: {:?}",
msg
);
msg.post(&element);
}
}
pad.event_default(parent.as_ref(), event) pad.event_default(parent.as_ref(), event)
} }
EventView::Segment(..) => pad.event_default(parent.as_ref(), event), EventView::Segment(..) => pad.event_default(parent.as_ref(), event),
@ -626,14 +528,20 @@ impl Demuxer {
.downcast::<RsElement>() .downcast::<RsElement>()
.unwrap(); .unwrap();
let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap(); let demuxer = element.get_impl().downcast_ref::<Demuxer>().unwrap();
let wrap = demuxer.get_wrap();
match query.view_mut() { match query.view_mut() {
QueryView::Position(ref mut q) => { QueryView::Position(ref mut q) => {
let (fmt, _) = q.get(); let (fmt, _) = q.get();
if fmt == gst::Format::Time { if fmt == gst::Format::Time {
let position = wrap.get_position(&element); let demuxer_impl = &demuxer.imp.lock().unwrap();
gst_trace!(wrap.cat, obj: &element, "Returning position {:?}", position);
let position = demuxer_impl.get_position(&element);
gst_trace!(
demuxer.cat,
obj: &element,
"Returning position {:?}",
position
);
match position { match position {
None => return false, None => return false,
@ -649,8 +557,15 @@ impl Demuxer {
QueryView::Duration(ref mut q) => { QueryView::Duration(ref mut q) => {
let (fmt, _) = q.get(); let (fmt, _) = q.get();
if fmt == gst::Format::Time { if fmt == gst::Format::Time {
let duration = wrap.get_duration(&element); let demuxer_impl = &demuxer.imp.lock().unwrap();
gst_trace!(wrap.cat, obj: &element, "Returning duration {:?}", duration);
let duration = demuxer_impl.get_duration(&element);
gst_trace!(
demuxer.cat,
obj: &element,
"Returning duration {:?}",
duration
);
match duration { match duration {
None => return false, None => return false,
@ -682,6 +597,45 @@ impl Demuxer {
_ => pad.event_default(parent.as_ref(), event), _ => pad.event_default(parent.as_ref(), event),
} }
} }
fn seek(&self, element: &RsDemuxer, start: u64, stop: u64, offset: &mut u64) -> bool {
let stop = if stop == u64::MAX { None } else { Some(stop) };
gst_debug!(self.cat, obj: element, "Seeking to {:?}-{:?}", start, stop);
let res = {
let mut demuxer_impl = &mut self.imp.lock().unwrap();
match demuxer_impl.seek(element, start, stop) {
Ok(res) => res,
Err(ref msg) => {
gst_error!(self.cat, obj: element, "Failed to seek: {:?}", msg);
msg.post(element);
return false;
}
}
};
match res {
SeekResult::TooEarly => {
gst_debug!(self.cat, obj: element, "Seeked too early");
false
}
SeekResult::Ok(off) => {
gst_trace!(self.cat, obj: element, "Seeked successfully");
*offset = off;
true
}
SeekResult::Eos => {
gst_debug!(self.cat, obj: element, "Seeked after EOS");
*offset = u64::MAX;
self.stream_eos(&element, None);
true
}
}
}
} }
impl ElementImpl for Demuxer { impl ElementImpl for Demuxer {
@ -691,9 +645,8 @@ impl ElementImpl for Demuxer {
transition: gst::StateChange, transition: gst::StateChange,
) -> gst::StateChangeReturn { ) -> gst::StateChangeReturn {
let mut ret = gst::StateChangeReturn::Success; let mut ret = gst::StateChangeReturn::Success;
let wrap = self.get_wrap();
gst_trace!(wrap.cat, obj: element, "Changing state {:?}", transition); gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
match transition { match transition {
gst::StateChange::ReadyToPaused => { gst::StateChange::ReadyToPaused => {