diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index 35ab8603..689b276b 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -1,4 +1,5 @@ // Copyright (C) 2018 Sebastian Dröge +// Copyright (C) 2019-2020 François Laignel // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -40,7 +41,7 @@ use std::sync::Mutex as StdMutex; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef}; +use crate::runtime::{Context, PadSrc, PadSrcRef, Task}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; @@ -152,134 +153,35 @@ impl Default for AppSrcPadHandlerState { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct AppSrcPadHandlerInner { state: FutMutex, configured_caps: StdMutex>, - receiver: FutMutex>, } -impl AppSrcPadHandlerInner { - fn new(receiver: mpsc::Receiver, caps: Option) -> Self { - AppSrcPadHandlerInner { - state: FutMutex::new(AppSrcPadHandlerState { - caps, - ..Default::default() - }), - configured_caps: StdMutex::new(None), - receiver: FutMutex::new(receiver), - } - } -} - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] struct AppSrcPadHandler(Arc); impl AppSrcPadHandler { - fn new(receiver: mpsc::Receiver, caps: Option) -> AppSrcPadHandler { - AppSrcPadHandler(Arc::new(AppSrcPadHandlerInner::new(receiver, caps))) - } - - fn reset(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: assert the task state when Task & PadSrc are separated - - gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); - - *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); - *self.0.configured_caps.lock().unwrap() = None; - - self.flush(pad); - - gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); - } - - fn flush(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: 
assert the task state when Task & PadSrc are separated - - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - - // Purge the channel - let mut receiver = self - .0 - .receiver - .try_lock() - .expect("Channel receiver is locked elsewhere"); - loop { - match receiver.try_next() { - Ok(Some(_item)) => { - gst_log!(CAT, obj: pad.gst_pad(), "Dropping pending item"); - } - Err(_) => { - gst_log!(CAT, obj: pad.gst_pad(), "No more pending item"); - break; - } - Ok(None) => { - panic!("Channel sender dropped"); - } - } - } - + fn prepare(&self, caps: Option) { self.0 .state .try_lock() - .expect("state is locked elsewhere") - .need_segment = true; - - gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + .expect("State locked elsewhere") + .caps = caps; } - fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { - let this = self.clone(); - let pad_weak = pad.downgrade(); - let element = element.clone(); + fn reset(&self) { + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + } - pad.start_task(move || { - let this = this.clone(); - let pad_weak = pad_weak.clone(); - let element = element.clone(); - async move { - let item = this.0.receiver.lock().await.next().await; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - - let item = match item { - Some(item) => item, - None => { - gst_log!(CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); - return glib::Continue(false); - } - }; - - match this.push_item(&pad, &element, item).await { - Ok(_) => { - gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item"); - glib::Continue(true) - } - Err(gst::FlowError::Eos) => { - gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); - let eos = gst::Event::new_eos().build(); - pad.push_event(eos).await; - glib::Continue(false) - } - Err(gst::FlowError::Flushing) => { - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - glib::Continue(false) - } - Err(err) => { - gst_error!(CAT, obj: 
pad.gst_pad(), "Got error {}", err); - gst_element_error!( - &element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - glib::Continue(false) - } - } - } - }); + fn set_need_segment(&self) { + self.0 + .state + .try_lock() + .expect("State locked elsewhere") + .need_segment = true; } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { @@ -429,9 +331,11 @@ enum AppSrcState { #[derive(Debug)] struct AppSrc { src_pad: PadSrc, - src_pad_handler: StdMutex>, + src_pad_handler: AppSrcPadHandler, + task: Task, state: StdMutex, sender: StdMutex>>, + receiver: StdMutex>>>>, settings: StdMutex, } @@ -439,7 +343,7 @@ impl AppSrc { fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool { let state = self.state.lock().unwrap(); if *state == AppSrcState::RejectBuffers { - gst_debug!(CAT, obj: element, "Rejecting buffer due to element state"); + gst_debug!(CAT, obj: element, "Rejecting buffer due to pad state"); return false; } @@ -503,22 +407,25 @@ impl AppSrc { ) })?; - let max_buffers = settings.max_buffers.try_into().unwrap(); + let max_buffers = settings.max_buffers.try_into().map_err(|err| { + gst_error_msg!( + gst::ResourceError::Settings, + ["Invalid max-buffers: {}, {}", settings.max_buffers, err] + ) + })?; + let (sender, receiver) = mpsc::channel(max_buffers); *self.sender.lock().unwrap() = Some(sender); + *self.receiver.lock().unwrap() = Some(Arc::new(FutMutex::new(receiver))); - let src_pad_handler = AppSrcPadHandler::new(receiver, settings.caps.clone()); - - self.src_pad - .prepare(context, &src_pad_handler) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing src_pads: {:?}", err] - ) - })?; - - *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + self.task.prepare(context).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + 
self.src_pad_handler.prepare(settings.caps.clone()); + self.src_pad.prepare(&self.src_pad_handler); gst_debug!(CAT, obj: element, "Prepared"); @@ -528,10 +435,11 @@ impl AppSrc { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); - let _ = self.src_pad.unprepare(); - *self.src_pad_handler.lock().unwrap() = None; - *self.sender.lock().unwrap() = None; + *self.receiver.lock().unwrap() = None; + + self.task.unprepare().unwrap(); + self.src_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); @@ -539,26 +447,51 @@ impl AppSrc { } fn stop(&self, element: &gst::Element) -> Result<(), ()> { + let mut state = self.state.lock().unwrap(); gst_debug!(CAT, obj: element, "Stopping"); - *self.state.lock().unwrap() = AppSrcState::RejectBuffers; - - // Now stop the task if it was still running, blocking - // until this has actually happened - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .reset(&self.src_pad.as_ref()); + self.flush(element); + self.src_pad_handler.reset(); + *state = AppSrcState::RejectBuffers; gst_debug!(CAT, obj: element, "Stopped"); Ok(()) } + fn flush(&self, element: &gst::Element) { + gst_log!(CAT, obj: element, "Flushing"); + + self.task.stop(); + + let receiver = self.receiver.lock().unwrap(); + let mut receiver = receiver + .as_ref() + .unwrap() + .try_lock() + .expect("receiver locked elsewhere"); + + // Purge the channel + loop { + match receiver.try_next() { + Ok(Some(_item)) => { + gst_log!(CAT, obj: element, "Dropping pending item"); + } + Err(_) => { + gst_log!(CAT, obj: element, "No more pending item"); + break; + } + Ok(None) => { + panic!("Channel sender dropped"); + } + } + } + + self.src_pad_handler.set_need_segment(); + + gst_log!(CAT, obj: element, "Flushed"); + } + fn start(&self, element: &gst::Element) -> Result<(), ()> { let mut state = self.state.lock().unwrap(); if *state == AppSrcState::Started { @@ -568,16 +501,70 @@ 
impl AppSrc { gst_debug!(CAT, obj: element, "Starting"); - self.start_unchecked(element, &mut state); + self.start_task(element); + *state = AppSrcState::Started; gst_debug!(CAT, obj: element, "Started"); Ok(()) } + fn start_task(&self, element: &gst::Element) { + let src_pad_handler = self.src_pad_handler.clone(); + let pad_weak = self.src_pad.downgrade(); + let element = element.clone(); + let receiver = Arc::clone(self.receiver.lock().unwrap().as_ref().expect("No receiver")); + + self.task.start(move || { + let src_pad_handler = src_pad_handler.clone(); + let pad_weak = pad_weak.clone(); + let element = element.clone(); + let receiver = Arc::clone(&receiver); + + async move { + let item = receiver.lock().await.next().await; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + + let item = match item { + Some(item) => item, + None => { + gst_log!(CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); + return glib::Continue(false); + } + }; + + match src_pad_handler.push_item(&pad, &element, item).await { + Ok(_) => { + gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item"); + glib::Continue(true) + } + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); + let eos = gst::Event::new_eos().build(); + pad.push_event(eos).await; + glib::Continue(false) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + glib::Continue(false) + } + Err(err) => { + gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); + gst_element_error!( + &element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + glib::Continue(false) + } + } + } + }); + } + fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `state` until `flush_stop` is complete - // so as to prevent race conditions due to concurrent state transitions. 
let mut state = self.state.lock().unwrap(); if *state == AppSrcState::Started { gst_debug!(CAT, obj: element, "Already started"); @@ -586,52 +573,28 @@ impl AppSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .flush(&self.src_pad.as_ref()); - - self.start_unchecked(element, &mut state); + self.flush(element); + self.start_task(element); + *state = AppSrcState::Started; gst_debug!(CAT, obj: element, "Flush Stopped"); } - fn start_unchecked(&self, element: &gst::Element, state: &mut AppSrcState) { - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .start_task(self.src_pad.as_ref(), element); - - *state = AppSrcState::Started; - } - fn flush_start(&self, element: &gst::Element) { - // Keep the lock on the `state` until `flush_start` is complete - // so as to prevent race conditions due to concurrent state transitions. let mut state = self.state.lock().unwrap(); - gst_debug!(CAT, obj: element, "Starting Flush"); + self.task.cancel(); *state = AppSrcState::RejectBuffers; - self.src_pad.cancel_task(); gst_debug!(CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - // Lock the state to prevent race condition due to concurrent FlushStop let mut state = self.state.lock().unwrap(); - gst_debug!(CAT, obj: element, "Pausing"); - self.src_pad.pause_task(); - + self.task.pause(); *state = AppSrcState::Paused; gst_debug!(CAT, obj: element, "Paused"); @@ -706,14 +669,16 @@ impl ObjectSubclass for AppSrc { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - Self { - src_pad, - src_pad_handler: StdMutex::new(None), + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), + src_pad_handler: AppSrcPadHandler::default(), 
+ task: Task::default(), state: StdMutex::new(AppSrcState::RejectBuffers), sender: StdMutex::new(None), + receiver: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } } diff --git a/gst-plugin-threadshare/src/inputselector.rs b/gst-plugin-threadshare/src/inputselector.rs index ca30b06a..0d7ed545 100644 --- a/gst-plugin-threadshare/src/inputselector.rs +++ b/gst-plugin-threadshare/src/inputselector.rs @@ -28,7 +28,7 @@ use glib::{glib_object_impl, glib_object_subclass}; use gst; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_error_msg, gst_log, gst_trace}; +use gst::{gst_debug, gst_log, gst_trace}; use lazy_static::lazy_static; @@ -39,7 +39,7 @@ use std::u32; use crate::get_current_running_time; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; @@ -251,11 +251,8 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { let mut inner = this.0.lock().unwrap(); // Remember the segment for later use - match event.view() { - gst::EventView::Segment(e) => { - inner.segment = Some(e.get_segment().clone()); - } - _ => (), + if let gst::EventView::Segment(e) = event.view() { + inner.segment = Some(e.get_segment().clone()); } // We sent sticky events together with the next buffer once it becomes @@ -282,18 +279,15 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { event: gst::Event, ) -> bool { /* Drop all events for now */ - match event.view() { - gst::EventView::FlushStart(..) => { - /* Unblock downstream */ - inputselector.src_pad.gst_pad().push_event(event.clone()); + if let gst::EventView::FlushStart(..) 
= event.view() { + /* Unblock downstream */ + inputselector.src_pad.gst_pad().push_event(event.clone()); - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.lock().unwrap(); - if let Some(abort_handle) = inner.abort_handle.take() { - abort_handle.abort(); - } + if let Some(abort_handle) = inner.abort_handle.take() { + abort_handle.abort(); } - _ => (), } true } @@ -438,24 +432,7 @@ impl InputSelector { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_debug!(CAT, obj: element, "Preparing"); - let settings = self.settings.lock().unwrap(); - - let context = - Context::acquire(&settings.context, settings.context_wait).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to acquire Context: {}", err] - ) - })?; - - self.src_pad - .prepare(context, &InputSelectorPadSrcHandler {}) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error joining Context: {:?}", err] - ) - })?; + self.src_pad.prepare(&InputSelectorPadSrcHandler); gst_debug!(CAT, obj: element, "Prepared"); @@ -466,7 +443,7 @@ impl InputSelector { let mut state = self.state.lock().unwrap(); gst_debug!(CAT, obj: element, "Unpreparing"); - let _ = self.src_pad.unprepare(); + self.src_pad.unprepare(); *state = State::default(); @@ -516,11 +493,11 @@ impl ObjectSubclass for InputSelector { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - Self { - src_pad, + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), pads: Mutex::new(Pads::default()), diff --git a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs index 9ec4b768..6e2a2244 100644 --- 
a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs @@ -42,7 +42,7 @@ use std::time::Duration; use crate::get_current_running_time; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; @@ -662,7 +662,7 @@ impl PadSinkHandler for SinkHandler { gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); if let EventView::FlushStart(..) = event.view() { - jb.src_pad.cancel_task(); + jb.task.cancel(); } gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); @@ -961,145 +961,6 @@ impl SrcHandler { (now, Some((next_wakeup, Duration::from_nanos(delay)))) } - - fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { - let this = self.clone(); - let element = element.clone(); - - pad.start_task(move || { - let this = this.clone(); - let element = element.clone(); - - async move { - let jb = JitterBuffer::from_instance(&element); - let (latency, context_wait) = { - let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND, - settings.context_wait as u64 * gst::MSECOND, - ) - }; - - loop { - let delay_fut = { - let mut state = jb.state.lock().unwrap(); - let (_, next_wakeup) = - this.get_next_wakeup(&element, &state, latency, context_wait); - - let (delay_fut, abort_handle) = match next_wakeup { - Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), - _ => { - let (delay_fut, abort_handle) = abortable(async move { - match next_wakeup { - Some((_, delay)) => { - runtime::time::delay_for(delay).await; - } - None => { - future::pending::<()>().await; - } - }; - }); - - let next_wakeup = - next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE); - (Some(delay_fut), Some((next_wakeup, abort_handle))) - } - }; - - state.wait_handle = 
abort_handle; - - delay_fut - }; - - // Got aborted, reschedule if needed - if let Some(delay_fut) = delay_fut { - gst_debug!(CAT, obj: &element, "Waiting"); - if let Err(Aborted) = delay_fut.await { - gst_debug!(CAT, obj: &element, "Waiting aborted"); - return glib::Continue(true); - } - } - - let (head_pts, head_seq) = { - let state = jb.state.lock().unwrap(); - // - // Check earliest PTS as we have just taken the lock - let (now, next_wakeup) = - this.get_next_wakeup(&element, &state, latency, context_wait); - - gst_debug!( - CAT, - obj: &element, - "Woke up at {}, earliest_pts {}", - now, - state.earliest_pts - ); - - if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup > now { - // Reschedule and wait a bit longer in the next iteration - return glib::Continue(true); - } - } else { - return glib::Continue(true); - } - - let (head_pts, head_seq) = state.jbuf.borrow().peek(); - - (head_pts, head_seq) - }; - - let res = this.pop_and_push(&element).await; - - { - let mut state = jb.state.lock().unwrap(); - - state.last_res = res; - - if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum { - let (earliest_pts, earliest_seqnum) = - state.jbuf.borrow().find_earliest(); - state.earliest_pts = earliest_pts; - state.earliest_seqnum = earliest_seqnum; - } - - if res.is_ok() { - // Return and reschedule if the next packet would be in the future - // Check earliest PTS as we have just taken the lock - let (now, next_wakeup) = - this.get_next_wakeup(&element, &state, latency, context_wait); - if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup > now { - // Reschedule and wait a bit longer in the next iteration - return glib::Continue(true); - } - } else { - return glib::Continue(true); - } - } - } - - match res { - Ok(_) => (), - Err(gst::FlowError::Eos) => { - gst_debug!(CAT, obj: &element, "Pushing EOS event",); - let event = gst::Event::new_eos().build(); - let _ = jb.src_pad.push_event(event).await; - return glib::Continue(false); - 
} - Err(gst::FlowError::Flushing) => { - gst_debug!(CAT, obj: &element, "Flushing",); - return glib::Continue(false); - } - Err(err) => { - gst_error!(CAT, obj: &element, "Error {}", err,); - return glib::Continue(false); - } - } - } - } - }); - } } impl PadSrcHandler for SrcHandler { @@ -1118,7 +979,7 @@ impl PadSrcHandler for SrcHandler { match event.view() { EventView::FlushStart(..) => { - jb.src_pad.cancel_task(); + jb.task.cancel(); } EventView::FlushStop(..) => { jb.flush_stop(element); @@ -1252,6 +1113,7 @@ struct JitterBuffer { src_pad: PadSrc, sink_pad_handler: SinkHandler, src_pad_handler: SrcHandler, + task: Task, state: StdMutex, settings: StdMutex, } @@ -1281,15 +1143,14 @@ impl JitterBuffer { Context::acquire(&settings.context, settings.context_wait).unwrap() }; - self.src_pad - .prepare(context, &self.src_pad_handler) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing src_pad: {:?}", err] - ) - })?; + self.task.prepare(context).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + self.src_pad.prepare(&self.src_pad_handler); self.sink_pad.prepare(&self.sink_pad_handler); gst_info!(CAT, obj: element, "Prepared"); @@ -1300,8 +1161,9 @@ impl JitterBuffer { fn unprepare(&self, element: &gst::Element) { gst_debug!(CAT, obj: element, "Unpreparing"); + self.task.unprepare().unwrap(); self.sink_pad.unprepare(); - let _ = self.src_pad.unprepare(); + self.src_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); } @@ -1313,12 +1175,162 @@ impl JitterBuffer { self.sink_pad_handler.clear(); self.src_pad_handler.clear(); - self.src_pad_handler - .start_task(self.src_pad.as_ref(), element); + self.start_task(element); gst_debug!(CAT, obj: element, "Started"); } + fn start_task(&self, element: &gst::Element) { + let src_pad_handler = self.src_pad_handler.clone(); + let element = element.clone(); + + self.task.start(move || { + let src_pad_handler = 
src_pad_handler.clone(); + let element = element.clone(); + + async move { + let jb = JitterBuffer::from_instance(&element); + let (latency, context_wait) = { + let settings = jb.settings.lock().unwrap(); + ( + settings.latency_ms as u64 * gst::MSECOND, + settings.context_wait as u64 * gst::MSECOND, + ) + }; + + loop { + let delay_fut = { + let mut state = jb.state.lock().unwrap(); + let (_, next_wakeup) = src_pad_handler.get_next_wakeup( + &element, + &state, + latency, + context_wait, + ); + + let (delay_fut, abort_handle) = match next_wakeup { + Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), + _ => { + let (delay_fut, abort_handle) = abortable(async move { + match next_wakeup { + Some((_, delay)) => { + runtime::time::delay_for(delay).await; + } + None => { + future::pending::<()>().await; + } + }; + }); + + let next_wakeup = + next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE); + (Some(delay_fut), Some((next_wakeup, abort_handle))) + } + }; + + state.wait_handle = abort_handle; + + delay_fut + }; + + // Got aborted, reschedule if needed + if let Some(delay_fut) = delay_fut { + gst_debug!(CAT, obj: &element, "Waiting"); + if let Err(Aborted) = delay_fut.await { + gst_debug!(CAT, obj: &element, "Waiting aborted"); + return glib::Continue(true); + } + } + + let (head_pts, head_seq) = { + let state = jb.state.lock().unwrap(); + // + // Check earliest PTS as we have just taken the lock + let (now, next_wakeup) = src_pad_handler.get_next_wakeup( + &element, + &state, + latency, + context_wait, + ); + + gst_debug!( + CAT, + obj: &element, + "Woke up at {}, earliest_pts {}", + now, + state.earliest_pts + ); + + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup > now { + // Reschedule and wait a bit longer in the next iteration + return glib::Continue(true); + } + } else { + return glib::Continue(true); + } + + let (head_pts, head_seq) = state.jbuf.borrow().peek(); + + (head_pts, head_seq) + }; + + let res = 
src_pad_handler.pop_and_push(&element).await; + + { + let mut state = jb.state.lock().unwrap(); + + state.last_res = res; + + if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum { + let (earliest_pts, earliest_seqnum) = + state.jbuf.borrow().find_earliest(); + state.earliest_pts = earliest_pts; + state.earliest_seqnum = earliest_seqnum; + } + + if res.is_ok() { + // Return and reschedule if the next packet would be in the future + // Check earliest PTS as we have just taken the lock + let (now, next_wakeup) = src_pad_handler.get_next_wakeup( + &element, + &state, + latency, + context_wait, + ); + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup > now { + // Reschedule and wait a bit longer in the next iteration + return glib::Continue(true); + } + } else { + return glib::Continue(true); + } + } + } + + match res { + Ok(_) => (), + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: &element, "Pushing EOS event",); + let event = gst::Event::new_eos().build(); + let _ = jb.src_pad.push_event(event).await; + return glib::Continue(false); + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: &element, "Flushing",); + return glib::Continue(false); + } + Err(err) => { + gst_error!(CAT, obj: &element, "Error {}", err,); + return glib::Continue(false); + } + } + } + } + }); + } + fn stop(&self, element: &gst::Element) { gst_debug!(CAT, obj: element, "Stopping"); @@ -1326,7 +1338,7 @@ impl JitterBuffer { abort_handle.abort(); } - self.src_pad.stop_task(); + self.task.stop(); self.src_pad_handler.clear(); self.sink_pad_handler.clear(); @@ -1336,7 +1348,7 @@ impl JitterBuffer { } fn flush_stop(&self, element: &gst::Element) { - self.src_pad.stop_task(); + self.task.stop(); self.start(element); } } @@ -1402,17 +1414,18 @@ impl ObjectSubclass for JitterBuffer { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("sink").unwrap(); - let sink_pad = PadSink::new_from_template(&templ, 
Some("sink")); - - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - Self { - sink_pad, - src_pad, + sink_pad: PadSink::new(gst::Pad::new_from_template( + &klass.get_pad_template("sink").unwrap(), + Some("sink"), + )), + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), sink_pad_handler: SinkHandler::new(), src_pad_handler: SrcHandler::new(), + task: Task::default(), state: StdMutex::new(State::default()), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index 45d7b05e..19145451 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -39,7 +39,9 @@ use std::sync::{Arc, Weak}; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak}; +use crate::runtime::{ + Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task, +}; use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState}; @@ -287,23 +289,8 @@ impl Drop for ProxyContext { } } -#[derive(Debug)] -struct ProxySinkPadHandlerInner { - proxy_ctx: ProxyContext, -} - #[derive(Clone, Debug)] -struct ProxySinkPadHandler(Arc); - -impl ProxySinkPadHandler { - fn new(proxy_ctx: ProxyContext) -> Self { - ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner { proxy_ctx })) - } - - fn proxy_ctx(&self) -> &ProxyContext { - &self.0.proxy_ctx - } -} +struct ProxySinkPadHandler; impl PadSinkHandler for ProxySinkPadHandler { type ElementImpl = ProxySink; @@ -360,12 +347,16 @@ impl PadSinkHandler for ProxySinkPadHandler { gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); - let src_pad = PROXY_SRC_PADS - .lock() - .unwrap() - .get(&self.proxy_ctx().name) - .and_then(|src_pad| src_pad.upgrade()) - .map(|src_pad| 
src_pad.gst_pad().clone()); + let src_pad = { + let proxy_ctx = proxysink.proxy_ctx.lock().unwrap(); + + PROXY_SRC_PADS + .lock() + .unwrap() + .get(&proxy_ctx.as_ref().unwrap().name) + .and_then(|src_pad| src_pad.upgrade()) + .map(|src_pad| src_pad.gst_pad().clone()) + }; if let EventView::FlushStart(..) = event.view() { proxysink.stop(&element).unwrap(); @@ -419,7 +410,7 @@ impl PadSinkHandler for ProxySinkPadHandler { #[derive(Debug)] struct ProxySink { sink_pad: PadSink, - sink_pad_handler: StdMutex>, + proxy_ctx: StdMutex>, settings: StdMutex, } @@ -447,13 +438,8 @@ impl ProxySink { loop { let more_queue_space_receiver = { - let sink_pad_handler = sink.sink_pad_handler.lock().unwrap(); - if sink_pad_handler.is_none() { - return; - } - - let proxy_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx(); - let mut shared_ctx = proxy_ctx.lock_shared(); + let proxy_ctx = sink.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue"); @@ -509,13 +495,8 @@ impl ProxySink { item: DataQueueItem, ) -> Result { let wait_fut = { - let sink_pad_handler = self.sink_pad_handler.lock().unwrap(); - - let proxy_ctx = sink_pad_handler - .as_ref() - .ok_or(gst::FlowError::Error)? 
- .proxy_ctx(); - let mut shared_ctx = proxy_ctx.lock_shared(); + let proxy_ctx = self.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); /* We've taken the lock again, make sure not to recreate * a pending queue if tearing down */ @@ -615,8 +596,8 @@ impl ProxySink { wait_fut.await; } - let sink_pad_handler = self.sink_pad_handler.lock().unwrap(); - let shared_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx().lock_shared(); + let proxy_ctx = self.proxy_ctx.lock().unwrap(); + let shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); shared_ctx.last_res } @@ -638,9 +619,9 @@ impl ProxySink { proxy_sink_pads.insert(proxy_context, self.sink_pad.downgrade()); } - let handler = ProxySinkPadHandler::new(proxy_ctx); - self.sink_pad.prepare(&handler); - *self.sink_pad_handler.lock().unwrap() = Some(handler); + *self.proxy_ctx.lock().unwrap() = Some(proxy_ctx); + + self.sink_pad.prepare(&ProxySinkPadHandler); gst_debug!(SINK_CAT, obj: element, "Prepared"); @@ -651,14 +632,17 @@ impl ProxySink { gst_debug!(SINK_CAT, obj: element, "Unpreparing"); self.sink_pad.unprepare(); - *self.sink_pad_handler.lock().unwrap() = None; + *self.proxy_ctx.lock().unwrap() = None; gst_debug!(SINK_CAT, obj: element, "Unprepared"); + Ok(()) } fn start(&self, element: &gst::Element) -> Result<(), ()> { - let sink_pad_handler = self.sink_pad_handler.lock().unwrap(); + let proxy_ctx = self.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); + gst_debug!(SINK_CAT, obj: element, "Starting"); { @@ -667,7 +651,6 @@ impl ProxySink { proxy_sink_pads.remove(&settings.proxy_context); } - let mut shared_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx().lock_shared(); shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); gst_debug!(SINK_CAT, obj: element, "Started"); @@ -676,10 +659,11 @@ impl ProxySink { } fn stop(&self, element: &gst::Element) -> Result<(), ()> { - let sink_pad_handler = 
self.sink_pad_handler.lock().unwrap(); + let proxy_ctx = self.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); + gst_debug!(SINK_CAT, obj: element, "Stopping"); - let mut shared_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx().lock_shared(); let _ = shared_ctx.pending_queue.take(); shared_ctx.last_res = Err(gst::FlowError::Flushing); @@ -720,12 +704,12 @@ impl ObjectSubclass for ProxySink { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("sink").unwrap(); - let sink_pad = PadSink::new_from_template(&templ, Some("sink")); - Self { - sink_pad, - sink_pad_handler: StdMutex::new(None), + sink_pad: PadSink::new(gst::Pad::new_from_template( + &klass.get_pad_template("sink").unwrap(), + Some("sink"), + )), + proxy_ctx: StdMutex::new(None), settings: StdMutex::new(SettingsSink::default()), } } @@ -803,89 +787,18 @@ impl ElementImpl for ProxySink { } } -#[derive(Debug)] -struct ProxySrcPadHandlerInner { - proxy_ctx: ProxyContext, -} - #[derive(Clone, Debug)] -struct ProxySrcPadHandler(Arc); +struct ProxySrcPadHandler; impl ProxySrcPadHandler { - fn new(proxy_ctx: ProxyContext) -> Self { - ProxySrcPadHandler(Arc::new(ProxySrcPadHandlerInner { proxy_ctx })) - } - - fn proxy_ctx(&self) -> &ProxyContext { - &self.0.proxy_ctx - } - - fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { - let this = self.clone(); - let pad_weak = pad.downgrade(); - let element = element.clone(); - - pad.start_task(move || { - let this = this.clone(); - let pad_weak = pad_weak.clone(); - let element = element.clone(); - let mut dataqueue = dataqueue.clone(); - - async move { - let item = dataqueue.next().await; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - let item = match item { - Some(item) => item, - None => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused"); - return glib::Continue(false); - } - }; - - match 
this.push_item(&pad, item).await { - Ok(_) => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item"); - let mut shared_ctx = this.proxy_ctx().lock_shared(); - shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); - glib::Continue(true) - } - Err(gst::FlowError::Flushing) => { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); - let mut shared_ctx = this.proxy_ctx().lock_shared(); - shared_ctx.last_res = Err(gst::FlowError::Flushing); - glib::Continue(false) - } - Err(gst::FlowError::Eos) => { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS"); - let mut shared_ctx = this.proxy_ctx().lock_shared(); - shared_ctx.last_res = Err(gst::FlowError::Eos); - glib::Continue(false) - } - Err(err) => { - gst_error!(SRC_CAT, obj: pad.gst_pad(), "Got error {}", err); - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - let mut shared_ctx = this.proxy_ctx().lock_shared(); - shared_ctx.last_res = Err(err); - glib::Continue(false) - } - } - } - }); - } - async fn push_item( - &self, pad: &PadSrcRef<'_>, + proxysrc: &ProxySrc, item: DataQueueItem, ) -> Result<(), gst::FlowError> { { - let mut shared_ctx = self.proxy_ctx().lock_shared(); + let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); if let Some(ref mut pending_queue) = shared_ctx.pending_queue { pending_queue.notify_more_queue_space(); } @@ -923,12 +836,16 @@ impl PadSrcHandler for ProxySrcPadHandler { gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); - let sink_pad = PROXY_SINK_PADS - .lock() - .unwrap() - .get(&self.proxy_ctx().name) - .and_then(|sink_pad| sink_pad.upgrade()) - .map(|sink_pad| sink_pad.gst_pad().clone()); + let sink_pad = { + let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + + PROXY_SINK_PADS + .lock() + .unwrap() + .get(&proxy_ctx.as_ref().unwrap().name) + .and_then(|sink_pad| sink_pad.upgrade()) + .map(|sink_pad| 
sink_pad.gst_pad().clone()) + }; match event.view() { EventView::FlushStart(..) => proxysrc.stop(element).unwrap(), @@ -996,7 +913,8 @@ impl PadSrcHandler for ProxySrcPadHandler { #[derive(Debug)] struct ProxySrc { src_pad: PadSrc, - src_pad_handler: StdMutex>, + task: Task, + proxy_ctx: StdMutex>, dataqueue: StdMutex>, settings: StdMutex, } @@ -1058,19 +976,19 @@ impl ProxySrc { proxy_src_pads.insert(settings.proxy_context, self.src_pad.downgrade()); } + *self.proxy_ctx.lock().unwrap() = Some(proxy_ctx); + *self.dataqueue.lock().unwrap() = Some(dataqueue); - let handler = ProxySrcPadHandler::new(proxy_ctx); + self.src_pad.prepare(&ProxySrcPadHandler); - self.src_pad.prepare(ts_ctx, &handler).map_err(|err| { + self.task.prepare(ts_ctx).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, - ["Error preparing src_pad: {:?}", err] + ["Error preparing Task: {:?}", err] ) })?; - *self.src_pad_handler.lock().unwrap() = Some(handler); - gst_debug!(SRC_CAT, obj: element, "Prepared"); Ok(()) @@ -1085,11 +1003,11 @@ impl ProxySrc { proxy_src_pads.remove(&settings.proxy_context); } - self.src_pad.stop_task(); - let _ = self.src_pad.unprepare(); - *self.src_pad_handler.lock().unwrap() = None; + self.task.unprepare().unwrap(); + self.src_pad.unprepare(); *self.dataqueue.lock().unwrap() = None; + *self.proxy_ctx.lock().unwrap() = None; gst_debug!(SRC_CAT, obj: element, "Unprepared"); @@ -1104,9 +1022,7 @@ impl ProxySrc { let dataqueue = self.dataqueue.lock().unwrap(); gst_debug!(SRC_CAT, obj: element, "Stopping"); - // Now stop the task if it was still running, blocking - // until this has actually happened - self.src_pad.stop_task(); + self.task.stop(); let dataqueue = dataqueue.as_ref().unwrap(); dataqueue.clear(); @@ -1134,6 +1050,84 @@ impl ProxySrc { Ok(()) } + fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) { + dataqueue.start(); + + { + let proxy_ctx = self.proxy_ctx.lock().unwrap(); + let mut shared_ctx = 
proxy_ctx.as_ref().unwrap().lock_shared(); + if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() { + pending_queue.notify_more_queue_space(); + } + } + + self.start_task(element, dataqueue); + } + + fn start_task(&self, element: &gst::Element, dataqueue: &DataQueue) { + let pad_weak = self.src_pad.downgrade(); + let dataqueue = dataqueue.clone(); + let element = element.clone(); + + self.task.start(move || { + let pad_weak = pad_weak.clone(); + let mut dataqueue = dataqueue.clone(); + let element = element.clone(); + + async move { + let item = dataqueue.next().await; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + let item = match item { + Some(item) => item, + None => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused"); + return glib::Continue(false); + } + }; + + let proxysrc = ProxySrc::from_instance(&element); + + match ProxySrcPadHandler::push_item(&pad, &proxysrc, item).await { + Ok(_) => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item"); + let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); + shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); + glib::Continue(true) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); + let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); + shared_ctx.last_res = Err(gst::FlowError::Flushing); + glib::Continue(false) + } + Err(gst::FlowError::Eos) => { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS"); + let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); + shared_ctx.last_res = Err(gst::FlowError::Eos); + glib::Continue(false) + } + Err(err) => { + gst_error!(SRC_CAT, obj: pad.gst_pad(), "Got error {}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming 
stopped, reason {}", err] + ); + let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); + shared_ctx.last_res = Err(err); + glib::Continue(false) + } + } + } + }); + } + fn flush_stop(&self, element: &gst::Element) { // Keep the lock on the `dataqueue` until `flush_stop` is complete // so as to prevent race conditions due to concurrent state transitions. @@ -1148,33 +1142,18 @@ impl ProxySrc { gst_debug!(SRC_CAT, obj: element, "Stopping Flush"); - self.src_pad.stop_task(); + self.task.stop(); self.start_unchecked(element, dataqueue); gst_debug!(SRC_CAT, obj: element, "Stopped Flush"); } - fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) { - dataqueue.start(); - - let src_pad_handler = self.src_pad_handler.lock().unwrap(); - let src_pad_handler = src_pad_handler.as_ref().unwrap(); - - let mut shared_ctx = src_pad_handler.proxy_ctx().lock_shared(); - if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() { - pending_queue.notify_more_queue_space(); - } - - src_pad_handler.start_task(self.src_pad.as_ref(), element, dataqueue.clone()); - } - fn pause(&self, element: &gst::Element) -> Result<(), ()> { let dataqueue = self.dataqueue.lock().unwrap(); gst_debug!(SRC_CAT, obj: element, "Pausing"); dataqueue.as_ref().unwrap().pause(); - - self.src_pad.pause_task(); + self.task.pause(); gst_debug!(SRC_CAT, obj: element, "Paused"); @@ -1217,12 +1196,13 @@ impl ObjectSubclass for ProxySrc { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - Self { - src_pad, - src_pad_handler: StdMutex::new(None), + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), + task: Task::default(), + proxy_ctx: StdMutex::new(None), dataqueue: StdMutex::new(None), settings: StdMutex::new(SettingsSrc::default()), 
} diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs index 9a555d75..7c02602f 100644 --- a/gst-plugin-threadshare/src/queue.rs +++ b/gst-plugin-threadshare/src/queue.rs @@ -33,12 +33,11 @@ use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_t use lazy_static::lazy_static; use std::collections::VecDeque; -use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState}; @@ -253,79 +252,10 @@ impl PadSinkHandler for QueuePadSinkHandler { } } -#[derive(Debug)] -struct QueuePadSrcHandlerInner { - last_res: Result, -} - -impl Default for QueuePadSrcHandlerInner { - fn default() -> QueuePadSrcHandlerInner { - QueuePadSrcHandlerInner { - last_res: Ok(gst::FlowSuccess::Ok), - } - } -} - -#[derive(Clone, Debug, Default)] -struct QueuePadSrcHandler(Arc>); +#[derive(Clone, Debug)] +struct QueuePadSrcHandler; impl QueuePadSrcHandler { - fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { - let this = self.clone(); - let pad_weak = pad.downgrade(); - let element = element.clone(); - pad.start_task(move || { - let this = this.clone(); - let pad_weak = pad_weak.clone(); - let element = element.clone(); - let mut dataqueue = dataqueue.clone(); - - async move { - let item = dataqueue.next().await; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - let item = match item { - Some(item) => item, - None => { - gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped"); - return glib::Continue(false); - } - }; - - match Self::push_item(&pad, &element, item).await { - Ok(()) => { - gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item"); - this.0.lock().unwrap().last_res = Ok(gst::FlowSuccess::Ok); - 
glib::Continue(true) - } - Err(gst::FlowError::Flushing) => { - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - this.0.lock().unwrap().last_res = Err(gst::FlowError::Flushing); - glib::Continue(false) - } - Err(gst::FlowError::Eos) => { - gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); - this.0.lock().unwrap().last_res = Err(gst::FlowError::Eos); - let eos = gst::Event::new_eos().build(); - pad.push_event(eos).await; - glib::Continue(false) - } - Err(err) => { - gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - this.0.lock().unwrap().last_res = Err(err); - glib::Continue(false) - } - } - } - }); - } - async fn push_item( pad: &PadSrcRef<'_>, element: &gst::Element, @@ -353,10 +283,6 @@ impl QueuePadSrcHandler { } } } - - fn unprepare(&self) { - *self.0.lock().unwrap() = QueuePadSrcHandlerInner::default(); - } } impl PadSrcHandler for QueuePadSrcHandler { @@ -426,9 +352,10 @@ impl PadSrcHandler for QueuePadSrcHandler { struct Queue { sink_pad: PadSink, src_pad: PadSrc, - src_pad_handler: QueuePadSrcHandler, + task: Task, dataqueue: StdMutex>, pending_queue: StdMutex>, + last_res: StdMutex>, settings: StdMutex, } @@ -598,7 +525,7 @@ impl Queue { wait_fut.await; } - self.src_pad_handler.0.lock().unwrap().last_res + *self.last_res.lock().unwrap() } fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { @@ -636,14 +563,14 @@ impl Queue { ) })?; - self.src_pad - .prepare(context, &self.src_pad_handler) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error joining Context: {:?}", err] - ) - })?; + self.task.prepare(context).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + + self.src_pad.prepare(&QueuePadSrcHandler); self.sink_pad.prepare(&QueuePadSinkHandler); gst_debug!(CAT, obj: element, "Prepared"); @@ -654,20 
+581,43 @@ impl Queue { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); - self.src_pad.stop_task(); - self.sink_pad.unprepare(); - let _ = self.src_pad.unprepare(); + self.task.unprepare().unwrap(); + self.src_pad.unprepare(); - self.src_pad_handler.unprepare(); *self.dataqueue.lock().unwrap() = None; *self.pending_queue.lock().unwrap() = None; + *self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok); + gst_debug!(CAT, obj: element, "Unprepared"); Ok(()) } + fn stop(&self, element: &gst::Element) -> Result<(), ()> { + let dataqueue = self.dataqueue.lock().unwrap(); + gst_debug!(CAT, obj: element, "Stopping"); + + *self.last_res.lock().unwrap() = Err(gst::FlowError::Flushing); + + self.task.stop(); + + if let Some(dataqueue) = dataqueue.as_ref() { + dataqueue.pause(); + dataqueue.clear(); + dataqueue.stop(); + } + + if let Some(mut pending_queue) = self.pending_queue.lock().unwrap().take() { + pending_queue.notify_more_queue_space(); + } + + gst_debug!(CAT, obj: element, "Stopped"); + + Ok(()) + } + fn start(&self, element: &gst::Element) -> Result<(), ()> { let dataqueue = self.dataqueue.lock().unwrap(); let dataqueue = dataqueue.as_ref().unwrap(); @@ -678,13 +628,74 @@ impl Queue { gst_debug!(CAT, obj: element, "Starting"); - self.start_unchecked(element, dataqueue); + dataqueue.start(); + *self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok); + + self.start_task(element, dataqueue); gst_debug!(CAT, obj: element, "Started"); Ok(()) } + fn start_task(&self, element: &gst::Element, dataqueue: &DataQueue) { + let pad_weak = self.src_pad.downgrade(); + let dataqueue = dataqueue.clone(); + let element = element.clone(); + + self.task.start(move || { + let pad_weak = pad_weak.clone(); + let mut dataqueue = dataqueue.clone(); + let element = element.clone(); + + async move { + let item = dataqueue.next().await; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + let item = match item { 
+ Some(item) => item, + None => { + gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped"); + return glib::Continue(false); + } + }; + + let queue = Queue::from_instance(&element); + + match QueuePadSrcHandler::push_item(&pad, &element, item).await { + Ok(()) => { + gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item"); + *queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok); + glib::Continue(true) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + *queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing); + glib::Continue(false) + } + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); + *queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos); + let eos = gst::Event::new_eos().build(); + pad.push_event(eos).await; + glib::Continue(false) + } + Err(err) => { + gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + *queue.last_res.lock().unwrap() = Err(err); + glib::Continue(false) + } + } + } + }); + } + fn flush_stop(&self, element: &gst::Element) { // Keep the lock on the `dataqueue` until `flush_stop` is complete // so as to prevent race conditions due to concurrent state transitions. 
@@ -699,42 +710,11 @@ impl Queue { gst_debug!(CAT, obj: element, "Stopping Flush"); - self.src_pad.stop_task(); - self.start_unchecked(element, dataqueue); + dataqueue.start(); + self.start_task(element, dataqueue); gst_debug!(CAT, obj: element, "Stopped Flush"); } - - fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) { - dataqueue.start(); - - self.src_pad_handler.0.lock().unwrap().last_res = Ok(gst::FlowSuccess::Ok); - self.src_pad_handler - .start_task(self.src_pad.as_ref(), element, dataqueue.clone()); - } - - fn stop(&self, element: &gst::Element) -> Result<(), ()> { - let dataqueue = self.dataqueue.lock().unwrap(); - gst_debug!(CAT, obj: element, "Stopping"); - - self.src_pad.stop_task(); - - if let Some(dataqueue) = dataqueue.as_ref() { - dataqueue.pause(); - dataqueue.clear(); - dataqueue.stop(); - } - - if let Some(pending_queue) = self.pending_queue.lock().unwrap().as_mut() { - pending_queue.notify_more_queue_space(); - } - - self.src_pad_handler.0.lock().unwrap().last_res = Err(gst::FlowError::Flushing); - - gst_debug!(CAT, obj: element, "Stopped"); - - Ok(()) - } } impl ObjectSubclass for Queue { @@ -777,18 +757,19 @@ impl ObjectSubclass for Queue { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("sink").unwrap(); - let sink_pad = PadSink::new_from_template(&templ, Some("sink")); - - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - Self { - sink_pad, - src_pad, - src_pad_handler: QueuePadSrcHandler::default(), + sink_pad: PadSink::new(gst::Pad::new_from_template( + &klass.get_pad_template("sink").unwrap(), + Some("sink"), + )), + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), + task: Task::default(), dataqueue: StdMutex::new(None), pending_queue: StdMutex::new(None), + last_res: StdMutex::new(Ok(gst::FlowSuccess::Ok)), settings: 
StdMutex::new(Settings::default()), } } diff --git a/gst-plugin-threadshare/src/runtime/mod.rs b/gst-plugin-threadshare/src/runtime/mod.rs index 6b2462db..39923a4e 100644 --- a/gst-plugin-threadshare/src/runtime/mod.rs +++ b/gst-plugin-threadshare/src/runtime/mod.rs @@ -50,6 +50,7 @@ pub mod pad; pub use pad::{PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak}; pub mod task; +pub use task::{Task, TaskState}; pub mod prelude { pub use super::pad::{PadSinkHandler, PadSrcHandler}; diff --git a/gst-plugin-threadshare/src/runtime/pad.rs b/gst-plugin-threadshare/src/runtime/pad.rs index 20216a5b..fde5ed3b 100644 --- a/gst-plugin-threadshare/src/runtime/pad.rs +++ b/gst-plugin-threadshare/src/runtime/pad.rs @@ -78,37 +78,12 @@ use gst::subclass::prelude::*; use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error}; use gst::{FlowError, FlowSuccess}; -use std::fmt; use std::marker::PhantomData; -use std::sync; use std::sync::{Arc, Weak}; use super::executor::{block_on_or_add_sub_task, Context}; -use super::task::Task; use super::RUNTIME_CAT; -/// Errors related to [`PadSrc`] `Context` handling. 
-/// -/// [`PadSrc`]: struct.PadSrc.html -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum PadContextError { - ActiveContext, - ActiveTask, -} - -impl fmt::Display for PadContextError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - PadContextError::ActiveContext => { - write!(f, "The PadSrc is already operating on a Context") - } - PadContextError::ActiveTask => write!(f, "A task is still active"), - } - } -} - -impl std::error::Error for PadContextError {} - #[inline] fn event_ret_to_event_full_res( ret: bool, @@ -169,10 +144,10 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { gst_error!( RUNTIME_CAT, obj: gst_pad, - "Error in PadSink activate: {:?}", + "Error in PadSrc activate: {:?}", err ); - gst_loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err) + gst_loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err) }) } @@ -229,14 +204,9 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { } } -#[derive(Default, Debug)] -struct PadSrcState; - #[derive(Debug)] struct PadSrcInner { - state: sync::Mutex, gst_pad: gst::Pad, - task: Task, } impl PadSrcInner { @@ -245,11 +215,7 @@ impl PadSrcInner { panic!("Wrong pad direction for PadSrc"); } - PadSrcInner { - state: sync::Mutex::new(PadSrcState::default()), - gst_pad, - task: Task::default(), - } + PadSrcInner { gst_pad } } } @@ -331,33 +297,6 @@ impl<'a> PadSrcRef<'a> { self.strong.push_event(event).await } - /// `Start` the `Pad` `task`. - /// - /// The `Task` will loop on the provided `func`. - /// The execution occurs on the `Task`'s context. - pub fn start_task(&self, func: F) - where - F: (FnMut() -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - { - self.strong.start_task(func); - } - - /// Pauses the `Started` `Pad` `Task`. - pub fn pause_task(&self) { - self.strong.pause_task(); - } - - /// Cancels the `Started` `Pad` `Task`. - pub fn cancel_task(&self) { - self.strong.cancel_task(); - } - - /// Stops the `Started` `Pad` `Task`. 
- pub fn stop_task(&self) { - self.strong.stop_task(); - } - fn activate_mode_hook( &self, mode: gst::PadMode, @@ -468,30 +407,6 @@ impl PadSrcStrong { was_handled } - - #[inline] - fn start_task(&self, func: F) - where - F: (FnMut() -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - { - self.0.task.start(func); - } - - #[inline] - fn pause_task(&self) { - self.0.task.pause(); - } - - #[inline] - fn cancel_task(&self) { - self.0.task.cancel(); - } - - #[inline] - fn stop_task(&self) { - self.0.task.stop(); - } } /// The `PadSrc` which `Element`s must own. @@ -513,10 +428,6 @@ impl PadSrc { this } - pub fn new_from_template(templ: &gst::PadTemplate, name: Option<&str>) -> Self { - Self::new(gst::Pad::new_from_template(templ, name)) - } - pub fn as_ref(&self) -> PadSrcRef<'_> { PadSrcRef::new(Arc::clone(&(self.0).0)) } @@ -647,52 +558,16 @@ impl PadSrc { }); } - pub fn prepare( - &self, - context: Context, - handler: &H, - ) -> Result<(), super::task::TaskError> { - let _state = (self.0).0.state.lock().unwrap(); + pub fn prepare(&self, handler: &H) { gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing"); - (self.0).0.task.prepare(context)?; - self.init_pad_functions(handler); - - Ok(()) - } - - pub fn prepare_with_func( - &self, - context: Context, - handler: &H, - prepare_func: F, - ) -> Result<(), super::task::TaskError> - where - F: (FnOnce() -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - { - let _state = (self.0).0.state.lock().unwrap(); - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing"); - - (self.0).0.task.prepare_with_func(context, prepare_func)?; - - self.init_pad_functions(handler); - - Ok(()) } /// Releases the resources held by this `PadSrc`. 
- pub fn unprepare(&self) -> Result<(), PadContextError> { - let _state = (self.0).0.state.lock().unwrap(); + pub fn unprepare(&self) { gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Unpreparing"); - (self.0) - .0 - .task - .unprepare() - .map_err(|_| PadContextError::ActiveTask)?; - self.gst_pad() .set_activate_function(move |_gst_pad, _parent| { Err(gst_loggable_error!(RUNTIME_CAT, "PadSrc unprepared")) @@ -704,8 +579,6 @@ impl PadSrc { .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing)); self.gst_pad() .set_query_function(move |_gst_pad, _parent, _query| false); - - Ok(()) } pub async fn push(&self, buffer: gst::Buffer) -> Result { @@ -719,30 +592,6 @@ impl PadSrc { pub async fn push_event(&self, event: gst::Event) -> bool { self.0.push_event(event).await } - - /// `Start` the `Pad` `task`. - /// - /// The `Task` will loop on the provided `func`. - /// The execution occurs on the `Task`'s context. - pub fn start_task(&self, func: F) - where - F: (FnMut() -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - { - self.0.start_task(func); - } - - pub fn pause_task(&self) { - self.0.pause_task(); - } - - pub fn cancel_task(&self) { - self.0.cancel_task(); - } - - pub fn stop_task(&self) { - self.0.stop_task(); - } } /// A trait to define `handler`s for [`PadSink`] callbacks. 
@@ -1044,8 +893,8 @@ impl PadSink { this } - pub fn new_from_template(templ: &gst::PadTemplate, name: Option<&str>) -> Self { - Self::new(gst::Pad::new_from_template(templ, name)) + pub fn as_ref(&self) -> PadSinkRef<'_> { + PadSinkRef::new(Arc::clone(&(self.0).0)) } pub fn gst_pad(&self) -> &gst::Pad { diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index dfda2438..1a35a668 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -44,9 +44,9 @@ use std::u16; use tokio::io::AsyncReadExt; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef}; +use crate::runtime::{Context, PadSrc, PadSrcRef, Task}; -use super::socket::{Socket, SocketError, SocketRead, SocketState, SocketStream}; +use super::socket::{Socket, SocketError, SocketRead, SocketState}; const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_PORT: u32 = 5000; @@ -191,140 +191,35 @@ impl Default for TcpClientSrcPadHandlerState { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct TcpClientSrcPadHandlerInner { state: FutMutex, configured_caps: StdMutex>, } -impl TcpClientSrcPadHandlerInner { - fn new(caps: Option) -> Self { - TcpClientSrcPadHandlerInner { - state: FutMutex::new(TcpClientSrcPadHandlerState { - caps, - ..Default::default() - }), - configured_caps: StdMutex::new(None), - } - } -} - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] struct TcpClientSrcPadHandler(Arc); impl TcpClientSrcPadHandler { - fn new(caps: Option) -> Self { - TcpClientSrcPadHandler(Arc::new(TcpClientSrcPadHandlerInner::new(caps))) - } - - fn reset(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: assert the task state when Task & PadSrc are separated - - gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); - - *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); - *self.0.configured_caps.lock().unwrap() = 
None; - - gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); - } - - fn flush(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: assert the task state when Task & PadSrc are separated - - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - + fn prepare(&self, caps: Option) { self.0 .state .try_lock() - .expect("state is locked elsewhere") - .need_segment = true; - - gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + .expect("State locked elsewhere") + .caps = caps; } - fn start_task( - &self, - pad: PadSrcRef<'_>, - element: &gst::Element, - socket_stream: SocketStream, - ) { - let this = self.clone(); - let pad_weak = pad.downgrade(); - let element = element.clone(); - let socket_stream = Arc::new(FutMutex::new(socket_stream)); + fn reset(&self) { + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + } - pad.start_task(move || { - let this = this.clone(); - let pad_weak = pad_weak.clone(); - let element = element.clone(); - let socket_stream = socket_stream.clone(); - - async move { - let item = socket_stream.lock().await.next().await; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - let buffer = match item { - Some(Ok((buffer, _))) => buffer, - Some(Err(err)) => { - gst_error!(CAT, obj: &element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - SocketError::Io(err) => { - gst_element_error!( - element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - } - } - return glib::Continue(false); - } - None => { - gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped"); - return glib::Continue(false); - } - }; - - let res = this.push_buffer(&pad, &element, buffer).await; - - match res { - Ok(_) => { - gst_log!(CAT, obj: pad.gst_pad(), 
"Successfully pushed buffer"); - glib::Continue(true) - } - Err(gst::FlowError::Flushing) => { - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - glib::Continue(false) - } - Err(gst::FlowError::Eos) => { - gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); - let eos = gst::Event::new_eos().build(); - pad.push_event(eos).await; - glib::Continue(false) - } - Err(err) => { - gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - glib::Continue(false) - } - } - } - }); + fn set_need_segment(&self) { + self.0 + .state + .try_lock() + .expect("State locked elsewhere") + .need_segment = true; } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { @@ -465,7 +360,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler { struct TcpClientSrc { src_pad: PadSrc, - src_pad_handler: StdMutex>, + src_pad_handler: TcpClientSrcPadHandler, + task: Task, socket: StdMutex>>, settings: StdMutex, } @@ -480,7 +376,6 @@ lazy_static! 
{ impl TcpClientSrc { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut socket_storage = self.socket.lock().unwrap(); let settings = self.settings.lock().unwrap().clone(); gst_debug!(CAT, obj: element, "Preparing"); @@ -538,21 +433,16 @@ impl TcpClientSrc { ) })?; - *socket_storage = Some(socket); - drop(socket_storage); + *self.socket.lock().unwrap() = Some(socket); - let src_pad_handler = TcpClientSrcPadHandler::new(settings.caps); - - self.src_pad - .prepare(context, &src_pad_handler) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing src_pads: {:?}", err] - ) - })?; - - *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + self.task.prepare(context).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + self.src_pad_handler.prepare(settings.caps); + self.src_pad.prepare(&self.src_pad_handler); gst_debug!(CAT, obj: element, "Prepared"); @@ -562,12 +452,10 @@ impl TcpClientSrc { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); - if let Some(socket) = self.socket.lock().unwrap().take() { - drop(socket); - } + *self.socket.lock().unwrap() = None; - let _ = self.src_pad.unprepare(); - *self.src_pad_handler.lock().unwrap() = None; + self.task.unprepare().unwrap(); + self.src_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); @@ -577,16 +465,8 @@ impl TcpClientSrc { fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Stopping"); - // Now stop the task if it was still running, blocking - // until this has actually happened - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .reset(&self.src_pad.as_ref()); + self.task.stop(); + self.src_pad_handler.reset(); gst_debug!(CAT, obj: element, "Stopped"); @@ -594,64 +474,117 @@ impl TcpClientSrc { } fn start(&self, element: 
&gst::Element) -> Result<(), ()> { - let socket = self.socket.lock().unwrap(); - if let Some(socket) = socket.as_ref() { - if socket.state() == SocketState::Started { - gst_debug!(CAT, obj: element, "Already started"); - return Err(()); - } - - gst_debug!(CAT, obj: element, "Starting"); - - self.start_unchecked(element, socket); - - gst_debug!(CAT, obj: element, "Started"); - - Ok(()) - } else { - Err(()) - } - } - - fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `socket` until `flush_stop` is complete - // so as to prevent race conditions due to concurrent state transitions. - // Note that this won't deadlock as it doesn't lock the `SocketStream` - // in use within the `src_pad`'s `Task`. let socket = self.socket.lock().unwrap(); let socket = socket.as_ref().unwrap(); if socket.state() == SocketState::Started { gst_debug!(CAT, obj: element, "Already started"); - return; + return Ok(()); } - gst_debug!(CAT, obj: element, "Stopping Flush"); + gst_debug!(CAT, obj: element, "Starting"); + self.start_task(element, socket); + gst_debug!(CAT, obj: element, "Started"); - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .flush(&self.src_pad.as_ref()); - - self.start_unchecked(element, socket); - - gst_debug!(CAT, obj: element, "Stopped Flush"); + Ok(()) } - fn start_unchecked(&self, element: &gst::Element, socket: &Socket) { + fn start_task(&self, element: &gst::Element, socket: &Socket) { let socket_stream = socket .start(element.get_clock(), Some(element.get_base_time())) .unwrap(); + let socket_stream = Arc::new(FutMutex::new(socket_stream)); - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .start_task(self.src_pad.as_ref(), element, socket_stream); + let src_pad_handler = self.src_pad_handler.clone(); + let pad_weak = self.src_pad.downgrade(); + let element = element.clone(); + + self.task.start(move || { + let src_pad_handler = src_pad_handler.clone(); + let pad_weak = 
pad_weak.clone(); + let element = element.clone(); + let socket_stream = socket_stream.clone(); + + async move { + let item = socket_stream.lock().await.next().await; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + let buffer = match item { + Some(Ok((buffer, _))) => buffer, + Some(Err(err)) => { + gst_error!(CAT, obj: &element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst_element_error!( + element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + } + } + return glib::Continue(false); + } + None => { + gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped"); + return glib::Continue(false); + } + }; + + match src_pad_handler.push_buffer(&pad, &element, buffer).await { + Ok(_) => { + gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer"); + glib::Continue(true) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + glib::Continue(false) + } + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); + let eos = gst::Event::new_eos().build(); + pad.push_event(eos).await; + glib::Continue(false) + } + Err(err) => { + gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + glib::Continue(false) + } + } + } + }); + } + + fn flush_stop(&self, element: &gst::Element) { + let socket = self.socket.lock().unwrap(); + if let Some(socket) = socket.as_ref() { + if socket.state() == SocketState::Started { + gst_debug!(CAT, obj: element, "Already started"); + return; + } + + gst_debug!(CAT, obj: element, "Stopping Flush"); + + self.src_pad_handler.set_need_segment(); + self.start_task(element, socket); + + 
gst_debug!(CAT, obj: element, "Stopped Flush"); + } else { + gst_debug!(CAT, obj: element, "Socket not available"); + } } fn flush_start(&self, element: &gst::Element) { @@ -662,20 +595,16 @@ impl TcpClientSrc { socket.pause(); } - self.src_pad.cancel_task(); + self.task.cancel(); gst_debug!(CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let socket = self.socket.lock().unwrap(); gst_debug!(CAT, obj: element, "Pausing"); - if let Some(socket) = socket.as_ref() { - socket.pause(); - } - - self.src_pad.pause_task(); + self.socket.lock().unwrap().as_ref().unwrap().pause(); + self.task.pause(); gst_debug!(CAT, obj: element, "Paused"); @@ -714,11 +643,11 @@ impl ObjectSubclass for TcpClientSrc { fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); Self { - src_pad, - src_pad_handler: StdMutex::new(None), + src_pad: PadSrc::new(gst::Pad::new_from_template(&templ, Some("src"))), + src_pad_handler: TcpClientSrcPadHandler::default(), + task: Task::default(), socket: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/src/udpsink.rs b/gst-plugin-threadshare/src/udpsink.rs index bb96ce68..1d622b64 100644 --- a/gst-plugin-threadshare/src/udpsink.rs +++ b/gst-plugin-threadshare/src/udpsink.rs @@ -38,16 +38,15 @@ use gst::{ use lazy_static::lazy_static; use crate::runtime::prelude::*; -use crate::runtime::task::Task; -use crate::runtime::{self, Context, PadSink, PadSinkRef}; +use crate::runtime::{self, Context, PadSink, PadSinkRef, Task}; use crate::socket::{wrap_socket, GioSocketWrapper}; +use std::convert::TryInto; use std::mem; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::Arc; +use std::string::ToString; use std::sync::Mutex as StdMutex; -use std::sync::MutexGuard as StdMutexGuard; -use std::sync::{RwLock, RwLockWriteGuard}; 
+use std::sync::{Arc, RwLock}; use std::time::Duration; use std::u16; use std::u8; @@ -119,19 +118,6 @@ impl Default for Settings { } } -#[derive(Debug)] -enum TaskItem { - Buffer(gst::Buffer), - Event(gst::Event), -} - -#[derive(Debug)] -struct UdpSink { - sink_pad: PadSink, - sink_pad_handler: UdpSinkPadHandler, - settings: Arc>, -} - lazy_static! { static ref CAT: gst::DebugCategory = gst::DebugCategory::new( "ts-udpsink", @@ -329,11 +315,16 @@ static PROPERTIES: [subclass::Property; 19] = [ ]; #[derive(Debug)] -struct UdpSinkPadHandlerState { +enum TaskItem { + Buffer(gst::Buffer), + Event(gst::Event), +} + +#[derive(Debug)] +struct UdpSinkPadHandlerInner { sync: bool, segment: Option, latency: gst::ClockTime, - task: Option, socket: Arc>>, socket_v6: Arc>>, clients: Arc>, @@ -343,16 +334,12 @@ struct UdpSinkPadHandlerState { settings: Arc>, } -#[derive(Clone, Debug)] -struct UdpSinkPadHandler(Arc>); - -impl UdpSinkPadHandler { - fn new(settings: Arc>) -> UdpSinkPadHandler { - Self(Arc::new(RwLock::new(UdpSinkPadHandlerState { +impl UdpSinkPadHandlerInner { + fn new(settings: Arc>) -> Self { + UdpSinkPadHandlerInner { sync: DEFAULT_SYNC, segment: None, latency: gst::CLOCK_TIME_NONE, - task: None, socket: Arc::new(Mutex::new(None)), socket_v6: Arc::new(Mutex::new(None)), clients: Arc::new(vec![SocketAddr::new( @@ -363,7 +350,134 @@ impl UdpSinkPadHandler { clients_to_unconfigure: vec![], sender: Arc::new(Mutex::new(None)), settings, - }))) + } + } + + fn clear_clients( + &mut self, + gst_pad: &gst::Pad, + clients_to_add: impl Iterator, + ) { + Arc::make_mut(&mut self.clients).clear(); + + self.clients_to_configure = vec![]; + self.clients_to_unconfigure = vec![]; + + for addr in clients_to_add { + self.add_client(gst_pad, addr); + } + } + + fn remove_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { + if !self.clients.contains(&addr) { + gst_warning!(CAT, obj: gst_pad, "Not removing unknown client {:?}", &addr); + return; + } + + gst_info!(CAT, obj: 
gst_pad, "Removing client {:?}", addr); + + Arc::make_mut(&mut self.clients).retain(|addr2| addr != *addr2); + + self.clients_to_unconfigure.push(addr); + self.clients_to_configure.retain(|addr2| addr != *addr2); + } + + fn replace_client( + &mut self, + gst_pad: &gst::Pad, + addr: Option, + new_addr: Option, + ) { + if let Some(addr) = addr { + self.remove_client(gst_pad, addr); + } + + if let Some(new_addr) = new_addr { + self.add_client(gst_pad, new_addr); + } + } + + fn add_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { + if self.clients.contains(&addr) { + gst_warning!(CAT, obj: gst_pad, "Not adding client {:?} again", &addr); + return; + } + + gst_info!(CAT, obj: gst_pad, "Adding client {:?}", addr); + + Arc::make_mut(&mut self.clients).push(addr); + + self.clients_to_configure.push(addr); + self.clients_to_unconfigure.retain(|addr2| addr != *addr2); + } +} + +#[derive(Debug)] +enum SocketQualified { + Ipv4(tokio::net::UdpSocket), + Ipv6(tokio::net::UdpSocket), +} + +#[derive(Clone, Debug)] +struct UdpSinkPadHandler(Arc>); + +impl UdpSinkPadHandler { + fn new(settings: Arc>) -> UdpSinkPadHandler { + Self(Arc::new(RwLock::new(UdpSinkPadHandlerInner::new(settings)))) + } + + fn set_latency(&self, latency: gst::ClockTime) { + self.0.write().unwrap().latency = latency; + } + + fn prepare(&self) { + let mut inner = self.0.write().unwrap(); + inner.clients_to_configure = inner.clients.to_vec(); + } + + fn prepare_socket(&self, socket: SocketQualified) { + let mut inner = self.0.write().unwrap(); + + match socket { + SocketQualified::Ipv4(socket) => inner.socket = Arc::new(Mutex::new(Some(socket))), + SocketQualified::Ipv6(socket) => inner.socket_v6 = Arc::new(Mutex::new(Some(socket))), + } + } + + fn unprepare(&self) { + let mut inner = self.0.write().unwrap(); + *inner = UdpSinkPadHandlerInner::new(Arc::clone(&inner.settings)) + } + + fn clear_clients(&self, gst_pad: &gst::Pad, clients_to_add: impl Iterator) { + self.0 + .write() + .unwrap() + 
.clear_clients(gst_pad, clients_to_add); + } + + fn remove_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { + self.0.write().unwrap().remove_client(gst_pad, addr); + } + + fn replace_client( + &self, + gst_pad: &gst::Pad, + addr: Option, + new_addr: Option, + ) { + self.0 + .write() + .unwrap() + .replace_client(gst_pad, addr, new_addr); + } + + fn add_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { + self.0.write().unwrap().add_client(gst_pad, addr); + } + + fn get_clients(&self) -> Vec { + (*self.0.read().unwrap().clients).clone() } fn configure_client( @@ -511,32 +625,32 @@ impl UdpSinkPadHandler { socket_v6, settings, ) = { - let mut state = self.0.write().unwrap(); - let do_sync = state.sync; + let mut inner = self.0.write().unwrap(); + let do_sync = inner.sync; let mut rtime: gst::ClockTime = 0.into(); - if let Some(segment) = &state.segment { + if let Some(segment) = &inner.segment { if let Some(segment) = segment.downcast_ref::() { rtime = segment.to_running_time(buffer.get_pts()); - if state.latency.is_some() { - rtime += state.latency; + if inner.latency.is_some() { + rtime += inner.latency; } } } - let clients_to_configure = mem::replace(&mut state.clients_to_configure, vec![]); - let clients_to_unconfigure = mem::replace(&mut state.clients_to_unconfigure, vec![]); + let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]); + let clients_to_unconfigure = mem::replace(&mut inner.clients_to_unconfigure, vec![]); - let settings = state.settings.lock().unwrap().clone(); + let settings = inner.settings.lock().unwrap().clone(); ( do_sync, rtime, - Arc::clone(&state.clients), + Arc::clone(&inner.clients), clients_to_configure, clients_to_unconfigure, - Arc::clone(&state.socket), - Arc::clone(&state.socket_v6), + Arc::clone(&inner.socket), + Arc::clone(&inner.socket_v6), settings, ) }; @@ -642,64 +756,11 @@ impl UdpSinkPadHandler { let _ = element.post_message(&gst::Message::new_eos().src(Some(element)).build()); } 
EventView::Segment(e) => { - let mut state = self.0.write().unwrap(); - state.segment = Some(e.get_segment().clone()); + self.0.write().unwrap().segment = Some(e.get_segment().clone()); } _ => (), } } - - fn unprepare(&self) { - if let Some(task) = &self.0.read().unwrap().task { - task.unprepare().unwrap(); - } - } - - fn stop_task(&self) { - if let Some(task) = &self.0.read().unwrap().task { - task.stop(); - } - } - - fn start_task(&self, element: &gst::Element) { - let (sender, receiver) = mpsc::channel(0); - self.0.write().unwrap().sender = Arc::new(Mutex::new(Some(sender))); - - if let Some(task) = &self.0.read().unwrap().task { - let receiver = Arc::new(Mutex::new(receiver)); - let this = self.clone(); - let element_clone = element.clone(); - - task.start(move || { - let receiver = Arc::clone(&receiver); - let element = element_clone.clone(); - let this = this.clone(); - async move { - match receiver.lock().await.next().await { - Some(TaskItem::Buffer(buffer)) => { - match this.render(&element, buffer).await { - Err(err) => { - gst_element_error!( - element, - gst::StreamError::Failed, - ["Failed to render item, stopping task: {}", err] - ); - - glib::Continue(false) - } - _ => glib::Continue(true), - } - } - Some(TaskItem::Event(event)) => { - this.handle_event(&element, event).await; - glib::Continue(true) - } - None => glib::Continue(false), - } - } - }); - } - } } impl PadSinkHandler for UdpSinkPadHandler { @@ -752,15 +813,16 @@ impl PadSinkHandler for UdpSinkPadHandler { event: gst::Event, ) -> BoxFuture<'static, bool> { let sender = Arc::clone(&self.0.read().unwrap().sender); - let this = self.clone(); let element = element.clone(); async move { if let EventView::FlushStop(_) = event.view() { - this.start_task(&element); + let udpsink = UdpSink::from_instance(&element); + let _ = udpsink.start(&element); } else if let Some(sender) = sender.lock().await.as_mut() { sender.send(TaskItem::Event(event)).await.unwrap(); } + true } .boxed() @@ -769,35 +831,49 
@@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_event( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, - _element: &gst::Element, + udpsink: &UdpSink, + element: &gst::Element, event: gst::Event, ) -> bool { - match event.view() { - EventView::FlushStart(..) => { - self.stop_task(); - } - _ => (), + if let EventView::FlushStart(..) = event.view() { + let _ = udpsink.stop(&element); } true } } +#[derive(Debug)] +enum SocketFamily { + Ipv4, + Ipv6, +} + +#[derive(Debug)] +struct UdpSink { + sink_pad: PadSink, + sink_pad_handler: UdpSinkPadHandler, + task: Task, + settings: Arc>, +} + impl UdpSink { - fn prepare_socket_family( + fn prepare_socket( &self, + family: SocketFamily, context: &Context, element: &gst::Element, - ipv6: bool, ) -> Result<(), gst::ErrorMessage> { let mut settings = self.settings.lock().unwrap(); - let socket = if let Some(ref wrapped_socket) = if ipv6 { - &settings.socket_v6 - } else { - &settings.socket - } { + let wrapped_socket = match family { + SocketFamily::Ipv4 => &settings.socket, + SocketFamily::Ipv6 => &settings.socket_v6, + }; + + let socket_qualified: SocketQualified; + + if let Some(ref wrapped_socket) = wrapped_socket { use std::net::UdpSocket; let socket: UdpSocket; @@ -820,18 +896,20 @@ impl UdpSink { }) })?; - if ipv6 { - settings.used_socket_v6 = Some(wrapped_socket.clone()); - } else { - settings.used_socket = Some(wrapped_socket.clone()); + match family { + SocketFamily::Ipv4 => { + settings.used_socket = Some(wrapped_socket.clone()); + socket_qualified = SocketQualified::Ipv4(socket); + } + SocketFamily::Ipv6 => { + settings.used_socket_v6 = Some(wrapped_socket.clone()); + socket_qualified = SocketQualified::Ipv6(socket); + } } - - socket } else { - let bind_addr = if ipv6 { - &settings.bind_address_v6 - } else { - &settings.bind_address + let bind_addr = match family { + SocketFamily::Ipv4 => &settings.bind_address, + SocketFamily::Ipv6 => &settings.bind_address_v6, }; let bind_addr: IpAddr = 
bind_addr.parse().map_err(|err| { @@ -841,19 +919,17 @@ impl UdpSink { ) })?; - let bind_port = if ipv6 { - settings.bind_port_v6 - } else { - settings.bind_port + let bind_port = match family { + SocketFamily::Ipv4 => settings.bind_port, + SocketFamily::Ipv6 => settings.bind_port_v6, }; let saddr = SocketAddr::new(bind_addr, bind_port as u16); gst_debug!(CAT, obj: element, "Binding to {:?}", saddr); - let builder = if ipv6 { - net2::UdpBuilder::new_v6() - } else { - net2::UdpBuilder::new_v4() + let builder = match family { + SocketFamily::Ipv4 => net2::UdpBuilder::new_v4(), + SocketFamily::Ipv6 => net2::UdpBuilder::new_v6(), }; let builder = match builder { @@ -863,7 +939,10 @@ impl UdpSink { CAT, obj: element, "Failed to create {} socket builder: {}", - if ipv6 { "IPv6" } else { "IPv4" }, + match family { + SocketFamily::Ipv4 => "IPv4", + SocketFamily::Ipv6 => "IPv6", + }, err ); return Ok(()); @@ -897,33 +976,19 @@ impl UdpSink { })?; } - if ipv6 { - settings.used_socket_v6 = Some(wrapper); - } else { - settings.used_socket = Some(wrapper); + match family { + SocketFamily::Ipv4 => { + settings.used_socket = Some(wrapper); + socket_qualified = SocketQualified::Ipv4(socket) + } + SocketFamily::Ipv6 => { + settings.used_socket_v6 = Some(wrapper); + socket_qualified = SocketQualified::Ipv6(socket) + } } - - socket - }; - - let mut state = self.sink_pad_handler.0.write().unwrap(); - - if ipv6 { - state.socket_v6 = Arc::new(Mutex::new(Some(socket))); - } else { - state.socket = Arc::new(Mutex::new(Some(socket))); } - Ok(()) - } - - fn prepare_sockets( - &self, - context: &Context, - element: &gst::Element, - ) -> Result<(), gst::ErrorMessage> { - self.prepare_socket_family(context, element, false)?; - self.prepare_socket_family(context, element, true)?; + self.sink_pad_handler.prepare_socket(socket_qualified); Ok(()) } @@ -942,20 +1007,18 @@ impl UdpSink { })? 
}; - self.sink_pad.prepare(&self.sink_pad_handler); - self.prepare_sockets(&context, element).unwrap(); + self.sink_pad_handler.prepare(); + self.prepare_socket(SocketFamily::Ipv4, &context, element)?; + self.prepare_socket(SocketFamily::Ipv6, &context, element)?; - let task = Task::default(); - task.prepare(context).map_err(|err| { + self.task.prepare(context).map_err(|err| { gst_error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to start task: {}", err] + gst::ResourceError::OpenWrite, + ["Error preparing Task: {:?}", err] ) })?; - let mut state = self.sink_pad_handler.0.write().unwrap(); - state.task = Some(task); - state.clients_to_configure = state.clients.to_vec(); + self.sink_pad.prepare(&self.sink_pad_handler); gst_debug!(CAT, obj: element, "Started preparing"); @@ -965,105 +1028,108 @@ impl UdpSink { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); + self.task.unprepare().unwrap(); self.sink_pad_handler.unprepare(); self.sink_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); - Ok(()) - } - - fn start(&self, element: &gst::Element) -> Result<(), ()> { - gst_debug!(CAT, obj: element, "Starting"); - - self.sink_pad_handler.start_task(&element); Ok(()) } fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Stopping"); - - self.sink_pad_handler.stop_task(); - + self.task.stop(); gst_debug!(CAT, obj: element, "Stopped"); Ok(()) } - fn clear_clients( - &self, - element: &gst::Element, - state: &mut RwLockWriteGuard<'_, UdpSinkPadHandlerState>, - settings: &StdMutexGuard<'_, Settings>, - ) { - let clients = Arc::make_mut(&mut state.clients); - clients.clear(); + fn start(&self, element: &gst::Element) -> Result<(), ()> { + gst_debug!(CAT, obj: element, "Starting"); - state.clients_to_configure = vec![]; - state.clients_to_unconfigure = vec![]; + let sink_pad_handler = self.sink_pad_handler.clone(); + let element_clone = element.clone(); - if let 
Some(host) = &settings.host { - self.add_client(&element, state, &host, settings.port as u16); - } - } + let (sender, receiver) = mpsc::channel(0); + let receiver = Arc::new(Mutex::new(receiver)); - fn remove_client( - &self, - element: &gst::Element, - state: &mut RwLockWriteGuard<'_, UdpSinkPadHandlerState>, - host: &str, - port: u16, - ) { - let addr: IpAddr = match host.parse() { - Err(err) => { - gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); - return; + sink_pad_handler.0.write().unwrap().sender = Arc::new(Mutex::new(Some(sender))); + + self.task.start(move || { + let receiver = Arc::clone(&receiver); + let element = element_clone.clone(); + let sink_pad_handler = sink_pad_handler.clone(); + + async move { + match receiver.lock().await.next().await { + Some(TaskItem::Buffer(buffer)) => { + match sink_pad_handler.render(&element, buffer).await { + Err(err) => { + gst_element_error!( + element, + gst::StreamError::Failed, + ["Failed to render item, stopping task: {}", err] + ); + + glib::Continue(false) + } + _ => glib::Continue(true), + } + } + Some(TaskItem::Event(event)) => { + sink_pad_handler.handle_event(&element, event).await; + glib::Continue(true) + } + None => glib::Continue(false), + } } - Ok(addr) => addr, - }; - let addr = SocketAddr::new(addr, port); + }); - if !state.clients.contains(&addr) { - gst_warning!(CAT, obj: element, "Not removing unknown client {:?}", &addr); - return; - } + Ok(()) + } +} - gst_info!(CAT, obj: element, "Removing client {:?}", addr); - - let clients = Arc::make_mut(&mut state.clients); - clients.retain(|addr2| addr != *addr2); - state.clients_to_unconfigure.push(addr); - state.clients_to_configure.retain(|addr2| addr != *addr2); +impl UdpSink { + fn clear_clients(&self, clients_to_add: impl Iterator) { + self.sink_pad_handler + .clear_clients(&self.sink_pad.gst_pad(), clients_to_add); } - fn add_client( - &self, - element: &gst::Element, - state: &mut RwLockWriteGuard<'_, 
UdpSinkPadHandlerState>, - host: &str, - port: u16, - ) { - let addr: IpAddr = match host.parse() { - Err(err) => { - gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); - return; - } - Ok(addr) => addr, - }; - let addr = SocketAddr::new(addr, port); - - if state.clients.contains(&addr) { - gst_warning!(CAT, obj: element, "Not adding client {:?} again", &addr); - return; - } - - gst_info!(CAT, obj: element, "Adding client {:?}", addr); - - let clients = Arc::make_mut(&mut state.clients); - clients.push(addr); - state.clients_to_configure.push(addr); - state.clients_to_unconfigure.retain(|addr2| addr != *addr2); + fn remove_client(&self, addr: SocketAddr) { + self.sink_pad_handler + .remove_client(&self.sink_pad.gst_pad(), addr); } + + fn replace_client(&self, addr: Option, new_addr: Option) { + self.sink_pad_handler + .replace_client(&self.sink_pad.gst_pad(), addr, new_addr); + } + + fn add_client(&self, addr: SocketAddr) { + self.sink_pad_handler + .add_client(&self.sink_pad.gst_pad(), addr); + } +} + +fn try_into_socket_addr(element: &gst::Element, host: &str, port: u32) -> Result { + let addr: IpAddr = match host.parse() { + Err(err) => { + gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); + return Err(()); + } + Ok(addr) => addr, + }; + + let port: u16 = match port.try_into() { + Err(err) => { + gst_error!(CAT, obj: element, "Invalid port {}: {}", port, err); + return Err(()); + } + Ok(port) => port, + }; + + Ok(SocketAddr::new(addr, port)) } impl ObjectSubclass for UdpSink { @@ -1109,12 +1175,12 @@ impl ObjectSubclass for UdpSink { let port = args[2] .get::() .expect("signal arg") - .expect("missing signal arg"); + .expect("missing signal arg") as u32; - let udpsink = Self::from_instance(&element); - let mut state = udpsink.sink_pad_handler.0.write().unwrap(); - - udpsink.add_client(&element, &mut state, &host, port as u16); + if let Ok(addr) = try_into_socket_addr(&element, &host, port) { + let udpsink = 
Self::from_instance(&element); + udpsink.add_client(addr); + } None }, @@ -1137,15 +1203,15 @@ impl ObjectSubclass for UdpSink { let port = args[2] .get::() .expect("signal arg") - .expect("missing signal arg"); + .expect("missing signal arg") as u32; let udpsink = Self::from_instance(&element); - - let mut state = udpsink.sink_pad_handler.0.write().unwrap(); let settings = udpsink.settings.lock().unwrap(); - if Some(&host) != settings.host.as_ref() || port != settings.port as i32 { - udpsink.remove_client(&element, &mut state, &host, port as u16); + if Some(&host) != settings.host.as_ref() || port != settings.port { + if let Ok(addr) = try_into_socket_addr(&element, &host, port) { + udpsink.remove_client(addr); + } } None @@ -1164,10 +1230,13 @@ impl ObjectSubclass for UdpSink { .expect("missing signal arg"); let udpsink = Self::from_instance(&element); - let mut state = udpsink.sink_pad_handler.0.write().unwrap(); let settings = udpsink.settings.lock().unwrap(); + let current_client = settings + .host + .iter() + .filter_map(|host| try_into_socket_addr(&element, host, settings.port).ok()); - udpsink.clear_clients(&element, &mut state, &settings); + udpsink.clear_clients(current_client); None }, @@ -1177,14 +1246,15 @@ impl ObjectSubclass for UdpSink { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("sink").unwrap(); - let sink_pad = PadSink::new_from_template(&templ, Some("sink")); let settings = Arc::new(StdMutex::new(Settings::default())); - let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings)); Self { - sink_pad, - sink_pad_handler, + sink_pad: PadSink::new(gst::Pad::new_from_template( + &klass.get_pad_template("sink").unwrap(), + Some("sink"), + )), + sink_pad_handler: UdpSinkPadHandler::new(Arc::clone(&settings)), + task: Task::default(), settings, } } @@ -1200,33 +1270,39 @@ impl ObjectImpl for UdpSink { let mut settings = self.settings.lock().unwrap(); match *prop { 
subclass::Property("host", ..) => { - let mut state = self.sink_pad_handler.0.write().unwrap(); - if let Some(host) = &settings.host { - self.remove_client(&element, &mut state, &host, settings.port as u16); - } + let current_client = settings + .host + .as_ref() + .and_then(|host| try_into_socket_addr(&element, host, settings.port).ok()); - settings.host = value.get().expect("type checked upstream"); + let new_host = value.get().expect("type checked upstream"); - if let Some(host) = &settings.host { - self.add_client(&element, &mut state, &host, settings.port as u16); - } + let new_client = new_host + .and_then(|host| try_into_socket_addr(&element, host, settings.port).ok()); + + self.replace_client(current_client, new_client); + + settings.host = new_host.map(ToString::to_string); } subclass::Property("port", ..) => { - let mut state = self.sink_pad_handler.0.write().unwrap(); - if let Some(host) = &settings.host { - self.remove_client(&element, &mut state, &host, settings.port as u16); - } + let current_client = settings + .host + .as_ref() + .and_then(|host| try_into_socket_addr(&element, host, settings.port).ok()); - settings.port = value.get_some().expect("type checked upstream"); + let new_port = value.get_some().expect("type checked upstream"); - if let Some(host) = &settings.host { - self.add_client(&element, &mut state, &host, settings.port as u16); - } + let new_client = settings + .host + .as_ref() + .and_then(|host| try_into_socket_addr(&element, host, new_port).ok()); + + self.replace_client(current_client, new_client); + + settings.port = new_port; } subclass::Property("sync", ..) => { settings.sync = value.get_some().expect("type checked upstream"); - let mut state = self.sink_pad_handler.0.write().unwrap(); - state.sync = settings.sync; } subclass::Property("bind-address", ..) 
=> { settings.bind_address = value @@ -1284,21 +1360,35 @@ impl ObjectImpl for UdpSink { .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); - let mut state = self.sink_pad_handler.0.write().unwrap(); - let clients = clients.split(','); - self.clear_clients(element, &mut state, &settings); - drop(settings); - for client in clients { - let split: Vec<&str> = client.rsplitn(2, ':').collect(); + let current_client = settings + .host + .iter() + .filter_map(|host| try_into_socket_addr(&element, host, settings.port).ok()); - if split.len() == 2 { - match split[0].parse::() { - Ok(port) => self.add_client(element, &mut state, split[1], port), - Err(_) => (), - } + let clients_iter = current_client.chain(clients.split(',').filter_map(|client| { + let rsplit: Vec<&str> = client.rsplitn(2, ':').collect(); + + if rsplit.len() == 2 { + rsplit[0] + .parse::() + .map_err(|err| { + gst_error!( + CAT, + obj: element, + "Invalid port {}: {}", + rsplit[0], + err + ); + }) + .and_then(|port| try_into_socket_addr(&element, rsplit[1], port)) + .ok() + } else { + None } - } + })); + + self.clear_clients(clients_iter); } subclass::Property("context", ..) => { settings.context = value @@ -1351,10 +1441,13 @@ impl ObjectImpl for UdpSink { subclass::Property("ttl-mc", ..) => Ok(settings.ttl_mc.to_value()), subclass::Property("qos-dscp", ..) => Ok(settings.qos_dscp.to_value()), subclass::Property("clients", ..) => { - let state = self.sink_pad_handler.0.read().unwrap(); + let clients: Vec = self + .sink_pad_handler + .get_clients() + .iter() + .map(ToString::to_string) + .collect(); - let clients: Vec = - state.clients.iter().map(|addr| addr.to_string()).collect(); Ok(clients.join(",").to_value()) } subclass::Property("context", ..) 
=> Ok(settings.context.to_value()), @@ -1406,9 +1499,7 @@ impl ElementImpl for UdpSink { fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool { match event.view() { EventView::Latency(ev) => { - let mut state = self.sink_pad_handler.0.write().unwrap(); - state.latency = ev.get_latency(); - + self.sink_pad_handler.set_latency(ev.get_latency()); self.sink_pad.gst_pad().push_event(event) } EventView::Step(..) => false, diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 6fa58357..6542b0ba 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -44,11 +44,9 @@ use std::sync::Mutex as StdMutex; use std::u16; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef}; +use crate::runtime::{Context, PadSrc, PadSrcRef, Task}; -use super::socket::{ - wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState, SocketStream, -}; +use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState}; const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_PORT: u32 = 5000; @@ -245,160 +243,34 @@ impl Default for UdpSrcPadHandlerState { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct UdpSrcPadHandlerInner { state: FutMutex, configured_caps: StdMutex>, } -impl UdpSrcPadHandlerInner { - fn new(caps: Option, retrieve_sender_address: bool) -> Self { - UdpSrcPadHandlerInner { - state: FutMutex::new(UdpSrcPadHandlerState { - retrieve_sender_address, - caps, - ..Default::default() - }), - configured_caps: StdMutex::new(None), - } - } -} - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] struct UdpSrcPadHandler(Arc); impl UdpSrcPadHandler { - fn new(caps: Option, retrieve_sender_address: bool) -> UdpSrcPadHandler { - UdpSrcPadHandler(Arc::new(UdpSrcPadHandlerInner::new( - caps, - retrieve_sender_address, - ))) + fn prepare(&self, caps: Option, retrieve_sender_address: bool) { + let mut 
state = self.0.state.try_lock().expect("State locked elsewhere"); + + state.caps = caps; + state.retrieve_sender_address = retrieve_sender_address; } - fn reset(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: assert the task state when Task & PadSrc are separated - - gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); - + fn reset(&self) { *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); *self.0.configured_caps.lock().unwrap() = None; - - gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); } - fn flush(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: assert the task state when Task & PadSrc are separated - - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - + fn set_need_segment(&self) { self.0 .state .try_lock() - .expect("state is locked elsewhere") + .expect("State locked elsewhere") .need_segment = true; - - gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); - } - - fn start_task( - &self, - pad: PadSrcRef<'_>, - element: &gst::Element, - socket_stream: SocketStream, - ) { - let this = self.clone(); - let pad_weak = pad.downgrade(); - let element = element.clone(); - let socket_stream = Arc::new(FutMutex::new(socket_stream)); - - pad.start_task(move || { - let this = this.clone(); - let pad_weak = pad_weak.clone(); - let element = element.clone(); - let socket_stream = socket_stream.clone(); - - async move { - let item = socket_stream.lock().await.next().await; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - let (mut buffer, saddr) = match item { - Some(Ok((buffer, saddr))) => (buffer, saddr), - Some(Err(err)) => { - gst_error!(CAT, obj: &element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - SocketError::Io(err) => { - gst_element_error!( - element, - 
gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - } - } - return glib::Continue(false); - } - None => { - gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped"); - return glib::Continue(false); - } - }; - - if let Some(saddr) = saddr { - if this.0.state.lock().await.retrieve_sender_address { - let inet_addr = match saddr.ip() { - IpAddr::V4(ip) => gio::InetAddress::new_from_bytes( - gio::InetAddressBytes::V4(&ip.octets()), - ), - IpAddr::V6(ip) => gio::InetAddress::new_from_bytes( - gio::InetAddressBytes::V6(&ip.octets()), - ), - }; - let inet_socket_addr = - &gio::InetSocketAddress::new(&inet_addr, saddr.port()); - NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr); - } - } - - let res = this.push_buffer(&pad, &element, buffer).await; - - match res { - Ok(_) => { - gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer"); - glib::Continue(true) - } - Err(gst::FlowError::Flushing) => { - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - glib::Continue(false) - } - Err(gst::FlowError::Eos) => { - gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); - let eos = gst::Event::new_eos().build(); - pad.push_event(eos).await; - glib::Continue(false) - } - Err(err) => { - gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - glib::Continue(false) - } - } - } - }); } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { @@ -534,7 +406,8 @@ impl PadSrcHandler for UdpSrcPadHandler { struct UdpSrc { src_pad: PadSrc, - src_pad_handler: StdMutex>, + src_pad_handler: UdpSrcPadHandler, + task: Task, socket: StdMutex>>, settings: StdMutex, } @@ -549,7 +422,6 @@ lazy_static! 
{ impl UdpSrc { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut socket_storage = self.socket.lock().unwrap(); let mut settings = self.settings.lock().unwrap().clone(); gst_debug!(CAT, obj: element, "Preparing"); @@ -729,24 +601,18 @@ impl UdpSrc { ) })?; - *socket_storage = Some(socket); - drop(socket_storage); - + *self.socket.lock().unwrap() = Some(socket); element.notify("used-socket"); - let src_pad_handler = - UdpSrcPadHandler::new(settings.caps, settings.retrieve_sender_address); - - self.src_pad - .prepare(context, &src_pad_handler) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing src_pads: {:?}", err] - ) - })?; - - *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + self.task.prepare(context).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + self.src_pad_handler + .prepare(settings.caps, settings.retrieve_sender_address); + self.src_pad.prepare(&self.src_pad_handler); gst_debug!(CAT, obj: element, "Prepared"); @@ -756,15 +622,12 @@ impl UdpSrc { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); + *self.socket.lock().unwrap() = None; self.settings.lock().unwrap().used_socket = None; element.notify("used-socket"); - if let Some(socket) = self.socket.lock().unwrap().take() { - drop(socket); - } - - let _ = self.src_pad.unprepare(); - *self.src_pad_handler.lock().unwrap() = None; + self.task.unprepare().unwrap(); + self.src_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); @@ -774,16 +637,8 @@ impl UdpSrc { fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Stopping"); - // Now stop the task if it was still running, blocking - // until this has actually happened - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .reset(&self.src_pad.as_ref()); + 
self.task.stop(); + self.src_pad_handler.reset(); gst_debug!(CAT, obj: element, "Stopped"); @@ -791,64 +646,133 @@ impl UdpSrc { } fn start(&self, element: &gst::Element) -> Result<(), ()> { - let socket = self.socket.lock().unwrap(); - if let Some(socket) = socket.as_ref() { - if socket.state() == SocketState::Started { - gst_debug!(CAT, obj: element, "Already started"); - return Err(()); - } - - gst_debug!(CAT, obj: element, "Starting"); - - self.start_unchecked(element, socket); - - gst_debug!(CAT, obj: element, "Started"); - - Ok(()) - } else { - Err(()) - } - } - - fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `socket` until `flush_stop` is complete - // so as to prevent race conditions due to concurrent state transitions. - // Note that this won't deadlock as it doesn't lock the `SocketStream` - // in use within the `src_pad`'s `Task`. let socket = self.socket.lock().unwrap(); let socket = socket.as_ref().unwrap(); if socket.state() == SocketState::Started { gst_debug!(CAT, obj: element, "Already started"); - return; + return Ok(()); } - gst_debug!(CAT, obj: element, "Stopping Flush"); + gst_debug!(CAT, obj: element, "Starting"); + self.start_task(element, socket); + gst_debug!(CAT, obj: element, "Started"); - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .flush(&self.src_pad.as_ref()); - - self.start_unchecked(element, socket); - - gst_debug!(CAT, obj: element, "Stopped Flush"); + Ok(()) } - fn start_unchecked(&self, element: &gst::Element, socket: &Socket) { + fn start_task(&self, element: &gst::Element, socket: &Socket) { let socket_stream = socket .start(element.get_clock(), Some(element.get_base_time())) .unwrap(); + let socket_stream = Arc::new(FutMutex::new(socket_stream)); - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .start_task(self.src_pad.as_ref(), element, socket_stream); + let src_pad_handler = self.src_pad_handler.clone(); + let pad_weak = 
self.src_pad.downgrade(); + let element = element.clone(); + + self.task.start(move || { + let src_pad_handler = src_pad_handler.clone(); + let pad_weak = pad_weak.clone(); + let element = element.clone(); + let socket_stream = socket_stream.clone(); + + async move { + let item = socket_stream.lock().await.next().await; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + let (mut buffer, saddr) = match item { + Some(Ok((buffer, saddr))) => (buffer, saddr), + Some(Err(err)) => { + gst_error!(CAT, obj: &element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst_element_error!( + element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + } + } + return glib::Continue(false); + } + None => { + gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped"); + return glib::Continue(false); + } + }; + + if let Some(saddr) = saddr { + if src_pad_handler.0.state.lock().await.retrieve_sender_address { + let inet_addr = match saddr.ip() { + IpAddr::V4(ip) => gio::InetAddress::new_from_bytes( + gio::InetAddressBytes::V4(&ip.octets()), + ), + IpAddr::V6(ip) => gio::InetAddress::new_from_bytes( + gio::InetAddressBytes::V6(&ip.octets()), + ), + }; + let inet_socket_addr = + &gio::InetSocketAddress::new(&inet_addr, saddr.port()); + NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr); + } + } + + match src_pad_handler.push_buffer(&pad, &element, buffer).await { + Ok(_) => { + gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer"); + glib::Continue(true) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + glib::Continue(false) + } + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); + let eos = gst::Event::new_eos().build(); + 
pad.push_event(eos).await; + glib::Continue(false) + } + Err(err) => { + gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + glib::Continue(false) + } + } + } + }); + } + + fn flush_stop(&self, element: &gst::Element) { + let socket = self.socket.lock().unwrap(); + if let Some(socket) = socket.as_ref() { + if socket.state() == SocketState::Started { + gst_debug!(CAT, obj: element, "Already started"); + return; + } + + gst_debug!(CAT, obj: element, "Stopping Flush"); + + self.src_pad_handler.set_need_segment(); + self.start_task(element, socket); + + gst_debug!(CAT, obj: element, "Stopped Flush"); + } else { + gst_debug!(CAT, obj: element, "Socket not available"); + } } fn flush_start(&self, element: &gst::Element) { @@ -859,20 +783,16 @@ impl UdpSrc { socket.pause(); } - self.src_pad.cancel_task(); + self.task.cancel(); gst_debug!(CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let socket = self.socket.lock().unwrap(); gst_debug!(CAT, obj: element, "Pausing"); - if let Some(socket) = socket.as_ref() { - socket.pause(); - } - - self.src_pad.pause_task(); + self.socket.lock().unwrap().as_ref().unwrap().pause(); + self.task.pause(); gst_debug!(CAT, obj: element, "Paused"); @@ -926,12 +846,13 @@ impl ObjectSubclass for UdpSrc { } fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - Self { - src_pad, - src_pad_handler: StdMutex::new(None), + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), + src_pad_handler: UdpSrcPadHandler::default(), + task: Task::default(), socket: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } @@ -1020,6 +941,7 @@ impl ObjectImpl for 
UdpSrc { let element = obj.downcast_ref::().unwrap(); element.add_pad(self.src_pad.gst_pad()).unwrap(); + super::set_element_flags(element, gst::ElementFlags::SOURCE); } } diff --git a/gst-plugin-threadshare/tests/pad.rs b/gst-plugin-threadshare/tests/pad.rs index ae06dddf..8bf680c0 100644 --- a/gst-plugin-threadshare/tests/pad.rs +++ b/gst-plugin-threadshare/tests/pad.rs @@ -39,7 +39,7 @@ use std::sync::Arc; use std::sync::Mutex as StdMutex; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; const DEFAULT_CONTEXT: &str = ""; const THROTTLING_DURATION: u32 = 2; @@ -67,7 +67,7 @@ static SRC_PROPERTIES: [glib::subclass::Property; 1] = ) })]; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] struct Settings { context: String, } @@ -80,106 +80,11 @@ lazy_static! { ); } -#[derive(Debug)] -struct PadSrcHandlerTestInner { - receiver: FutMutex>, -} - -impl PadSrcHandlerTestInner { - fn new(receiver: mpsc::Receiver) -> PadSrcHandlerTestInner { - PadSrcHandlerTestInner { - receiver: FutMutex::new(receiver), - } - } -} - #[derive(Clone, Debug)] -struct PadSrcHandlerTest(Arc); +struct PadSrcTestHandler; -impl PadSrcHandlerTest { - fn new(receiver: mpsc::Receiver) -> PadSrcHandlerTest { - PadSrcHandlerTest(Arc::new(PadSrcHandlerTestInner::new(receiver))) - } - - fn stop(&self, pad: &PadSrcRef<'_>) { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Stopping handler"); - - pad.stop_task(); - // From here on, the task is stopped so it can't hold resources anymore - - self.flush(pad); - - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handler stopped"); - } - - fn flush(&self, pad: &PadSrcRef<'_>) { - // Precondition: task must be stopped - // TODO: assert the task state when Task & PadSrc are separated - - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); - - // Purge the channel - let mut receiver = self - .0 - .receiver - 
.try_lock() - .expect("Channel receiver is locked elsewhere"); - loop { - match receiver.try_next() { - Ok(Some(_item)) => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Dropping pending item"); - } - Err(_) => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "No more pending item"); - break; - } - Ok(None) => { - panic!("Channel sender dropped"); - } - } - } - - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushed"); - } - - fn start_task(&self, pad: PadSrcRef<'_>) { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "SrcPad task starting"); - - let this = self.clone(); - let pad_weak = pad.downgrade(); - - pad.start_task(move || { - let pad_weak = pad_weak.clone(); - let this = this.clone(); - - async move { - let item = this.0.receiver.lock().await.next().await; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - - let item = match item { - Some(item) => item, - None => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); - return glib::Continue(false); - } - }; - - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - match this.push_item(pad, item).await { - Ok(_) => glib::Continue(true), - Err(gst::FlowError::Flushing) => glib::Continue(false), - Err(err) => panic!("Got error {:?}", err), - } - } - }); - } - - async fn push_item( - self, - pad: PadSrcRef<'_>, - item: Item, - ) -> Result { +impl PadSrcTestHandler { + async fn push_item(pad: PadSrcRef<'_>, item: Item) -> Result { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item); match item { @@ -194,7 +99,7 @@ impl PadSrcHandlerTest { } } -impl PadSrcHandler for PadSrcHandlerTest { +impl PadSrcHandler for PadSrcTestHandler { type ElementImpl = ElementSrcTest; fn src_event( @@ -239,9 +144,10 @@ enum ElementSrcTestState { #[derive(Debug)] struct ElementSrcTest { src_pad: PadSrc, - src_pad_handler: StdMutex>, + task: Task, state: StdMutex, sender: StdMutex>>, + receiver: StdMutex>>>>, settings: StdMutex, } @@ -249,10 +155,7 @@ impl ElementSrcTest { fn try_push(&self, item: Item) 
-> Result<(), Item> { let state = self.state.lock().unwrap(); if *state == ElementSrcTestState::RejectItems { - gst_debug!( - SRC_CAT, - "ElementSrcTest rejecting item due to element state" - ); + gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state"); return Err(item); } @@ -276,21 +179,17 @@ impl ElementSrcTest { ) })?; + self.task.prepare(context).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + self.src_pad.prepare(&PadSrcTestHandler); + let (sender, receiver) = mpsc::channel(1); *self.sender.lock().unwrap() = Some(sender); - - let src_pad_handler = PadSrcHandlerTest::new(receiver); - - self.src_pad - .prepare(context, &src_pad_handler) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Error joining Context: {:?}", err] - ) - })?; - - *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + *self.receiver.lock().unwrap() = Some(Arc::new(FutMutex::new(receiver))); gst_debug!(SRC_CAT, obj: element, "Prepared"); @@ -300,14 +199,60 @@ impl ElementSrcTest { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(SRC_CAT, obj: element, "Unpreparing"); - self.src_pad.unprepare().unwrap(); - *self.src_pad_handler.lock().unwrap() = None; + self.task.unprepare().unwrap(); + self.src_pad.unprepare(); + + *self.sender.lock().unwrap() = None; + *self.receiver.lock().unwrap() = None; gst_debug!(SRC_CAT, obj: element, "Unprepared"); Ok(()) } + fn stop(&self, element: &gst::Element) -> Result<(), ()> { + let mut state = self.state.lock().unwrap(); + gst_debug!(SRC_CAT, obj: element, "Stopping"); + + self.flush(element); + *state = ElementSrcTestState::RejectItems; + + gst_debug!(SRC_CAT, obj: element, "Stopped"); + + Ok(()) + } + + fn flush(&self, element: &gst::Element) { + gst_debug!(SRC_CAT, obj: element, "Flushing"); + + self.task.stop(); + + let receiver = self.receiver.lock().unwrap(); + let mut receiver = receiver + .as_ref() + .unwrap() + 
.try_lock() + .expect("receiver locked elsewhere"); + + // Purge the channel + loop { + match receiver.try_next() { + Ok(Some(_item)) => { + gst_debug!(SRC_CAT, obj: element, "Dropping pending item"); + } + Err(_) => { + gst_debug!(SRC_CAT, obj: element, "No more pending item"); + break; + } + Ok(None) => { + panic!("Channel sender dropped"); + } + } + } + + gst_debug!(SRC_CAT, obj: element, "Flushed"); + } + fn start(&self, element: &gst::Element) -> Result<(), ()> { let mut state = self.state.lock().unwrap(); if *state == ElementSrcTestState::Started { @@ -317,13 +262,45 @@ impl ElementSrcTest { gst_debug!(SRC_CAT, obj: element, "Starting"); - self.start_unchecked(&mut state); + self.start_task(); + *state = ElementSrcTestState::Started; gst_debug!(SRC_CAT, obj: element, "Started"); Ok(()) } + fn start_task(&self) { + let pad_weak = self.src_pad.downgrade(); + let receiver = Arc::clone(self.receiver.lock().unwrap().as_ref().expect("No receiver")); + + self.task.start(move || { + let pad_weak = pad_weak.clone(); + let receiver = Arc::clone(&receiver); + + async move { + let item = receiver.lock().await.next().await; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + + let item = match item { + Some(item) => item, + None => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); + return glib::Continue(false); + } + }; + + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + match PadSrcTestHandler::push_item(pad, item).await { + Ok(_) => glib::Continue(true), + Err(gst::FlowError::Flushing) => glib::Continue(false), + Err(err) => panic!("Got error {:?}", err), + } + } + }); + } + fn flush_stop(&self, element: &gst::Element) { let mut state = self.state.lock().unwrap(); if *state == ElementSrcTestState::Started { @@ -333,77 +310,34 @@ impl ElementSrcTest { gst_debug!(SRC_CAT, obj: element, "Stopping Flush"); - // Stop it so we wait for it to actually finish - self.src_pad.stop_task(); - - self.src_pad_handler - .lock() - 
.unwrap() - .as_ref() - .unwrap() - .flush(&self.src_pad.as_ref()); - - // And then start it again - self.start_unchecked(&mut state); + self.flush(element); + self.start_task(); + *state = ElementSrcTestState::Started; gst_debug!(SRC_CAT, obj: element, "Stopped Flush"); } - fn start_unchecked(&self, state: &mut ElementSrcTestState) { - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .start_task(self.src_pad.as_ref()); - - *state = ElementSrcTestState::Started; - } - fn flush_start(&self, element: &gst::Element) { - // Keep the lock on the `state` until `flush_start` is complete - // so as to prevent race conditions due to concurrent state transitions. let mut state = self.state.lock().unwrap(); - gst_debug!(SRC_CAT, obj: element, "Starting Flush"); + self.task.cancel(); *state = ElementSrcTestState::RejectItems; - self.src_pad.cancel_task(); gst_debug!(SRC_CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - // Lock the state to prevent race condition due to concurrent FlushStop let mut state = self.state.lock().unwrap(); - gst_debug!(SRC_CAT, obj: element, "Pausing"); - self.src_pad.pause_task(); - + self.task.pause(); *state = ElementSrcTestState::Paused; gst_debug!(SRC_CAT, obj: element, "Paused"); Ok(()) } - - fn stop(&self, element: &gst::Element) -> Result<(), ()> { - gst_debug!(SRC_CAT, obj: element, "Stopping"); - - *self.state.lock().unwrap() = ElementSrcTestState::RejectItems; - - self.src_pad_handler - .lock() - .unwrap() - .as_ref() - .unwrap() - .stop(&self.src_pad.as_ref()); - - gst_debug!(SRC_CAT, obj: element, "Stopped"); - - Ok(()) - } } impl ObjectSubclass for ElementSrcTest { @@ -436,19 +370,16 @@ impl ObjectSubclass for ElementSrcTest { } fn new_with_class(klass: &glib::subclass::simple::ClassStruct) -> Self { - let templ = klass.get_pad_template("src").unwrap(); - let src_pad = PadSrc::new_from_template(&templ, Some("src")); - - let settings = Settings { - context: 
String::new(), - }; - ElementSrcTest { - src_pad, - src_pad_handler: StdMutex::new(None), + src_pad: PadSrc::new(gst::Pad::new_from_template( + &klass.get_pad_template("src").unwrap(), + Some("src"), + )), + task: Task::default(), state: StdMutex::new(ElementSrcTestState::RejectItems), sender: StdMutex::new(None), - settings: StdMutex::new(settings), + receiver: StdMutex::new(None), + settings: StdMutex::new(Settings::default()), } } } @@ -525,9 +456,7 @@ impl ElementImpl for ElementSrcTest { fn send_event(&self, element: &gst::Element, event: gst::Event) -> bool { match event.view() { EventView::FlushStart(..) => { - // Cancel the task so that it finishes ASAP - // and clear the sender - self.pause(element).unwrap(); + self.flush_start(element); } EventView::FlushStop(..) => { self.flush_stop(element); @@ -569,16 +498,10 @@ static SINK_PROPERTIES: [glib::subclass::Property; 1] = ) })]; -#[derive(Clone, Debug)] -struct PadSinkHandlerTest; +#[derive(Clone, Debug, Default)] +struct PadSinkTestHandler; -impl Default for PadSinkHandlerTest { - fn default() -> Self { - PadSinkHandlerTest - } -} - -impl PadSinkHandler for PadSinkHandlerTest { +impl PadSinkHandler for PadSinkTestHandler { type ElementImpl = ElementSinkTest; fn sink_chain( @@ -764,10 +687,10 @@ impl ObjectSubclass for ElementSinkTest { fn new_with_class(klass: &glib::subclass::simple::ClassStruct) -> Self { let templ = klass.get_pad_template("sink").unwrap(); - let sink_pad = PadSink::new_from_template(&templ, Some("sink")); + let gst_pad = gst::Pad::new_from_template(&templ, Some("sink")); ElementSinkTest { - sink_pad, + sink_pad: PadSink::new(gst_pad), flushing: AtomicBool::new(true), sender: FutMutex::new(None), } @@ -811,7 +734,7 @@ impl ElementImpl for ElementSinkTest { match transition { gst::StateChange::NullToReady => { - self.sink_pad.prepare(&PadSinkHandlerTest::default()); + self.sink_pad.prepare(&PadSinkTestHandler::default()); } gst::StateChange::PausedToReady => { self.stop(element); @@ 
-1366,6 +1289,19 @@ fn start_flush() { EventView::Segment(_) => (), other => panic!("Unexpected event {:?}", other), }, + Item::Buffer(buffer) => { + // In some cases, the first Buffer might be processed before FlushStart + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + } other => panic!("Unexpected item {:?}", other), }