threadshare: Implement propert event/query handling

This commit is contained in:
Sebastian Dröge 2018-03-15 21:17:01 +02:00
parent e269e51524
commit 613706d446

View file

@ -125,6 +125,7 @@ struct State {
io_context: Option<IOContext>,
socket: Option<Socket>,
need_initial_events: bool,
configured_caps: Option<gst::Caps>,
}
impl Default for State {
@ -133,6 +134,7 @@ impl Default for State {
io_context: None,
socket: None,
need_initial_events: true,
configured_caps: None,
}
}
}
@ -212,37 +214,77 @@ impl UdpSrc {
element.catch_panic(fallback, |element| f(udpsrc, element))
}
fn src_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool {
fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
let mut handled = true;
match event.view() {
EventView::FlushStart(..) => {}
EventView::FlushStop(..) => {}
_ => (),
}
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = self.stop(element);
true
}
EventView::FlushStop(..) => {
let (ret, state, pending) = element.get_state(0.into());
if ret == gst::StateChangeReturn::Success && state == gst::State::Playing
|| ret == gst::StateChangeReturn::Async && pending == gst::State::Playing
{
let _ = self.start(element);
}
true
}
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
};
if handled {
if ret {
gst_log!(self.cat, obj: pad, "Handled event {:?}", event);
pad.event_default(Some(element), event)
} else {
gst_log!(self.cat, obj: pad, "Didn't handle event {:?}", event);
false
}
ret
}
fn src_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool {
fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
match query.view_mut() {
_ => (),
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(true, 0.into(), 0.into());
true
}
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
true
}
QueryView::Caps(ref mut q) => {
let state = self.state.lock().unwrap();
let caps = if let Some(ref caps) = state.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or(caps.clone())
} else {
q.get_filter()
.map(|f| f.to_owned())
.unwrap_or(gst::Caps::new_any())
};
q.set_result(&caps);
true
}
_ => false,
};
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
pad.query_default(Some(element), query)
if ret {
gst_log!(self.cat, obj: pad, "Handled query {:?}", query);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle query {:?}", query);
}
ret
}
fn prepare(&self, element: &Element) -> Result<(), ()> {
@ -322,25 +364,28 @@ impl UdpSrc {
socket.schedule(&io_context, move |buffer| {
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
let mut events = Vec::new();
let mut state = udpsrc.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(udpsrc.cat, obj: &element_clone, "Pushing initial events");
// TODO: Invent a stream id
udpsrc
.src_pad
.push_event(gst::Event::new_stream_start("meh").build());
events.push(gst::Event::new_stream_start("meh").build());
if let Some(ref caps) = settings.caps {
udpsrc
.src_pad
.push_event(gst::Event::new_caps(&caps).build());
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
udpsrc.src_pad.push_event(
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
state.need_initial_events = false;
}
drop(state);
for event in events {
udpsrc.src_pad.push_event(event);
}
// TODO: Error handling
udpsrc.src_pad.push(buffer).into_result().unwrap();
@ -373,7 +418,7 @@ impl UdpSrc {
fn start(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let mut state = self.state.lock().unwrap();
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.unpause(element.get_clock().unwrap(), element.get_base_time());
@ -386,7 +431,7 @@ impl UdpSrc {
fn stop(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();