threadshare: Minor appsrc refactoring

This commit is contained in:
Sebastian Dröge 2018-04-03 20:03:44 +03:00
parent d52f1d49dd
commit 82ab78fa3d

View file

@ -173,42 +173,10 @@ impl AppSrc {
.unwrap()
.downcast::<Element>()
.unwrap();
let mut buffer = args[1].get::<gst::Buffer>().unwrap();
let buffer = args[1].get::<gst::Buffer>().unwrap();
let appsrc = element.get_impl().downcast_ref::<AppSrc>().unwrap();
let settings = appsrc.settings.lock().unwrap().clone();
if settings.do_timestamp {
if let Some(clock) = element.get_clock() {
let base_time = element.get_base_time();
let now = clock.get_time();
let buffer = buffer.make_mut();
buffer.set_dts(now - base_time);
buffer.set_pts(gst::CLOCK_TIME_NONE);
} else {
gst_error!(appsrc.cat, obj: &element, "Don't have a clock yet");
return Some(false.to_value());
}
}
let mut state = appsrc.state.lock().unwrap();
if let Some(ref mut channel) = state.channel {
match channel.try_send(Either::Left(buffer)) {
Ok(_) => Some(true.to_value()),
Err(err) => {
gst_error!(
appsrc.cat,
obj: &element,
"Failed to queue buffer: {}",
err
);
Some(false.to_value())
}
}
} else {
Some(false.to_value())
}
Some(appsrc.push_buffer(&element, buffer).to_value())
},
);
@ -219,19 +187,7 @@ impl AppSrc {
.downcast::<Element>()
.unwrap();
let appsrc = element.get_impl().downcast_ref::<AppSrc>().unwrap();
let mut state = appsrc.state.lock().unwrap();
if let Some(ref mut channel) = state.channel {
match channel.try_send(Either::Right(gst::Event::new_eos().build())) {
Ok(_) => Some(true.to_value()),
Err(err) => {
gst_error!(appsrc.cat, obj: &element, "Failed to queue EOS: {}", err);
Some(false.to_value())
}
}
} else {
Some(false.to_value())
}
Some(appsrc.end_of_stream(&element).to_value())
});
}
@ -376,6 +332,52 @@ impl AppSrc {
ret
}
fn push_buffer(&self, element: &Element, mut buffer: gst::Buffer) -> bool {
let settings = self.settings.lock().unwrap().clone();
if settings.do_timestamp {
if let Some(clock) = element.get_clock() {
let base_time = element.get_base_time();
let now = clock.get_time();
let buffer = buffer.make_mut();
buffer.set_dts(now - base_time);
buffer.set_pts(gst::CLOCK_TIME_NONE);
} else {
gst_error!(self.cat, obj: element, "Don't have a clock yet");
return false;
}
}
let mut state = self.state.lock().unwrap();
if let Some(ref mut channel) = state.channel {
match channel.try_send(Either::Left(buffer)) {
Ok(_) => true,
Err(err) => {
gst_error!(self.cat, obj: element, "Failed to queue buffer: {}", err);
false
}
}
} else {
false
}
}
fn end_of_stream(&self, element: &Element) -> bool {
let mut state = self.state.lock().unwrap();
if let Some(ref mut channel) = state.channel {
match channel.try_send(Either::Right(gst::Event::new_eos().build())) {
Ok(_) => true,
Err(err) => {
gst_error!(self.cat, obj: element, "Failed to queue EOS: {}", err);
false
}
}
} else {
false
}
}
fn push_item(
&self,
element: &Element,