diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index f0f9af43..35b446f0 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -125,6 +125,7 @@ struct State { io_context: Option, socket: Option, need_initial_events: bool, + configured_caps: Option, } impl Default for State { @@ -133,6 +134,7 @@ impl Default for State { io_context: None, socket: None, need_initial_events: true, + configured_caps: None, } } } @@ -212,37 +214,77 @@ impl UdpSrc { element.catch_panic(fallback, |element| f(udpsrc, element)) } - fn src_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(self.cat, obj: pad, "Handling event {:?}", event); - let mut handled = true; - match event.view() { - EventView::FlushStart(..) => {} - EventView::FlushStop(..) => {} - _ => (), - } + let ret = match event.view() { + EventView::FlushStart(..) => { + let _ = self.stop(element); + true + } + EventView::FlushStop(..) => { + let (ret, state, pending) = element.get_state(0.into()); + if ret == gst::StateChangeReturn::Success && state == gst::State::Playing + || ret == gst::StateChangeReturn::Async && pending == gst::State::Playing + { + let _ = self.start(element); + } + true + } + EventView::Reconfigure(..) => true, + EventView::Latency(..) => true, + _ => false, + }; - if handled { + if ret { gst_log!(self.cat, obj: pad, "Handled event {:?}", event); - pad.event_default(Some(element), event) } else { gst_log!(self.cat, obj: pad, "Didn't handle event {:?}", event); - false } + ret } - fn src_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool { + fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool { use gst::QueryView; gst_log!(self.cat, obj: pad, "Handling query {:?}", query); - match query.view_mut() { - _ => (), + let ret = match query.view_mut() { + QueryView::Latency(ref mut q) => { + q.set(true, 0.into(), 0.into()); + true + } + QueryView::Scheduling(ref mut q) => { + q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); + q.add_scheduling_modes(&[gst::PadMode::Push]); + true + } + QueryView::Caps(ref mut q) => { + let state = self.state.lock().unwrap(); + let caps = if let Some(ref caps) = state.configured_caps { + q.get_filter() + .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) + .unwrap_or(caps.clone()) + } else { + q.get_filter() + .map(|f| f.to_owned()) + .unwrap_or(gst::Caps::new_any()) + }; + + q.set_result(&caps); + + true + } + _ => false, }; - gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query); - pad.query_default(Some(element), query) + if ret { + gst_log!(self.cat, obj: pad, "Handled query {:?}", query); + } else { + gst_log!(self.cat, obj: pad, "Didn't handle query {:?}", query); + } + ret } fn prepare(&self, element: &Element) -> Result<(), ()> { @@ -322,25 +364,28 @@ impl UdpSrc { socket.schedule(&io_context, move |buffer| { let udpsrc = element_clone.get_impl().downcast_ref::().unwrap(); + let mut events = Vec::new(); let mut state = udpsrc.state.lock().unwrap(); if state.need_initial_events { gst_debug!(udpsrc.cat, obj: &element_clone, "Pushing initial events"); // TODO: Invent a stream id - udpsrc - .src_pad - .push_event(gst::Event::new_stream_start("meh").build()); + events.push(gst::Event::new_stream_start("meh").build()); if let Some(ref caps) = settings.caps { - udpsrc - .src_pad - .push_event(gst::Event::new_caps(&caps).build()); + events.push(gst::Event::new_caps(&caps).build()); + state.configured_caps = Some(caps.clone()); } - udpsrc.src_pad.push_event( + events.push( gst::Event::new_segment(&gst::FormattedSegment::::new()) .build(), ); state.need_initial_events = false; } + drop(state); + + for event in events { + udpsrc.src_pad.push_event(event); + } // TODO: Error handling udpsrc.src_pad.push(buffer).into_result().unwrap(); @@ -373,7 +418,7 @@ impl UdpSrc { fn start(&self, element: &Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Starting"); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if let Some(ref socket) = state.socket { socket.unpause(element.get_clock().unwrap(), element.get_base_time()); @@ -386,7 +431,7 @@ impl UdpSrc { fn stop(&self, element: &Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Stopping"); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if let Some(ref socket) = state.socket { socket.pause();