threadshare: separate Task from PadSrc

This commit is contained in:
François Laignel 2020-03-26 19:07:25 +01:00
parent 89682aa6a8
commit 20a9eba4c8
11 changed files with 1411 additions and 1767 deletions

View file

@ -1,4 +1,5 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -40,7 +41,7 @@ use std::sync::Mutex as StdMutex;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef};
use crate::runtime::{Context, PadSrc, PadSrcRef, Task};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -152,134 +153,35 @@ impl Default for AppSrcPadHandlerState {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
struct AppSrcPadHandlerInner {
state: FutMutex<AppSrcPadHandlerState>,
configured_caps: StdMutex<Option<gst::Caps>>,
receiver: FutMutex<mpsc::Receiver<StreamItem>>,
}
impl AppSrcPadHandlerInner {
fn new(receiver: mpsc::Receiver<StreamItem>, caps: Option<gst::Caps>) -> Self {
AppSrcPadHandlerInner {
state: FutMutex::new(AppSrcPadHandlerState {
caps,
..Default::default()
}),
configured_caps: StdMutex::new(None),
receiver: FutMutex::new(receiver),
}
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
struct AppSrcPadHandler(Arc<AppSrcPadHandlerInner>);
impl AppSrcPadHandler {
fn new(receiver: mpsc::Receiver<StreamItem>, caps: Option<gst::Caps>) -> AppSrcPadHandler {
AppSrcPadHandler(Arc::new(AppSrcPadHandlerInner::new(receiver, caps)))
}
fn reset(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler");
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
*self.0.configured_caps.lock().unwrap() = None;
self.flush(pad);
gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset");
}
fn flush(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
// Purge the channel
let mut receiver = self
.0
.receiver
.try_lock()
.expect("Channel receiver is locked elsewhere");
loop {
match receiver.try_next() {
Ok(Some(_item)) => {
gst_log!(CAT, obj: pad.gst_pad(), "Dropping pending item");
}
Err(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "No more pending item");
break;
}
Ok(None) => {
panic!("Channel sender dropped");
}
}
}
fn prepare(&self, caps: Option<gst::Caps>) {
self.0
.state
.try_lock()
.expect("state is locked elsewhere")
.expect("State locked elsewhere")
.caps = caps;
}
fn reset(&self) {
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
*self.0.configured_caps.lock().unwrap() = None;
}
fn set_need_segment(&self) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.need_segment = true;
gst_debug!(CAT, obj: pad.gst_pad(), "Flushed");
}
fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) {
let this = self.clone();
let pad_weak = pad.downgrade();
let element = element.clone();
pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
async move {
let item = this.0.receiver.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SrcPad channel aborted");
return glib::Continue(false);
}
};
match this.push_item(&pad, &element, item).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
glib::Continue(true)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
&element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
@ -429,9 +331,11 @@ enum AppSrcState {
#[derive(Debug)]
struct AppSrc {
src_pad: PadSrc,
src_pad_handler: StdMutex<Option<AppSrcPadHandler>>,
src_pad_handler: AppSrcPadHandler,
task: Task,
state: StdMutex<AppSrcState>,
sender: StdMutex<Option<mpsc::Sender<StreamItem>>>,
receiver: StdMutex<Option<Arc<FutMutex<mpsc::Receiver<StreamItem>>>>>,
settings: StdMutex<Settings>,
}
@ -439,7 +343,7 @@ impl AppSrc {
fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool {
let state = self.state.lock().unwrap();
if *state == AppSrcState::RejectBuffers {
gst_debug!(CAT, obj: element, "Rejecting buffer due to element state");
gst_debug!(CAT, obj: element, "Rejecting buffer due to pad state");
return false;
}
@ -503,22 +407,25 @@ impl AppSrc {
)
})?;
let max_buffers = settings.max_buffers.try_into().unwrap();
let (sender, receiver) = mpsc::channel(max_buffers);
*self.sender.lock().unwrap() = Some(sender);
let src_pad_handler = AppSrcPadHandler::new(receiver, settings.caps.clone());
self.src_pad
.prepare(context, &src_pad_handler)
.map_err(|err| {
let max_buffers = settings.max_buffers.try_into().map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing src_pads: {:?}", err]
gst::ResourceError::Settings,
["Invalid max-buffers: {}, {}", settings.max_buffers, err]
)
})?;
*self.src_pad_handler.lock().unwrap() = Some(src_pad_handler);
let (sender, receiver) = mpsc::channel(max_buffers);
*self.sender.lock().unwrap() = Some(sender);
*self.receiver.lock().unwrap() = Some(Arc::new(FutMutex::new(receiver)));
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad_handler.prepare(settings.caps.clone());
self.src_pad.prepare(&self.src_pad_handler);
gst_debug!(CAT, obj: element, "Prepared");
@ -528,10 +435,11 @@ impl AppSrc {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
let _ = self.src_pad.unprepare();
*self.src_pad_handler.lock().unwrap() = None;
*self.sender.lock().unwrap() = None;
*self.receiver.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -539,26 +447,51 @@ impl AppSrc {
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Stopping");
*self.state.lock().unwrap() = AppSrcState::RejectBuffers;
// Now stop the task if it was still running, blocking
// until this has actually happened
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.reset(&self.src_pad.as_ref());
self.flush(element);
self.src_pad_handler.reset();
*state = AppSrcState::RejectBuffers;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn flush(&self, element: &gst::Element) {
gst_log!(CAT, obj: element, "Flushing");
self.task.stop();
let receiver = self.receiver.lock().unwrap();
let mut receiver = receiver
.as_ref()
.unwrap()
.try_lock()
.expect("receiver locked elsewhere");
// Purge the channel
loop {
match receiver.try_next() {
Ok(Some(_item)) => {
gst_log!(CAT, obj: element, "Dropping pending item");
}
Err(_) => {
gst_log!(CAT, obj: element, "No more pending item");
break;
}
Ok(None) => {
panic!("Channel sender dropped");
}
}
}
self.src_pad_handler.set_need_segment();
gst_log!(CAT, obj: element, "Flushed");
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
if *state == AppSrcState::Started {
@ -568,16 +501,70 @@ impl AppSrc {
gst_debug!(CAT, obj: element, "Starting");
self.start_unchecked(element, &mut state);
self.start_task(element);
*state = AppSrcState::Started;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self, element: &gst::Element) {
let src_pad_handler = self.src_pad_handler.clone();
let pad_weak = self.src_pad.downgrade();
let element = element.clone();
let receiver = Arc::clone(self.receiver.lock().unwrap().as_ref().expect("No receiver"));
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let receiver = Arc::clone(&receiver);
async move {
let item = receiver.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SrcPad channel aborted");
return glib::Continue(false);
}
};
match src_pad_handler.push_item(&pad, &element, item).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
glib::Continue(true)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
&element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `state` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
let mut state = self.state.lock().unwrap();
if *state == AppSrcState::Started {
gst_debug!(CAT, obj: element, "Already started");
@ -586,52 +573,28 @@ impl AppSrc {
gst_debug!(CAT, obj: element, "Stopping Flush");
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.flush(&self.src_pad.as_ref());
self.start_unchecked(element, &mut state);
self.flush(element);
self.start_task(element);
*state = AppSrcState::Started;
gst_debug!(CAT, obj: element, "Flush Stopped");
}
fn start_unchecked(&self, element: &gst::Element, state: &mut AppSrcState) {
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.start_task(self.src_pad.as_ref(), element);
*state = AppSrcState::Started;
}
fn flush_start(&self, element: &gst::Element) {
// Keep the lock on the `state` until `flush_start` is complete
// so as to prevent race conditions due to concurrent state transitions.
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Starting Flush");
self.task.cancel();
*state = AppSrcState::RejectBuffers;
self.src_pad.cancel_task();
gst_debug!(CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
// Lock the state to prevent race condition due to concurrent FlushStop
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Pausing");
self.src_pad.pause_task();
self.task.pause();
*state = AppSrcState::Paused;
gst_debug!(CAT, obj: element, "Paused");
@ -706,14 +669,16 @@ impl ObjectSubclass for AppSrc {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
src_pad,
src_pad_handler: StdMutex::new(None),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad_handler: AppSrcPadHandler::default(),
task: Task::default(),
state: StdMutex::new(AppSrcState::RejectBuffers),
sender: StdMutex::new(None),
receiver: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}

View file

@ -28,7 +28,7 @@ use glib::{glib_object_impl, glib_object_subclass};
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error_msg, gst_log, gst_trace};
use gst::{gst_debug, gst_log, gst_trace};
use lazy_static::lazy_static;
@ -39,7 +39,7 @@ use std::u32;
use crate::get_current_running_time;
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -251,12 +251,9 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
let mut inner = this.0.lock().unwrap();
// Remember the segment for later use
match event.view() {
gst::EventView::Segment(e) => {
if let gst::EventView::Segment(e) = event.view() {
inner.segment = Some(e.get_segment().clone());
}
_ => (),
}
// We sent sticky events together with the next buffer once it becomes
// the active pad.
@ -282,8 +279,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
event: gst::Event,
) -> bool {
/* Drop all events for now */
match event.view() {
gst::EventView::FlushStart(..) => {
if let gst::EventView::FlushStart(..) = event.view() {
/* Unblock downstream */
inputselector.src_pad.gst_pad().push_event(event.clone());
@ -293,8 +289,6 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
abort_handle.abort();
}
}
_ => (),
}
true
}
@ -438,24 +432,7 @@ impl InputSelector {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap();
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
self.src_pad
.prepare(context, &InputSelectorPadSrcHandler {})
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error joining Context: {:?}", err]
)
})?;
self.src_pad.prepare(&InputSelectorPadSrcHandler);
gst_debug!(CAT, obj: element, "Prepared");
@ -466,7 +443,7 @@ impl InputSelector {
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Unpreparing");
let _ = self.src_pad.unprepare();
self.src_pad.unprepare();
*state = State::default();
@ -516,11 +493,11 @@ impl ObjectSubclass for InputSelector {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
src_pad,
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
pads: Mutex::new(Pads::default()),

View file

@ -42,7 +42,7 @@ use std::time::Duration;
use crate::get_current_running_time;
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@ -662,7 +662,7 @@ impl PadSinkHandler for SinkHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
if let EventView::FlushStart(..) = event.view() {
jb.src_pad.cancel_task();
jb.task.cancel();
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
@ -961,145 +961,6 @@ impl SrcHandler {
(now, Some((next_wakeup, Duration::from_nanos(delay))))
}
fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) {
let this = self.clone();
let element = element.clone();
pad.start_task(move || {
let this = this.clone();
let element = element.clone();
async move {
let jb = JitterBuffer::from_instance(&element);
let (latency, context_wait) = {
let settings = jb.settings.lock().unwrap();
(
settings.latency_ms as u64 * gst::MSECOND,
settings.context_wait as u64 * gst::MSECOND,
)
};
loop {
let delay_fut = {
let mut state = jb.state.lock().unwrap();
let (_, next_wakeup) =
this.get_next_wakeup(&element, &state, latency, context_wait);
let (delay_fut, abort_handle) = match next_wakeup {
Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
Some((_, delay)) => {
runtime::time::delay_for(delay).await;
}
None => {
future::pending::<()>().await;
}
};
});
let next_wakeup =
next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
state.wait_handle = abort_handle;
delay_fut
};
// Got aborted, reschedule if needed
if let Some(delay_fut) = delay_fut {
gst_debug!(CAT, obj: &element, "Waiting");
if let Err(Aborted) = delay_fut.await {
gst_debug!(CAT, obj: &element, "Waiting aborted");
return glib::Continue(true);
}
}
let (head_pts, head_seq) = {
let state = jb.state.lock().unwrap();
//
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) =
this.get_next_wakeup(&element, &state, latency, context_wait);
gst_debug!(
CAT,
obj: &element,
"Woke up at {}, earliest_pts {}",
now,
state.earliest_pts
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return glib::Continue(true);
}
} else {
return glib::Continue(true);
}
let (head_pts, head_seq) = state.jbuf.borrow().peek();
(head_pts, head_seq)
};
let res = this.pop_and_push(&element).await;
{
let mut state = jb.state.lock().unwrap();
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
let (earliest_pts, earliest_seqnum) =
state.jbuf.borrow().find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
}
if res.is_ok() {
// Return and reschedule if the next packet would be in the future
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) =
this.get_next_wakeup(&element, &state, latency, context_wait);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return glib::Continue(true);
}
} else {
return glib::Continue(true);
}
}
}
match res {
Ok(_) => (),
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &element, "Pushing EOS event",);
let event = gst::Event::new_eos().build();
let _ = jb.src_pad.push_event(event).await;
return glib::Continue(false);
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &element, "Flushing",);
return glib::Continue(false);
}
Err(err) => {
gst_error!(CAT, obj: &element, "Error {}", err,);
return glib::Continue(false);
}
}
}
}
});
}
}
impl PadSrcHandler for SrcHandler {
@ -1118,7 +979,7 @@ impl PadSrcHandler for SrcHandler {
match event.view() {
EventView::FlushStart(..) => {
jb.src_pad.cancel_task();
jb.task.cancel();
}
EventView::FlushStop(..) => {
jb.flush_stop(element);
@ -1252,6 +1113,7 @@ struct JitterBuffer {
src_pad: PadSrc,
sink_pad_handler: SinkHandler,
src_pad_handler: SrcHandler,
task: Task,
state: StdMutex<State>,
settings: StdMutex<Settings>,
}
@ -1281,15 +1143,14 @@ impl JitterBuffer {
Context::acquire(&settings.context, settings.context_wait).unwrap()
};
self.src_pad
.prepare(context, &self.src_pad_handler)
.map_err(|err| {
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing src_pad: {:?}", err]
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad.prepare(&self.src_pad_handler);
self.sink_pad.prepare(&self.sink_pad_handler);
gst_info!(CAT, obj: element, "Prepared");
@ -1300,8 +1161,9 @@ impl JitterBuffer {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
self.sink_pad.unprepare();
let _ = self.src_pad.unprepare();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
}
@ -1313,12 +1175,162 @@ impl JitterBuffer {
self.sink_pad_handler.clear();
self.src_pad_handler.clear();
self.src_pad_handler
.start_task(self.src_pad.as_ref(), element);
self.start_task(element);
gst_debug!(CAT, obj: element, "Started");
}
fn start_task(&self, element: &gst::Element) {
let src_pad_handler = self.src_pad_handler.clone();
let element = element.clone();
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let element = element.clone();
async move {
let jb = JitterBuffer::from_instance(&element);
let (latency, context_wait) = {
let settings = jb.settings.lock().unwrap();
(
settings.latency_ms as u64 * gst::MSECOND,
settings.context_wait as u64 * gst::MSECOND,
)
};
loop {
let delay_fut = {
let mut state = jb.state.lock().unwrap();
let (_, next_wakeup) = src_pad_handler.get_next_wakeup(
&element,
&state,
latency,
context_wait,
);
let (delay_fut, abort_handle) = match next_wakeup {
Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
Some((_, delay)) => {
runtime::time::delay_for(delay).await;
}
None => {
future::pending::<()>().await;
}
};
});
let next_wakeup =
next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
state.wait_handle = abort_handle;
delay_fut
};
// Got aborted, reschedule if needed
if let Some(delay_fut) = delay_fut {
gst_debug!(CAT, obj: &element, "Waiting");
if let Err(Aborted) = delay_fut.await {
gst_debug!(CAT, obj: &element, "Waiting aborted");
return glib::Continue(true);
}
}
let (head_pts, head_seq) = {
let state = jb.state.lock().unwrap();
//
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = src_pad_handler.get_next_wakeup(
&element,
&state,
latency,
context_wait,
);
gst_debug!(
CAT,
obj: &element,
"Woke up at {}, earliest_pts {}",
now,
state.earliest_pts
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return glib::Continue(true);
}
} else {
return glib::Continue(true);
}
let (head_pts, head_seq) = state.jbuf.borrow().peek();
(head_pts, head_seq)
};
let res = src_pad_handler.pop_and_push(&element).await;
{
let mut state = jb.state.lock().unwrap();
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
let (earliest_pts, earliest_seqnum) =
state.jbuf.borrow().find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
}
if res.is_ok() {
// Return and reschedule if the next packet would be in the future
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = src_pad_handler.get_next_wakeup(
&element,
&state,
latency,
context_wait,
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return glib::Continue(true);
}
} else {
return glib::Continue(true);
}
}
}
match res {
Ok(_) => (),
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &element, "Pushing EOS event",);
let event = gst::Event::new_eos().build();
let _ = jb.src_pad.push_event(event).await;
return glib::Continue(false);
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &element, "Flushing",);
return glib::Continue(false);
}
Err(err) => {
gst_error!(CAT, obj: &element, "Error {}", err,);
return glib::Continue(false);
}
}
}
}
});
}
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
@ -1326,7 +1338,7 @@ impl JitterBuffer {
abort_handle.abort();
}
self.src_pad.stop_task();
self.task.stop();
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
@ -1336,7 +1348,7 @@ impl JitterBuffer {
}
fn flush_stop(&self, element: &gst::Element) {
self.src_pad.stop_task();
self.task.stop();
self.start(element);
}
}
@ -1402,17 +1414,18 @@ impl ObjectSubclass for JitterBuffer {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = PadSink::new_from_template(&templ, Some("sink"));
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
sink_pad,
src_pad,
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
sink_pad_handler: SinkHandler::new(),
src_pad_handler: SrcHandler::new(),
task: Task::default(),
state: StdMutex::new(State::default()),
settings: StdMutex::new(Settings::default()),
}

View file

@ -39,7 +39,9 @@ use std::sync::{Arc, Weak};
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak};
use crate::runtime::{
Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task,
};
use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState};
@ -287,23 +289,8 @@ impl Drop for ProxyContext {
}
}
#[derive(Debug)]
struct ProxySinkPadHandlerInner {
proxy_ctx: ProxyContext,
}
#[derive(Clone, Debug)]
struct ProxySinkPadHandler(Arc<ProxySinkPadHandlerInner>);
impl ProxySinkPadHandler {
fn new(proxy_ctx: ProxyContext) -> Self {
ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner { proxy_ctx }))
}
fn proxy_ctx(&self) -> &ProxyContext {
&self.0.proxy_ctx
}
}
struct ProxySinkPadHandler;
impl PadSinkHandler for ProxySinkPadHandler {
type ElementImpl = ProxySink;
@ -360,12 +347,16 @@ impl PadSinkHandler for ProxySinkPadHandler {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
let src_pad = PROXY_SRC_PADS
let src_pad = {
let proxy_ctx = proxysink.proxy_ctx.lock().unwrap();
PROXY_SRC_PADS
.lock()
.unwrap()
.get(&self.proxy_ctx().name)
.get(&proxy_ctx.as_ref().unwrap().name)
.and_then(|src_pad| src_pad.upgrade())
.map(|src_pad| src_pad.gst_pad().clone());
.map(|src_pad| src_pad.gst_pad().clone())
};
if let EventView::FlushStart(..) = event.view() {
proxysink.stop(&element).unwrap();
@ -419,7 +410,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
#[derive(Debug)]
struct ProxySink {
sink_pad: PadSink,
sink_pad_handler: StdMutex<Option<ProxySinkPadHandler>>,
proxy_ctx: StdMutex<Option<ProxyContext>>,
settings: StdMutex<SettingsSink>,
}
@ -447,13 +438,8 @@ impl ProxySink {
loop {
let more_queue_space_receiver = {
let sink_pad_handler = sink.sink_pad_handler.lock().unwrap();
if sink_pad_handler.is_none() {
return;
}
let proxy_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx();
let mut shared_ctx = proxy_ctx.lock_shared();
let proxy_ctx = sink.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue");
@ -509,13 +495,8 @@ impl ProxySink {
item: DataQueueItem,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let wait_fut = {
let sink_pad_handler = self.sink_pad_handler.lock().unwrap();
let proxy_ctx = sink_pad_handler
.as_ref()
.ok_or(gst::FlowError::Error)?
.proxy_ctx();
let mut shared_ctx = proxy_ctx.lock_shared();
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
/* We've taken the lock again, make sure not to recreate
* a pending queue if tearing down */
@ -615,8 +596,8 @@ impl ProxySink {
wait_fut.await;
}
let sink_pad_handler = self.sink_pad_handler.lock().unwrap();
let shared_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx().lock_shared();
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res
}
@ -638,9 +619,9 @@ impl ProxySink {
proxy_sink_pads.insert(proxy_context, self.sink_pad.downgrade());
}
let handler = ProxySinkPadHandler::new(proxy_ctx);
self.sink_pad.prepare(&handler);
*self.sink_pad_handler.lock().unwrap() = Some(handler);
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
self.sink_pad.prepare(&ProxySinkPadHandler);
gst_debug!(SINK_CAT, obj: element, "Prepared");
@ -651,14 +632,17 @@ impl ProxySink {
gst_debug!(SINK_CAT, obj: element, "Unpreparing");
self.sink_pad.unprepare();
*self.sink_pad_handler.lock().unwrap() = None;
*self.proxy_ctx.lock().unwrap() = None;
gst_debug!(SINK_CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let sink_pad_handler = self.sink_pad_handler.lock().unwrap();
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
gst_debug!(SINK_CAT, obj: element, "Starting");
{
@ -667,7 +651,6 @@ impl ProxySink {
proxy_sink_pads.remove(&settings.proxy_context);
}
let mut shared_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
gst_debug!(SINK_CAT, obj: element, "Started");
@ -676,10 +659,11 @@ impl ProxySink {
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let sink_pad_handler = self.sink_pad_handler.lock().unwrap();
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
gst_debug!(SINK_CAT, obj: element, "Stopping");
let mut shared_ctx = sink_pad_handler.as_ref().unwrap().proxy_ctx().lock_shared();
let _ = shared_ctx.pending_queue.take();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
@ -720,12 +704,12 @@ impl ObjectSubclass for ProxySink {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = PadSink::new_from_template(&templ, Some("sink"));
Self {
sink_pad,
sink_pad_handler: StdMutex::new(None),
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
proxy_ctx: StdMutex::new(None),
settings: StdMutex::new(SettingsSink::default()),
}
}
@ -803,89 +787,18 @@ impl ElementImpl for ProxySink {
}
}
#[derive(Debug)]
struct ProxySrcPadHandlerInner {
proxy_ctx: ProxyContext,
}
#[derive(Clone, Debug)]
struct ProxySrcPadHandler(Arc<ProxySrcPadHandlerInner>);
struct ProxySrcPadHandler;
impl ProxySrcPadHandler {
fn new(proxy_ctx: ProxyContext) -> Self {
ProxySrcPadHandler(Arc::new(ProxySrcPadHandlerInner { proxy_ctx }))
}
fn proxy_ctx(&self) -> &ProxyContext {
&self.0.proxy_ctx
}
fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) {
let this = self.clone();
let pad_weak = pad.downgrade();
let element = element.clone();
pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let mut dataqueue = dataqueue.clone();
async move {
let item = dataqueue.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused");
return glib::Continue(false);
}
};
match this.push_item(&pad, item).await {
Ok(_) => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item");
let mut shared_ctx = this.proxy_ctx().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing");
let mut shared_ctx = this.proxy_ctx().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS");
let mut shared_ctx = this.proxy_ctx().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Eos);
glib::Continue(false)
}
Err(err) => {
gst_error!(SRC_CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let mut shared_ctx = this.proxy_ctx().lock_shared();
shared_ctx.last_res = Err(err);
glib::Continue(false)
}
}
}
});
}
async fn push_item(
&self,
pad: &PadSrcRef<'_>,
proxysrc: &ProxySrc,
item: DataQueueItem,
) -> Result<(), gst::FlowError> {
{
let mut shared_ctx = self.proxy_ctx().lock_shared();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
if let Some(ref mut pending_queue) = shared_ctx.pending_queue {
pending_queue.notify_more_queue_space();
}
@ -923,12 +836,16 @@ impl PadSrcHandler for ProxySrcPadHandler {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let sink_pad = PROXY_SINK_PADS
let sink_pad = {
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
PROXY_SINK_PADS
.lock()
.unwrap()
.get(&self.proxy_ctx().name)
.get(&proxy_ctx.as_ref().unwrap().name)
.and_then(|sink_pad| sink_pad.upgrade())
.map(|sink_pad| sink_pad.gst_pad().clone());
.map(|sink_pad| sink_pad.gst_pad().clone())
};
match event.view() {
EventView::FlushStart(..) => proxysrc.stop(element).unwrap(),
@ -996,7 +913,8 @@ impl PadSrcHandler for ProxySrcPadHandler {
#[derive(Debug)]
struct ProxySrc {
src_pad: PadSrc,
src_pad_handler: StdMutex<Option<ProxySrcPadHandler>>,
task: Task,
proxy_ctx: StdMutex<Option<ProxyContext>>,
dataqueue: StdMutex<Option<DataQueue>>,
settings: StdMutex<SettingsSrc>,
}
@ -1058,19 +976,19 @@ impl ProxySrc {
proxy_src_pads.insert(settings.proxy_context, self.src_pad.downgrade());
}
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
*self.dataqueue.lock().unwrap() = Some(dataqueue);
let handler = ProxySrcPadHandler::new(proxy_ctx);
self.src_pad.prepare(&ProxySrcPadHandler);
self.src_pad.prepare(ts_ctx, &handler).map_err(|err| {
self.task.prepare(ts_ctx).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing src_pad: {:?}", err]
["Error preparing Task: {:?}", err]
)
})?;
*self.src_pad_handler.lock().unwrap() = Some(handler);
gst_debug!(SRC_CAT, obj: element, "Prepared");
Ok(())
@ -1085,11 +1003,11 @@ impl ProxySrc {
proxy_src_pads.remove(&settings.proxy_context);
}
self.src_pad.stop_task();
let _ = self.src_pad.unprepare();
*self.src_pad_handler.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
*self.dataqueue.lock().unwrap() = None;
*self.proxy_ctx.lock().unwrap() = None;
gst_debug!(SRC_CAT, obj: element, "Unprepared");
@ -1104,9 +1022,7 @@ impl ProxySrc {
let dataqueue = self.dataqueue.lock().unwrap();
gst_debug!(SRC_CAT, obj: element, "Stopping");
// Now stop the task if it was still running, blocking
// until this has actually happened
self.src_pad.stop_task();
self.task.stop();
let dataqueue = dataqueue.as_ref().unwrap();
dataqueue.clear();
@ -1134,6 +1050,84 @@ impl ProxySrc {
Ok(())
}
fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) {
dataqueue.start();
{
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
}
self.start_task(element, dataqueue);
}
fn start_task(&self, element: &gst::Element, dataqueue: &DataQueue) {
let pad_weak = self.src_pad.downgrade();
let dataqueue = dataqueue.clone();
let element = element.clone();
self.task.start(move || {
let pad_weak = pad_weak.clone();
let mut dataqueue = dataqueue.clone();
let element = element.clone();
async move {
let item = dataqueue.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused");
return glib::Continue(false);
}
};
let proxysrc = ProxySrc::from_instance(&element);
match ProxySrcPadHandler::push_item(&pad, &proxysrc, item).await {
Ok(_) => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Eos);
glib::Continue(false)
}
Err(err) => {
gst_error!(SRC_CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(err);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `dataqueue` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
@ -1148,33 +1142,18 @@ impl ProxySrc {
gst_debug!(SRC_CAT, obj: element, "Stopping Flush");
self.src_pad.stop_task();
self.task.stop();
self.start_unchecked(element, dataqueue);
gst_debug!(SRC_CAT, obj: element, "Stopped Flush");
}
fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) {
dataqueue.start();
let src_pad_handler = self.src_pad_handler.lock().unwrap();
let src_pad_handler = src_pad_handler.as_ref().unwrap();
let mut shared_ctx = src_pad_handler.proxy_ctx().lock_shared();
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
src_pad_handler.start_task(self.src_pad.as_ref(), element, dataqueue.clone());
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
gst_debug!(SRC_CAT, obj: element, "Pausing");
dataqueue.as_ref().unwrap().pause();
self.src_pad.pause_task();
self.task.pause();
gst_debug!(SRC_CAT, obj: element, "Paused");
@ -1217,12 +1196,13 @@ impl ObjectSubclass for ProxySrc {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
src_pad,
src_pad_handler: StdMutex::new(None),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
task: Task::default(),
proxy_ctx: StdMutex::new(None),
dataqueue: StdMutex::new(None),
settings: StdMutex::new(SettingsSrc::default()),
}

View file

@ -33,12 +33,11 @@ use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_t
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState};
@ -253,79 +252,10 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
}
#[derive(Debug)]
struct QueuePadSrcHandlerInner {
last_res: Result<gst::FlowSuccess, gst::FlowError>,
}
impl Default for QueuePadSrcHandlerInner {
fn default() -> QueuePadSrcHandlerInner {
QueuePadSrcHandlerInner {
last_res: Ok(gst::FlowSuccess::Ok),
}
}
}
#[derive(Clone, Debug, Default)]
struct QueuePadSrcHandler(Arc<StdMutex<QueuePadSrcHandlerInner>>);
#[derive(Clone, Debug)]
struct QueuePadSrcHandler;
impl QueuePadSrcHandler {
fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) {
let this = self.clone();
let pad_weak = pad.downgrade();
let element = element.clone();
pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let mut dataqueue = dataqueue.clone();
async move {
let item = dataqueue.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped");
return glib::Continue(false);
}
};
match Self::push_item(&pad, &element, item).await {
Ok(()) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
this.0.lock().unwrap().last_res = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
this.0.lock().unwrap().last_res = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
this.0.lock().unwrap().last_res = Err(gst::FlowError::Eos);
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
this.0.lock().unwrap().last_res = Err(err);
glib::Continue(false)
}
}
}
});
}
async fn push_item(
pad: &PadSrcRef<'_>,
element: &gst::Element,
@ -353,10 +283,6 @@ impl QueuePadSrcHandler {
}
}
}
fn unprepare(&self) {
*self.0.lock().unwrap() = QueuePadSrcHandlerInner::default();
}
}
impl PadSrcHandler for QueuePadSrcHandler {
@ -426,9 +352,10 @@ impl PadSrcHandler for QueuePadSrcHandler {
struct Queue {
sink_pad: PadSink,
src_pad: PadSrc,
src_pad_handler: QueuePadSrcHandler,
task: Task,
dataqueue: StdMutex<Option<DataQueue>>,
pending_queue: StdMutex<Option<PendingQueue>>,
last_res: StdMutex<Result<gst::FlowSuccess, gst::FlowError>>,
settings: StdMutex<Settings>,
}
@ -598,7 +525,7 @@ impl Queue {
wait_fut.await;
}
self.src_pad_handler.0.lock().unwrap().last_res
*self.last_res.lock().unwrap()
}
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
@ -636,14 +563,14 @@ impl Queue {
)
})?;
self.src_pad
.prepare(context, &self.src_pad_handler)
.map_err(|err| {
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error joining Context: {:?}", err]
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad.prepare(&QueuePadSrcHandler);
self.sink_pad.prepare(&QueuePadSinkHandler);
gst_debug!(CAT, obj: element, "Prepared");
@ -654,20 +581,43 @@ impl Queue {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
self.src_pad.stop_task();
self.sink_pad.unprepare();
let _ = self.src_pad.unprepare();
self.task.unprepare().unwrap();
self.src_pad.unprepare();
self.src_pad_handler.unprepare();
*self.dataqueue.lock().unwrap() = None;
*self.pending_queue.lock().unwrap() = None;
*self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
gst_debug!(CAT, obj: element, "Stopping");
*self.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
self.task.stop();
if let Some(dataqueue) = dataqueue.as_ref() {
dataqueue.pause();
dataqueue.clear();
dataqueue.stop();
}
if let Some(mut pending_queue) = self.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
@ -678,13 +628,74 @@ impl Queue {
gst_debug!(CAT, obj: element, "Starting");
self.start_unchecked(element, dataqueue);
dataqueue.start();
*self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
self.start_task(element, dataqueue);
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self, element: &gst::Element, dataqueue: &DataQueue) {
let pad_weak = self.src_pad.downgrade();
let dataqueue = dataqueue.clone();
let element = element.clone();
self.task.start(move || {
let pad_weak = pad_weak.clone();
let mut dataqueue = dataqueue.clone();
let element = element.clone();
async move {
let item = dataqueue.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped");
return glib::Continue(false);
}
};
let queue = Queue::from_instance(&element);
match QueuePadSrcHandler::push_item(&pad, &element, item).await {
Ok(()) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
*queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
*queue.last_res.lock().unwrap() = Err(err);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `dataqueue` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
@ -699,42 +710,11 @@ impl Queue {
gst_debug!(CAT, obj: element, "Stopping Flush");
self.src_pad.stop_task();
self.start_unchecked(element, dataqueue);
dataqueue.start();
self.start_task(element, dataqueue);
gst_debug!(CAT, obj: element, "Stopped Flush");
}
fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) {
dataqueue.start();
self.src_pad_handler.0.lock().unwrap().last_res = Ok(gst::FlowSuccess::Ok);
self.src_pad_handler
.start_task(self.src_pad.as_ref(), element, dataqueue.clone());
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
gst_debug!(CAT, obj: element, "Stopping");
self.src_pad.stop_task();
if let Some(dataqueue) = dataqueue.as_ref() {
dataqueue.pause();
dataqueue.clear();
dataqueue.stop();
}
if let Some(pending_queue) = self.pending_queue.lock().unwrap().as_mut() {
pending_queue.notify_more_queue_space();
}
self.src_pad_handler.0.lock().unwrap().last_res = Err(gst::FlowError::Flushing);
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
}
impl ObjectSubclass for Queue {
@ -777,18 +757,19 @@ impl ObjectSubclass for Queue {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = PadSink::new_from_template(&templ, Some("sink"));
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
sink_pad,
src_pad,
src_pad_handler: QueuePadSrcHandler::default(),
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
task: Task::default(),
dataqueue: StdMutex::new(None),
pending_queue: StdMutex::new(None),
last_res: StdMutex::new(Ok(gst::FlowSuccess::Ok)),
settings: StdMutex::new(Settings::default()),
}
}

View file

@ -50,6 +50,7 @@ pub mod pad;
pub use pad::{PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak};
pub mod task;
pub use task::{Task, TaskState};
pub mod prelude {
pub use super::pad::{PadSinkHandler, PadSrcHandler};

View file

@ -78,37 +78,12 @@ use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error};
use gst::{FlowError, FlowSuccess};
use std::fmt;
use std::marker::PhantomData;
use std::sync;
use std::sync::{Arc, Weak};
use super::executor::{block_on_or_add_sub_task, Context};
use super::task::Task;
use super::RUNTIME_CAT;
/// Errors related to [`PadSrc`] `Context` handling.
///
/// [`PadSrc`]: struct.PadSrc.html
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PadContextError {
ActiveContext,
ActiveTask,
}
impl fmt::Display for PadContextError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PadContextError::ActiveContext => {
write!(f, "The PadSrc is already operating on a Context")
}
PadContextError::ActiveTask => write!(f, "A task is still active"),
}
}
}
impl std::error::Error for PadContextError {}
#[inline]
fn event_ret_to_event_full_res(
ret: bool,
@ -169,10 +144,10 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
gst_error!(
RUNTIME_CAT,
obj: gst_pad,
"Error in PadSink activate: {:?}",
"Error in PadSrc activate: {:?}",
err
);
gst_loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
gst_loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
})
}
@ -229,14 +204,9 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
}
}
#[derive(Default, Debug)]
struct PadSrcState;
#[derive(Debug)]
struct PadSrcInner {
state: sync::Mutex<PadSrcState>,
gst_pad: gst::Pad,
task: Task,
}
impl PadSrcInner {
@ -245,11 +215,7 @@ impl PadSrcInner {
panic!("Wrong pad direction for PadSrc");
}
PadSrcInner {
state: sync::Mutex::new(PadSrcState::default()),
gst_pad,
task: Task::default(),
}
PadSrcInner { gst_pad }
}
}
@ -331,33 +297,6 @@ impl<'a> PadSrcRef<'a> {
self.strong.push_event(event).await
}
/// `Start` the `Pad` `task`.
///
/// The `Task` will loop on the provided `func`.
/// The execution occurs on the `Task`'s context.
pub fn start_task<F, Fut>(&self, func: F)
where
F: (FnMut() -> Fut) + Send + 'static,
Fut: Future<Output = glib::Continue> + Send + 'static,
{
self.strong.start_task(func);
}
/// Pauses the `Started` `Pad` `Task`.
pub fn pause_task(&self) {
self.strong.pause_task();
}
/// Cancels the `Started` `Pad` `Task`.
pub fn cancel_task(&self) {
self.strong.cancel_task();
}
/// Stops the `Started` `Pad` `Task`.
pub fn stop_task(&self) {
self.strong.stop_task();
}
fn activate_mode_hook(
&self,
mode: gst::PadMode,
@ -468,30 +407,6 @@ impl PadSrcStrong {
was_handled
}
#[inline]
fn start_task<F, Fut>(&self, func: F)
where
F: (FnMut() -> Fut) + Send + 'static,
Fut: Future<Output = glib::Continue> + Send + 'static,
{
self.0.task.start(func);
}
#[inline]
fn pause_task(&self) {
self.0.task.pause();
}
#[inline]
fn cancel_task(&self) {
self.0.task.cancel();
}
#[inline]
fn stop_task(&self) {
self.0.task.stop();
}
}
/// The `PadSrc` which `Element`s must own.
@ -513,10 +428,6 @@ impl PadSrc {
this
}
pub fn new_from_template(templ: &gst::PadTemplate, name: Option<&str>) -> Self {
Self::new(gst::Pad::new_from_template(templ, name))
}
pub fn as_ref(&self) -> PadSrcRef<'_> {
PadSrcRef::new(Arc::clone(&(self.0).0))
}
@ -647,52 +558,16 @@ impl PadSrc {
});
}
pub fn prepare<H: PadSrcHandler>(
&self,
context: Context,
handler: &H,
) -> Result<(), super::task::TaskError> {
let _state = (self.0).0.state.lock().unwrap();
pub fn prepare<H: PadSrcHandler>(&self, handler: &H) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing");
(self.0).0.task.prepare(context)?;
self.init_pad_functions(handler);
Ok(())
}
pub fn prepare_with_func<H: PadSrcHandler, F, Fut>(
&self,
context: Context,
handler: &H,
prepare_func: F,
) -> Result<(), super::task::TaskError>
where
F: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let _state = (self.0).0.state.lock().unwrap();
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing");
(self.0).0.task.prepare_with_func(context, prepare_func)?;
self.init_pad_functions(handler);
Ok(())
}
/// Releases the resources held by this `PadSrc`.
pub fn unprepare(&self) -> Result<(), PadContextError> {
let _state = (self.0).0.state.lock().unwrap();
pub fn unprepare(&self) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Unpreparing");
(self.0)
.0
.task
.unprepare()
.map_err(|_| PadContextError::ActiveTask)?;
self.gst_pad()
.set_activate_function(move |_gst_pad, _parent| {
Err(gst_loggable_error!(RUNTIME_CAT, "PadSrc unprepared"))
@ -704,8 +579,6 @@ impl PadSrc {
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
self.gst_pad()
.set_query_function(move |_gst_pad, _parent, _query| false);
Ok(())
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
@ -719,30 +592,6 @@ impl PadSrc {
pub async fn push_event(&self, event: gst::Event) -> bool {
self.0.push_event(event).await
}
/// `Start` the `Pad` `task`.
///
/// The `Task` will loop on the provided `func`.
/// The execution occurs on the `Task`'s context.
pub fn start_task<F, Fut>(&self, func: F)
where
F: (FnMut() -> Fut) + Send + 'static,
Fut: Future<Output = glib::Continue> + Send + 'static,
{
self.0.start_task(func);
}
pub fn pause_task(&self) {
self.0.pause_task();
}
pub fn cancel_task(&self) {
self.0.cancel_task();
}
pub fn stop_task(&self) {
self.0.stop_task();
}
}
/// A trait to define `handler`s for [`PadSink`] callbacks.
@ -1044,8 +893,8 @@ impl PadSink {
this
}
pub fn new_from_template(templ: &gst::PadTemplate, name: Option<&str>) -> Self {
Self::new(gst::Pad::new_from_template(templ, name))
pub fn as_ref(&self) -> PadSinkRef<'_> {
PadSinkRef::new(Arc::clone(&(self.0).0))
}
pub fn gst_pad(&self) -> &gst::Pad {

View file

@ -44,9 +44,9 @@ use std::u16;
use tokio::io::AsyncReadExt;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef};
use crate::runtime::{Context, PadSrc, PadSrcRef, Task};
use super::socket::{Socket, SocketError, SocketRead, SocketState, SocketStream};
use super::socket::{Socket, SocketError, SocketRead, SocketState};
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000;
@ -191,140 +191,35 @@ impl Default for TcpClientSrcPadHandlerState {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
struct TcpClientSrcPadHandlerInner {
state: FutMutex<TcpClientSrcPadHandlerState>,
configured_caps: StdMutex<Option<gst::Caps>>,
}
impl TcpClientSrcPadHandlerInner {
fn new(caps: Option<gst::Caps>) -> Self {
TcpClientSrcPadHandlerInner {
state: FutMutex::new(TcpClientSrcPadHandlerState {
caps,
..Default::default()
}),
configured_caps: StdMutex::new(None),
}
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
struct TcpClientSrcPadHandler(Arc<TcpClientSrcPadHandlerInner>);
impl TcpClientSrcPadHandler {
fn new(caps: Option<gst::Caps>) -> Self {
TcpClientSrcPadHandler(Arc::new(TcpClientSrcPadHandlerInner::new(caps)))
}
fn reset(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler");
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
*self.0.configured_caps.lock().unwrap() = None;
gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset");
}
fn flush(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
fn prepare(&self, caps: Option<gst::Caps>) {
self.0
.state
.try_lock()
.expect("state is locked elsewhere")
.expect("State locked elsewhere")
.caps = caps;
}
fn reset(&self) {
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
*self.0.configured_caps.lock().unwrap() = None;
}
fn set_need_segment(&self) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.need_segment = true;
gst_debug!(CAT, obj: pad.gst_pad(), "Flushed");
}
fn start_task(
&self,
pad: PadSrcRef<'_>,
element: &gst::Element,
socket_stream: SocketStream<TcpClientReader>,
) {
let this = self.clone();
let pad_weak = pad.downgrade();
let element = element.clone();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let buffer = match item {
Some(Ok((buffer, _))) => buffer,
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
return glib::Continue(false);
}
};
let res = this.push_buffer(&pad, &element, buffer).await;
match res {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
@ -465,7 +360,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
struct TcpClientSrc {
src_pad: PadSrc,
src_pad_handler: StdMutex<Option<TcpClientSrcPadHandler>>,
src_pad_handler: TcpClientSrcPadHandler,
task: Task,
socket: StdMutex<Option<Socket<TcpClientReader>>>,
settings: StdMutex<Settings>,
}
@ -480,7 +376,6 @@ lazy_static! {
impl TcpClientSrc {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut socket_storage = self.socket.lock().unwrap();
let settings = self.settings.lock().unwrap().clone();
gst_debug!(CAT, obj: element, "Preparing");
@ -538,21 +433,16 @@ impl TcpClientSrc {
)
})?;
*socket_storage = Some(socket);
drop(socket_storage);
*self.socket.lock().unwrap() = Some(socket);
let src_pad_handler = TcpClientSrcPadHandler::new(settings.caps);
self.src_pad
.prepare(context, &src_pad_handler)
.map_err(|err| {
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing src_pads: {:?}", err]
["Error preparing Task: {:?}", err]
)
})?;
*self.src_pad_handler.lock().unwrap() = Some(src_pad_handler);
self.src_pad_handler.prepare(settings.caps);
self.src_pad.prepare(&self.src_pad_handler);
gst_debug!(CAT, obj: element, "Prepared");
@ -562,12 +452,10 @@ impl TcpClientSrc {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
if let Some(socket) = self.socket.lock().unwrap().take() {
drop(socket);
}
*self.socket.lock().unwrap() = None;
let _ = self.src_pad.unprepare();
*self.src_pad_handler.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -577,16 +465,8 @@ impl TcpClientSrc {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Stopping");
// Now stop the task if it was still running, blocking
// until this has actually happened
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.reset(&self.src_pad.as_ref());
self.task.stop();
self.src_pad_handler.reset();
gst_debug!(CAT, obj: element, "Stopped");
@ -595,31 +475,102 @@ impl TcpClientSrc {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
if let Some(socket) = socket.as_ref() {
let socket = socket.as_ref().unwrap();
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Err(());
return Ok(());
}
gst_debug!(CAT, obj: element, "Starting");
self.start_unchecked(element, socket);
self.start_task(element, socket);
gst_debug!(CAT, obj: element, "Started");
Ok(())
} else {
Err(())
}
fn start_task(&self, element: &gst::Element, socket: &Socket<TcpClientReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
let src_pad_handler = self.src_pad_handler.clone();
let pad_weak = self.src_pad.downgrade();
let element = element.clone();
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let buffer = match item {
Some(Ok((buffer, _))) => buffer,
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
return glib::Continue(false);
}
};
match src_pad_handler.push_buffer(&pad, &element, buffer).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `socket` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
// Note that this won't deadlock as it doesn't lock the `SocketStream`
// in use within the `src_pad`'s `Task`.
let socket = self.socket.lock().unwrap();
let socket = socket.as_ref().unwrap();
if let Some(socket) = socket.as_ref() {
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
@ -627,31 +578,13 @@ impl TcpClientSrc {
gst_debug!(CAT, obj: element, "Stopping Flush");
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.flush(&self.src_pad.as_ref());
self.start_unchecked(element, socket);
self.src_pad_handler.set_need_segment();
self.start_task(element, socket);
gst_debug!(CAT, obj: element, "Stopped Flush");
} else {
gst_debug!(CAT, obj: element, "Socket not available");
}
fn start_unchecked(&self, element: &gst::Element, socket: &Socket<TcpClientReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.start_task(self.src_pad.as_ref(), element, socket_stream);
}
fn flush_start(&self, element: &gst::Element) {
@ -662,20 +595,16 @@ impl TcpClientSrc {
socket.pause();
}
self.src_pad.cancel_task();
self.task.cancel();
gst_debug!(CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Pausing");
if let Some(socket) = socket.as_ref() {
socket.pause();
}
self.src_pad.pause_task();
self.socket.lock().unwrap().as_ref().unwrap().pause();
self.task.pause();
gst_debug!(CAT, obj: element, "Paused");
@ -714,11 +643,11 @@ impl ObjectSubclass for TcpClientSrc {
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
src_pad,
src_pad_handler: StdMutex::new(None),
src_pad: PadSrc::new(gst::Pad::new_from_template(&templ, Some("src"))),
src_pad_handler: TcpClientSrcPadHandler::default(),
task: Task::default(),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}

View file

@ -38,16 +38,15 @@ use gst::{
use lazy_static::lazy_static;
use crate::runtime::prelude::*;
use crate::runtime::task::Task;
use crate::runtime::{self, Context, PadSink, PadSinkRef};
use crate::runtime::{self, Context, PadSink, PadSinkRef, Task};
use crate::socket::{wrap_socket, GioSocketWrapper};
use std::convert::TryInto;
use std::mem;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::string::ToString;
use std::sync::Mutex as StdMutex;
use std::sync::MutexGuard as StdMutexGuard;
use std::sync::{RwLock, RwLockWriteGuard};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::u16;
use std::u8;
@ -119,19 +118,6 @@ impl Default for Settings {
}
}
#[derive(Debug)]
enum TaskItem {
Buffer(gst::Buffer),
Event(gst::Event),
}
#[derive(Debug)]
struct UdpSink {
sink_pad: PadSink,
sink_pad_handler: UdpSinkPadHandler,
settings: Arc<StdMutex<Settings>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-udpsink",
@ -329,11 +315,16 @@ static PROPERTIES: [subclass::Property; 19] = [
];
#[derive(Debug)]
struct UdpSinkPadHandlerState {
enum TaskItem {
Buffer(gst::Buffer),
Event(gst::Event),
}
#[derive(Debug)]
struct UdpSinkPadHandlerInner {
sync: bool,
segment: Option<gst::Segment>,
latency: gst::ClockTime,
task: Option<Task>,
socket: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
clients: Arc<Vec<SocketAddr>>,
@ -343,16 +334,12 @@ struct UdpSinkPadHandlerState {
settings: Arc<StdMutex<Settings>>,
}
#[derive(Clone, Debug)]
struct UdpSinkPadHandler(Arc<RwLock<UdpSinkPadHandlerState>>);
impl UdpSinkPadHandler {
fn new(settings: Arc<StdMutex<Settings>>) -> UdpSinkPadHandler {
Self(Arc::new(RwLock::new(UdpSinkPadHandlerState {
impl UdpSinkPadHandlerInner {
fn new(settings: Arc<StdMutex<Settings>>) -> Self {
UdpSinkPadHandlerInner {
sync: DEFAULT_SYNC,
segment: None,
latency: gst::CLOCK_TIME_NONE,
task: None,
socket: Arc::new(Mutex::new(None)),
socket_v6: Arc::new(Mutex::new(None)),
clients: Arc::new(vec![SocketAddr::new(
@ -363,7 +350,134 @@ impl UdpSinkPadHandler {
clients_to_unconfigure: vec![],
sender: Arc::new(Mutex::new(None)),
settings,
})))
}
}
fn clear_clients(
&mut self,
gst_pad: &gst::Pad,
clients_to_add: impl Iterator<Item = SocketAddr>,
) {
Arc::make_mut(&mut self.clients).clear();
self.clients_to_configure = vec![];
self.clients_to_unconfigure = vec![];
for addr in clients_to_add {
self.add_client(gst_pad, addr);
}
}
fn remove_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) {
if !self.clients.contains(&addr) {
gst_warning!(CAT, obj: gst_pad, "Not removing unknown client {:?}", &addr);
return;
}
gst_info!(CAT, obj: gst_pad, "Removing client {:?}", addr);
Arc::make_mut(&mut self.clients).retain(|addr2| addr != *addr2);
self.clients_to_unconfigure.push(addr);
self.clients_to_configure.retain(|addr2| addr != *addr2);
}
fn replace_client(
&mut self,
gst_pad: &gst::Pad,
addr: Option<SocketAddr>,
new_addr: Option<SocketAddr>,
) {
if let Some(addr) = addr {
self.remove_client(gst_pad, addr);
}
if let Some(new_addr) = new_addr {
self.add_client(gst_pad, new_addr);
}
}
fn add_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) {
if self.clients.contains(&addr) {
gst_warning!(CAT, obj: gst_pad, "Not adding client {:?} again", &addr);
return;
}
gst_info!(CAT, obj: gst_pad, "Adding client {:?}", addr);
Arc::make_mut(&mut self.clients).push(addr);
self.clients_to_configure.push(addr);
self.clients_to_unconfigure.retain(|addr2| addr != *addr2);
}
}
#[derive(Debug)]
enum SocketQualified {
Ipv4(tokio::net::UdpSocket),
Ipv6(tokio::net::UdpSocket),
}
#[derive(Clone, Debug)]
struct UdpSinkPadHandler(Arc<RwLock<UdpSinkPadHandlerInner>>);
impl UdpSinkPadHandler {
fn new(settings: Arc<StdMutex<Settings>>) -> UdpSinkPadHandler {
Self(Arc::new(RwLock::new(UdpSinkPadHandlerInner::new(settings))))
}
fn set_latency(&self, latency: gst::ClockTime) {
self.0.write().unwrap().latency = latency;
}
fn prepare(&self) {
let mut inner = self.0.write().unwrap();
inner.clients_to_configure = inner.clients.to_vec();
}
fn prepare_socket(&self, socket: SocketQualified) {
let mut inner = self.0.write().unwrap();
match socket {
SocketQualified::Ipv4(socket) => inner.socket = Arc::new(Mutex::new(Some(socket))),
SocketQualified::Ipv6(socket) => inner.socket_v6 = Arc::new(Mutex::new(Some(socket))),
}
}
fn unprepare(&self) {
let mut inner = self.0.write().unwrap();
*inner = UdpSinkPadHandlerInner::new(Arc::clone(&inner.settings))
}
fn clear_clients(&self, gst_pad: &gst::Pad, clients_to_add: impl Iterator<Item = SocketAddr>) {
self.0
.write()
.unwrap()
.clear_clients(gst_pad, clients_to_add);
}
fn remove_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) {
self.0.write().unwrap().remove_client(gst_pad, addr);
}
fn replace_client(
&self,
gst_pad: &gst::Pad,
addr: Option<SocketAddr>,
new_addr: Option<SocketAddr>,
) {
self.0
.write()
.unwrap()
.replace_client(gst_pad, addr, new_addr);
}
fn add_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) {
self.0.write().unwrap().add_client(gst_pad, addr);
}
fn get_clients(&self) -> Vec<SocketAddr> {
(*self.0.read().unwrap().clients).clone()
}
fn configure_client(
@ -511,32 +625,32 @@ impl UdpSinkPadHandler {
socket_v6,
settings,
) = {
let mut state = self.0.write().unwrap();
let do_sync = state.sync;
let mut inner = self.0.write().unwrap();
let do_sync = inner.sync;
let mut rtime: gst::ClockTime = 0.into();
if let Some(segment) = &state.segment {
if let Some(segment) = &inner.segment {
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
rtime = segment.to_running_time(buffer.get_pts());
if state.latency.is_some() {
rtime += state.latency;
if inner.latency.is_some() {
rtime += inner.latency;
}
}
}
let clients_to_configure = mem::replace(&mut state.clients_to_configure, vec![]);
let clients_to_unconfigure = mem::replace(&mut state.clients_to_unconfigure, vec![]);
let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]);
let clients_to_unconfigure = mem::replace(&mut inner.clients_to_unconfigure, vec![]);
let settings = state.settings.lock().unwrap().clone();
let settings = inner.settings.lock().unwrap().clone();
(
do_sync,
rtime,
Arc::clone(&state.clients),
Arc::clone(&inner.clients),
clients_to_configure,
clients_to_unconfigure,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
Arc::clone(&inner.socket),
Arc::clone(&inner.socket_v6),
settings,
)
};
@ -642,64 +756,11 @@ impl UdpSinkPadHandler {
let _ = element.post_message(&gst::Message::new_eos().src(Some(element)).build());
}
EventView::Segment(e) => {
let mut state = self.0.write().unwrap();
state.segment = Some(e.get_segment().clone());
self.0.write().unwrap().segment = Some(e.get_segment().clone());
}
_ => (),
}
}
fn unprepare(&self) {
if let Some(task) = &self.0.read().unwrap().task {
task.unprepare().unwrap();
}
}
fn stop_task(&self) {
if let Some(task) = &self.0.read().unwrap().task {
task.stop();
}
}
fn start_task(&self, element: &gst::Element) {
let (sender, receiver) = mpsc::channel(0);
self.0.write().unwrap().sender = Arc::new(Mutex::new(Some(sender)));
if let Some(task) = &self.0.read().unwrap().task {
let receiver = Arc::new(Mutex::new(receiver));
let this = self.clone();
let element_clone = element.clone();
task.start(move || {
let receiver = Arc::clone(&receiver);
let element = element_clone.clone();
let this = this.clone();
async move {
match receiver.lock().await.next().await {
Some(TaskItem::Buffer(buffer)) => {
match this.render(&element, buffer).await {
Err(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to render item, stopping task: {}", err]
);
glib::Continue(false)
}
_ => glib::Continue(true),
}
}
Some(TaskItem::Event(event)) => {
this.handle_event(&element, event).await;
glib::Continue(true)
}
None => glib::Continue(false),
}
}
});
}
}
}
impl PadSinkHandler for UdpSinkPadHandler {
@ -752,15 +813,16 @@ impl PadSinkHandler for UdpSinkPadHandler {
event: gst::Event,
) -> BoxFuture<'static, bool> {
let sender = Arc::clone(&self.0.read().unwrap().sender);
let this = self.clone();
let element = element.clone();
async move {
if let EventView::FlushStop(_) = event.view() {
this.start_task(&element);
let udpsink = UdpSink::from_instance(&element);
let _ = udpsink.start(&element);
} else if let Some(sender) = sender.lock().await.as_mut() {
sender.send(TaskItem::Event(event)).await.unwrap();
}
true
}
.boxed()
@ -769,35 +831,49 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_event(
&self,
_pad: &PadSinkRef,
_udpsink: &UdpSink,
_element: &gst::Element,
udpsink: &UdpSink,
element: &gst::Element,
event: gst::Event,
) -> bool {
match event.view() {
EventView::FlushStart(..) => {
self.stop_task();
}
_ => (),
if let EventView::FlushStart(..) = event.view() {
let _ = udpsink.stop(&element);
}
true
}
}
#[derive(Debug)]
enum SocketFamily {
Ipv4,
Ipv6,
}
#[derive(Debug)]
struct UdpSink {
sink_pad: PadSink,
sink_pad_handler: UdpSinkPadHandler,
task: Task,
settings: Arc<StdMutex<Settings>>,
}
impl UdpSink {
fn prepare_socket_family(
fn prepare_socket(
&self,
family: SocketFamily,
context: &Context,
element: &gst::Element,
ipv6: bool,
) -> Result<(), gst::ErrorMessage> {
let mut settings = self.settings.lock().unwrap();
let socket = if let Some(ref wrapped_socket) = if ipv6 {
&settings.socket_v6
} else {
&settings.socket
} {
let wrapped_socket = match family {
SocketFamily::Ipv4 => &settings.socket,
SocketFamily::Ipv6 => &settings.socket_v6,
};
let socket_qualified: SocketQualified;
if let Some(ref wrapped_socket) = wrapped_socket {
use std::net::UdpSocket;
let socket: UdpSocket;
@ -820,18 +896,20 @@ impl UdpSink {
})
})?;
if ipv6 {
settings.used_socket_v6 = Some(wrapped_socket.clone());
} else {
match family {
SocketFamily::Ipv4 => {
settings.used_socket = Some(wrapped_socket.clone());
socket_qualified = SocketQualified::Ipv4(socket);
}
SocketFamily::Ipv6 => {
settings.used_socket_v6 = Some(wrapped_socket.clone());
socket_qualified = SocketQualified::Ipv6(socket);
}
}
socket
} else {
let bind_addr = if ipv6 {
&settings.bind_address_v6
} else {
&settings.bind_address
let bind_addr = match family {
SocketFamily::Ipv4 => &settings.bind_address,
SocketFamily::Ipv6 => &settings.bind_address_v6,
};
let bind_addr: IpAddr = bind_addr.parse().map_err(|err| {
@ -841,19 +919,17 @@ impl UdpSink {
)
})?;
let bind_port = if ipv6 {
settings.bind_port_v6
} else {
settings.bind_port
let bind_port = match family {
SocketFamily::Ipv4 => settings.bind_port,
SocketFamily::Ipv6 => settings.bind_port_v6,
};
let saddr = SocketAddr::new(bind_addr, bind_port as u16);
gst_debug!(CAT, obj: element, "Binding to {:?}", saddr);
let builder = if ipv6 {
net2::UdpBuilder::new_v6()
} else {
net2::UdpBuilder::new_v4()
let builder = match family {
SocketFamily::Ipv4 => net2::UdpBuilder::new_v4(),
SocketFamily::Ipv6 => net2::UdpBuilder::new_v6(),
};
let builder = match builder {
@ -863,7 +939,10 @@ impl UdpSink {
CAT,
obj: element,
"Failed to create {} socket builder: {}",
if ipv6 { "IPv6" } else { "IPv4" },
match family {
SocketFamily::Ipv4 => "IPv4",
SocketFamily::Ipv6 => "IPv6",
},
err
);
return Ok(());
@ -897,33 +976,19 @@ impl UdpSink {
})?;
}
if ipv6 {
settings.used_socket_v6 = Some(wrapper);
} else {
match family {
SocketFamily::Ipv4 => {
settings.used_socket = Some(wrapper);
socket_qualified = SocketQualified::Ipv4(socket)
}
SocketFamily::Ipv6 => {
settings.used_socket_v6 = Some(wrapper);
socket_qualified = SocketQualified::Ipv6(socket)
}
}
}
socket
};
let mut state = self.sink_pad_handler.0.write().unwrap();
if ipv6 {
state.socket_v6 = Arc::new(Mutex::new(Some(socket)));
} else {
state.socket = Arc::new(Mutex::new(Some(socket)));
}
Ok(())
}
fn prepare_sockets(
&self,
context: &Context,
element: &gst::Element,
) -> Result<(), gst::ErrorMessage> {
self.prepare_socket_family(context, element, false)?;
self.prepare_socket_family(context, element, true)?;
self.sink_pad_handler.prepare_socket(socket_qualified);
Ok(())
}
@ -942,20 +1007,18 @@ impl UdpSink {
})?
};
self.sink_pad.prepare(&self.sink_pad_handler);
self.prepare_sockets(&context, element).unwrap();
self.sink_pad_handler.prepare();
self.prepare_socket(SocketFamily::Ipv4, &context, element)?;
self.prepare_socket(SocketFamily::Ipv6, &context, element)?;
let task = Task::default();
task.prepare(context).map_err(|err| {
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to start task: {}", err]
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
let mut state = self.sink_pad_handler.0.write().unwrap();
state.task = Some(task);
state.clients_to_configure = state.clients.to_vec();
self.sink_pad.prepare(&self.sink_pad_handler);
gst_debug!(CAT, obj: element, "Started preparing");
@ -965,105 +1028,108 @@ impl UdpSink {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
self.sink_pad_handler.unprepare();
self.sink_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Starting");
self.sink_pad_handler.start_task(&element);
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Stopping");
self.sink_pad_handler.stop_task();
self.task.stop();
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn clear_clients(
&self,
element: &gst::Element,
state: &mut RwLockWriteGuard<'_, UdpSinkPadHandlerState>,
settings: &StdMutexGuard<'_, Settings>,
) {
let clients = Arc::make_mut(&mut state.clients);
clients.clear();
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Starting");
state.clients_to_configure = vec![];
state.clients_to_unconfigure = vec![];
let sink_pad_handler = self.sink_pad_handler.clone();
let element_clone = element.clone();
if let Some(host) = &settings.host {
self.add_client(&element, state, &host, settings.port as u16);
let (sender, receiver) = mpsc::channel(0);
let receiver = Arc::new(Mutex::new(receiver));
sink_pad_handler.0.write().unwrap().sender = Arc::new(Mutex::new(Some(sender)));
self.task.start(move || {
let receiver = Arc::clone(&receiver);
let element = element_clone.clone();
let sink_pad_handler = sink_pad_handler.clone();
async move {
match receiver.lock().await.next().await {
Some(TaskItem::Buffer(buffer)) => {
match sink_pad_handler.render(&element, buffer).await {
Err(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to render item, stopping task: {}", err]
);
glib::Continue(false)
}
_ => glib::Continue(true),
}
}
Some(TaskItem::Event(event)) => {
sink_pad_handler.handle_event(&element, event).await;
glib::Continue(true)
}
None => glib::Continue(false),
}
}
});
Ok(())
}
}
fn remove_client(
&self,
element: &gst::Element,
state: &mut RwLockWriteGuard<'_, UdpSinkPadHandlerState>,
host: &str,
port: u16,
) {
impl UdpSink {
fn clear_clients(&self, clients_to_add: impl Iterator<Item = SocketAddr>) {
self.sink_pad_handler
.clear_clients(&self.sink_pad.gst_pad(), clients_to_add);
}
fn remove_client(&self, addr: SocketAddr) {
self.sink_pad_handler
.remove_client(&self.sink_pad.gst_pad(), addr);
}
fn replace_client(&self, addr: Option<SocketAddr>, new_addr: Option<SocketAddr>) {
self.sink_pad_handler
.replace_client(&self.sink_pad.gst_pad(), addr, new_addr);
}
fn add_client(&self, addr: SocketAddr) {
self.sink_pad_handler
.add_client(&self.sink_pad.gst_pad(), addr);
}
}
fn try_into_socket_addr(element: &gst::Element, host: &str, port: u32) -> Result<SocketAddr, ()> {
let addr: IpAddr = match host.parse() {
Err(err) => {
gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err);
return;
return Err(());
}
Ok(addr) => addr,
};
let addr = SocketAddr::new(addr, port);
if !state.clients.contains(&addr) {
gst_warning!(CAT, obj: element, "Not removing unknown client {:?}", &addr);
return;
}
gst_info!(CAT, obj: element, "Removing client {:?}", addr);
let clients = Arc::make_mut(&mut state.clients);
clients.retain(|addr2| addr != *addr2);
state.clients_to_unconfigure.push(addr);
state.clients_to_configure.retain(|addr2| addr != *addr2);
}
fn add_client(
&self,
element: &gst::Element,
state: &mut RwLockWriteGuard<'_, UdpSinkPadHandlerState>,
host: &str,
port: u16,
) {
let addr: IpAddr = match host.parse() {
let port: u16 = match port.try_into() {
Err(err) => {
gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err);
return;
gst_error!(CAT, obj: element, "Invalid port {}: {}", port, err);
return Err(());
}
Ok(addr) => addr,
Ok(port) => port,
};
let addr = SocketAddr::new(addr, port);
if state.clients.contains(&addr) {
gst_warning!(CAT, obj: element, "Not adding client {:?} again", &addr);
return;
}
gst_info!(CAT, obj: element, "Adding client {:?}", addr);
let clients = Arc::make_mut(&mut state.clients);
clients.push(addr);
state.clients_to_configure.push(addr);
state.clients_to_unconfigure.retain(|addr2| addr != *addr2);
}
Ok(SocketAddr::new(addr, port))
}
impl ObjectSubclass for UdpSink {
@ -1109,12 +1175,12 @@ impl ObjectSubclass for UdpSink {
let port = args[2]
.get::<i32>()
.expect("signal arg")
.expect("missing signal arg");
.expect("missing signal arg") as u32;
if let Ok(addr) = try_into_socket_addr(&element, &host, port) {
let udpsink = Self::from_instance(&element);
let mut state = udpsink.sink_pad_handler.0.write().unwrap();
udpsink.add_client(&element, &mut state, &host, port as u16);
udpsink.add_client(addr);
}
None
},
@ -1137,15 +1203,15 @@ impl ObjectSubclass for UdpSink {
let port = args[2]
.get::<i32>()
.expect("signal arg")
.expect("missing signal arg");
.expect("missing signal arg") as u32;
let udpsink = Self::from_instance(&element);
let mut state = udpsink.sink_pad_handler.0.write().unwrap();
let settings = udpsink.settings.lock().unwrap();
if Some(&host) != settings.host.as_ref() || port != settings.port as i32 {
udpsink.remove_client(&element, &mut state, &host, port as u16);
if Some(&host) != settings.host.as_ref() || port != settings.port {
if let Ok(addr) = try_into_socket_addr(&element, &host, port) {
udpsink.remove_client(addr);
}
}
None
@ -1164,10 +1230,13 @@ impl ObjectSubclass for UdpSink {
.expect("missing signal arg");
let udpsink = Self::from_instance(&element);
let mut state = udpsink.sink_pad_handler.0.write().unwrap();
let settings = udpsink.settings.lock().unwrap();
let current_client = settings
.host
.iter()
.filter_map(|host| try_into_socket_addr(&element, host, settings.port).ok());
udpsink.clear_clients(&element, &mut state, &settings);
udpsink.clear_clients(current_client);
None
},
@ -1177,14 +1246,15 @@ impl ObjectSubclass for UdpSink {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = PadSink::new_from_template(&templ, Some("sink"));
let settings = Arc::new(StdMutex::new(Settings::default()));
let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings));
Self {
sink_pad,
sink_pad_handler,
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
sink_pad_handler: UdpSinkPadHandler::new(Arc::clone(&settings)),
task: Task::default(),
settings,
}
}
@ -1200,33 +1270,39 @@ impl ObjectImpl for UdpSink {
let mut settings = self.settings.lock().unwrap();
match *prop {
subclass::Property("host", ..) => {
let mut state = self.sink_pad_handler.0.write().unwrap();
if let Some(host) = &settings.host {
self.remove_client(&element, &mut state, &host, settings.port as u16);
}
let current_client = settings
.host
.as_ref()
.and_then(|host| try_into_socket_addr(&element, host, settings.port).ok());
settings.host = value.get().expect("type checked upstream");
let new_host = value.get().expect("type checked upstream");
if let Some(host) = &settings.host {
self.add_client(&element, &mut state, &host, settings.port as u16);
}
let new_client = new_host
.and_then(|host| try_into_socket_addr(&element, host, settings.port).ok());
self.replace_client(current_client, new_client);
settings.host = new_host.map(ToString::to_string);
}
subclass::Property("port", ..) => {
let mut state = self.sink_pad_handler.0.write().unwrap();
if let Some(host) = &settings.host {
self.remove_client(&element, &mut state, &host, settings.port as u16);
}
let current_client = settings
.host
.as_ref()
.and_then(|host| try_into_socket_addr(&element, host, settings.port).ok());
settings.port = value.get_some().expect("type checked upstream");
let new_port = value.get_some().expect("type checked upstream");
if let Some(host) = &settings.host {
self.add_client(&element, &mut state, &host, settings.port as u16);
}
let new_client = settings
.host
.as_ref()
.and_then(|host| try_into_socket_addr(&element, host, new_port).ok());
self.replace_client(current_client, new_client);
settings.port = new_port;
}
subclass::Property("sync", ..) => {
settings.sync = value.get_some().expect("type checked upstream");
let mut state = self.sink_pad_handler.0.write().unwrap();
state.sync = settings.sync;
}
subclass::Property("bind-address", ..) => {
settings.bind_address = value
@ -1284,21 +1360,35 @@ impl ObjectImpl for UdpSink {
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
let mut state = self.sink_pad_handler.0.write().unwrap();
let clients = clients.split(',');
self.clear_clients(element, &mut state, &settings);
drop(settings);
for client in clients {
let split: Vec<&str> = client.rsplitn(2, ':').collect();
let current_client = settings
.host
.iter()
.filter_map(|host| try_into_socket_addr(&element, host, settings.port).ok());
if split.len() == 2 {
match split[0].parse::<u16>() {
Ok(port) => self.add_client(element, &mut state, split[1], port),
Err(_) => (),
}
}
let clients_iter = current_client.chain(clients.split(',').filter_map(|client| {
let rsplit: Vec<&str> = client.rsplitn(2, ':').collect();
if rsplit.len() == 2 {
rsplit[0]
.parse::<u32>()
.map_err(|err| {
gst_error!(
CAT,
obj: element,
"Invalid port {}: {}",
rsplit[0],
err
);
})
.and_then(|port| try_into_socket_addr(&element, rsplit[1], port))
.ok()
} else {
None
}
}));
self.clear_clients(clients_iter);
}
subclass::Property("context", ..) => {
settings.context = value
@ -1351,10 +1441,13 @@ impl ObjectImpl for UdpSink {
subclass::Property("ttl-mc", ..) => Ok(settings.ttl_mc.to_value()),
subclass::Property("qos-dscp", ..) => Ok(settings.qos_dscp.to_value()),
subclass::Property("clients", ..) => {
let state = self.sink_pad_handler.0.read().unwrap();
let clients: Vec<String> = self
.sink_pad_handler
.get_clients()
.iter()
.map(ToString::to_string)
.collect();
let clients: Vec<String> =
state.clients.iter().map(|addr| addr.to_string()).collect();
Ok(clients.join(",").to_value())
}
subclass::Property("context", ..) => Ok(settings.context.to_value()),
@ -1406,9 +1499,7 @@ impl ElementImpl for UdpSink {
fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
match event.view() {
EventView::Latency(ev) => {
let mut state = self.sink_pad_handler.0.write().unwrap();
state.latency = ev.get_latency();
self.sink_pad_handler.set_latency(ev.get_latency());
self.sink_pad.gst_pad().push_event(event)
}
EventView::Step(..) => false,

View file

@ -44,11 +44,9 @@ use std::sync::Mutex as StdMutex;
use std::u16;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef};
use crate::runtime::{Context, PadSrc, PadSrcRef, Task};
use super::socket::{
wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState, SocketStream,
};
use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState};
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000;
@ -245,160 +243,34 @@ impl Default for UdpSrcPadHandlerState {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
struct UdpSrcPadHandlerInner {
state: FutMutex<UdpSrcPadHandlerState>,
configured_caps: StdMutex<Option<gst::Caps>>,
}
impl UdpSrcPadHandlerInner {
fn new(caps: Option<gst::Caps>, retrieve_sender_address: bool) -> Self {
UdpSrcPadHandlerInner {
state: FutMutex::new(UdpSrcPadHandlerState {
retrieve_sender_address,
caps,
..Default::default()
}),
configured_caps: StdMutex::new(None),
}
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
struct UdpSrcPadHandler(Arc<UdpSrcPadHandlerInner>);
impl UdpSrcPadHandler {
fn new(caps: Option<gst::Caps>, retrieve_sender_address: bool) -> UdpSrcPadHandler {
UdpSrcPadHandler(Arc::new(UdpSrcPadHandlerInner::new(
caps,
retrieve_sender_address,
)))
fn prepare(&self, caps: Option<gst::Caps>, retrieve_sender_address: bool) {
let mut state = self.0.state.try_lock().expect("State locked elsewhere");
state.caps = caps;
state.retrieve_sender_address = retrieve_sender_address;
}
fn reset(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler");
fn reset(&self) {
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
*self.0.configured_caps.lock().unwrap() = None;
gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset");
}
fn flush(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
fn set_need_segment(&self) {
self.0
.state
.try_lock()
.expect("state is locked elsewhere")
.expect("State locked elsewhere")
.need_segment = true;
gst_debug!(CAT, obj: pad.gst_pad(), "Flushed");
}
fn start_task(
&self,
pad: PadSrcRef<'_>,
element: &gst::Element,
socket_stream: SocketStream<UdpReader>,
) {
let this = self.clone();
let pad_weak = pad.downgrade();
let element = element.clone();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let (mut buffer, saddr) = match item {
Some(Ok((buffer, saddr))) => (buffer, saddr),
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
return glib::Continue(false);
}
};
if let Some(saddr) = saddr {
if this.0.state.lock().await.retrieve_sender_address {
let inet_addr = match saddr.ip() {
IpAddr::V4(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V4(&ip.octets()),
),
IpAddr::V6(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V6(&ip.octets()),
),
};
let inet_socket_addr =
&gio::InetSocketAddress::new(&inet_addr, saddr.port());
NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr);
}
}
let res = this.push_buffer(&pad, &element, buffer).await;
match res {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
@ -534,7 +406,8 @@ impl PadSrcHandler for UdpSrcPadHandler {
struct UdpSrc {
src_pad: PadSrc,
src_pad_handler: StdMutex<Option<UdpSrcPadHandler>>,
src_pad_handler: UdpSrcPadHandler,
task: Task,
socket: StdMutex<Option<Socket<UdpReader>>>,
settings: StdMutex<Settings>,
}
@ -549,7 +422,6 @@ lazy_static! {
impl UdpSrc {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut socket_storage = self.socket.lock().unwrap();
let mut settings = self.settings.lock().unwrap().clone();
gst_debug!(CAT, obj: element, "Preparing");
@ -729,24 +601,18 @@ impl UdpSrc {
)
})?;
*socket_storage = Some(socket);
drop(socket_storage);
*self.socket.lock().unwrap() = Some(socket);
element.notify("used-socket");
let src_pad_handler =
UdpSrcPadHandler::new(settings.caps, settings.retrieve_sender_address);
self.src_pad
.prepare(context, &src_pad_handler)
.map_err(|err| {
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing src_pads: {:?}", err]
["Error preparing Task: {:?}", err]
)
})?;
*self.src_pad_handler.lock().unwrap() = Some(src_pad_handler);
self.src_pad_handler
.prepare(settings.caps, settings.retrieve_sender_address);
self.src_pad.prepare(&self.src_pad_handler);
gst_debug!(CAT, obj: element, "Prepared");
@ -756,15 +622,12 @@ impl UdpSrc {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
*self.socket.lock().unwrap() = None;
self.settings.lock().unwrap().used_socket = None;
element.notify("used-socket");
if let Some(socket) = self.socket.lock().unwrap().take() {
drop(socket);
}
let _ = self.src_pad.unprepare();
*self.src_pad_handler.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -774,16 +637,8 @@ impl UdpSrc {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Stopping");
// Now stop the task if it was still running, blocking
// until this has actually happened
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.reset(&self.src_pad.as_ref());
self.task.stop();
self.src_pad_handler.reset();
gst_debug!(CAT, obj: element, "Stopped");
@ -792,31 +647,118 @@ impl UdpSrc {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
if let Some(socket) = socket.as_ref() {
let socket = socket.as_ref().unwrap();
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Err(());
return Ok(());
}
gst_debug!(CAT, obj: element, "Starting");
self.start_unchecked(element, socket);
self.start_task(element, socket);
gst_debug!(CAT, obj: element, "Started");
Ok(())
} else {
Err(())
}
fn start_task(&self, element: &gst::Element, socket: &Socket<UdpReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
let src_pad_handler = self.src_pad_handler.clone();
let pad_weak = self.src_pad.downgrade();
let element = element.clone();
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let (mut buffer, saddr) = match item {
Some(Ok((buffer, saddr))) => (buffer, saddr),
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
return glib::Continue(false);
}
};
if let Some(saddr) = saddr {
if src_pad_handler.0.state.lock().await.retrieve_sender_address {
let inet_addr = match saddr.ip() {
IpAddr::V4(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V4(&ip.octets()),
),
IpAddr::V6(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V6(&ip.octets()),
),
};
let inet_socket_addr =
&gio::InetSocketAddress::new(&inet_addr, saddr.port());
NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr);
}
}
match src_pad_handler.push_buffer(&pad, &element, buffer).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `socket` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
// Note that this won't deadlock as it doesn't lock the `SocketStream`
// in use within the `src_pad`'s `Task`.
let socket = self.socket.lock().unwrap();
let socket = socket.as_ref().unwrap();
if let Some(socket) = socket.as_ref() {
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
@ -824,31 +766,13 @@ impl UdpSrc {
gst_debug!(CAT, obj: element, "Stopping Flush");
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.flush(&self.src_pad.as_ref());
self.start_unchecked(element, socket);
self.src_pad_handler.set_need_segment();
self.start_task(element, socket);
gst_debug!(CAT, obj: element, "Stopped Flush");
} else {
gst_debug!(CAT, obj: element, "Socket not available");
}
fn start_unchecked(&self, element: &gst::Element, socket: &Socket<UdpReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.start_task(self.src_pad.as_ref(), element, socket_stream);
}
fn flush_start(&self, element: &gst::Element) {
@ -859,20 +783,16 @@ impl UdpSrc {
socket.pause();
}
self.src_pad.cancel_task();
self.task.cancel();
gst_debug!(CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Pausing");
if let Some(socket) = socket.as_ref() {
socket.pause();
}
self.src_pad.pause_task();
self.socket.lock().unwrap().as_ref().unwrap().pause();
self.task.pause();
gst_debug!(CAT, obj: element, "Paused");
@ -926,12 +846,13 @@ impl ObjectSubclass for UdpSrc {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
Self {
src_pad,
src_pad_handler: StdMutex::new(None),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad_handler: UdpSrcPadHandler::default(),
task: Task::default(),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
@ -1020,6 +941,7 @@ impl ObjectImpl for UdpSrc {
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(self.src_pad.gst_pad()).unwrap();
super::set_element_flags(element, gst::ElementFlags::SOURCE);
}
}

View file

@ -39,7 +39,7 @@ use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: u32 = 2;
@ -67,7 +67,7 @@ static SRC_PROPERTIES: [glib::subclass::Property; 1] =
)
})];
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
struct Settings {
context: String,
}
@ -80,106 +80,11 @@ lazy_static! {
);
}
#[derive(Debug)]
struct PadSrcHandlerTestInner {
receiver: FutMutex<mpsc::Receiver<Item>>,
}
impl PadSrcHandlerTestInner {
fn new(receiver: mpsc::Receiver<Item>) -> PadSrcHandlerTestInner {
PadSrcHandlerTestInner {
receiver: FutMutex::new(receiver),
}
}
}
#[derive(Clone, Debug)]
struct PadSrcHandlerTest(Arc<PadSrcHandlerTestInner>);
struct PadSrcTestHandler;
impl PadSrcHandlerTest {
fn new(receiver: mpsc::Receiver<Item>) -> PadSrcHandlerTest {
PadSrcHandlerTest(Arc::new(PadSrcHandlerTestInner::new(receiver)))
}
fn stop(&self, pad: &PadSrcRef<'_>) {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Stopping handler");
pad.stop_task();
// From here on, the task is stopped so it can't hold resources anymore
self.flush(pad);
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handler stopped");
}
fn flush(&self, pad: &PadSrcRef<'_>) {
// Precondition: task must be stopped
// TODO: assert the task state when Task & PadSrc are separated
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing");
// Purge the channel
let mut receiver = self
.0
.receiver
.try_lock()
.expect("Channel receiver is locked elsewhere");
loop {
match receiver.try_next() {
Ok(Some(_item)) => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Dropping pending item");
}
Err(_) => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "No more pending item");
break;
}
Ok(None) => {
panic!("Channel sender dropped");
}
}
}
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushed");
}
fn start_task(&self, pad: PadSrcRef<'_>) {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "SrcPad task starting");
let this = self.clone();
let pad_weak = pad.downgrade();
pad.start_task(move || {
let pad_weak = pad_weak.clone();
let this = this.clone();
async move {
let item = this.0.receiver.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted");
return glib::Continue(false);
}
};
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
match this.push_item(pad, item).await {
Ok(_) => glib::Continue(true),
Err(gst::FlowError::Flushing) => glib::Continue(false),
Err(err) => panic!("Got error {:?}", err),
}
}
});
}
async fn push_item(
self,
pad: PadSrcRef<'_>,
item: Item,
) -> Result<gst::FlowSuccess, gst::FlowError> {
impl PadSrcTestHandler {
async fn push_item(pad: PadSrcRef<'_>, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
match item {
@ -194,7 +99,7 @@ impl PadSrcHandlerTest {
}
}
impl PadSrcHandler for PadSrcHandlerTest {
impl PadSrcHandler for PadSrcTestHandler {
type ElementImpl = ElementSrcTest;
fn src_event(
@ -239,9 +144,10 @@ enum ElementSrcTestState {
#[derive(Debug)]
struct ElementSrcTest {
src_pad: PadSrc,
src_pad_handler: StdMutex<Option<PadSrcHandlerTest>>,
task: Task,
state: StdMutex<ElementSrcTestState>,
sender: StdMutex<Option<mpsc::Sender<Item>>>,
receiver: StdMutex<Option<Arc<FutMutex<mpsc::Receiver<Item>>>>>,
settings: StdMutex<Settings>,
}
@ -249,10 +155,7 @@ impl ElementSrcTest {
fn try_push(&self, item: Item) -> Result<(), Item> {
let state = self.state.lock().unwrap();
if *state == ElementSrcTestState::RejectItems {
gst_debug!(
SRC_CAT,
"ElementSrcTest rejecting item due to element state"
);
gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state");
return Err(item);
}
@ -276,21 +179,17 @@ impl ElementSrcTest {
)
})?;
let (sender, receiver) = mpsc::channel(1);
*self.sender.lock().unwrap() = Some(sender);
let src_pad_handler = PadSrcHandlerTest::new(receiver);
self.src_pad
.prepare(context, &src_pad_handler)
.map_err(|err| {
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error joining Context: {:?}", err]
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad.prepare(&PadSrcTestHandler);
*self.src_pad_handler.lock().unwrap() = Some(src_pad_handler);
let (sender, receiver) = mpsc::channel(1);
*self.sender.lock().unwrap() = Some(sender);
*self.receiver.lock().unwrap() = Some(Arc::new(FutMutex::new(receiver)));
gst_debug!(SRC_CAT, obj: element, "Prepared");
@ -300,14 +199,60 @@ impl ElementSrcTest {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(SRC_CAT, obj: element, "Unpreparing");
self.src_pad.unprepare().unwrap();
*self.src_pad_handler.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
*self.sender.lock().unwrap() = None;
*self.receiver.lock().unwrap() = None;
gst_debug!(SRC_CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
gst_debug!(SRC_CAT, obj: element, "Stopping");
self.flush(element);
*state = ElementSrcTestState::RejectItems;
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
fn flush(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Flushing");
self.task.stop();
let receiver = self.receiver.lock().unwrap();
let mut receiver = receiver
.as_ref()
.unwrap()
.try_lock()
.expect("receiver locked elsewhere");
// Purge the channel
loop {
match receiver.try_next() {
Ok(Some(_item)) => {
gst_debug!(SRC_CAT, obj: element, "Dropping pending item");
}
Err(_) => {
gst_debug!(SRC_CAT, obj: element, "No more pending item");
break;
}
Ok(None) => {
panic!("Channel sender dropped");
}
}
}
gst_debug!(SRC_CAT, obj: element, "Flushed");
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
if *state == ElementSrcTestState::Started {
@ -317,13 +262,45 @@ impl ElementSrcTest {
gst_debug!(SRC_CAT, obj: element, "Starting");
self.start_unchecked(&mut state);
self.start_task();
*state = ElementSrcTestState::Started;
gst_debug!(SRC_CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self) {
let pad_weak = self.src_pad.downgrade();
let receiver = Arc::clone(self.receiver.lock().unwrap().as_ref().expect("No receiver"));
self.task.start(move || {
let pad_weak = pad_weak.clone();
let receiver = Arc::clone(&receiver);
async move {
let item = receiver.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted");
return glib::Continue(false);
}
};
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
match PadSrcTestHandler::push_item(pad, item).await {
Ok(_) => glib::Continue(true),
Err(gst::FlowError::Flushing) => glib::Continue(false),
Err(err) => panic!("Got error {:?}", err),
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
if *state == ElementSrcTestState::Started {
@ -333,77 +310,34 @@ impl ElementSrcTest {
gst_debug!(SRC_CAT, obj: element, "Stopping Flush");
// Stop it so we wait for it to actually finish
self.src_pad.stop_task();
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.flush(&self.src_pad.as_ref());
// And then start it again
self.start_unchecked(&mut state);
self.flush(element);
self.start_task();
*state = ElementSrcTestState::Started;
gst_debug!(SRC_CAT, obj: element, "Stopped Flush");
}
fn start_unchecked(&self, state: &mut ElementSrcTestState) {
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.start_task(self.src_pad.as_ref());
*state = ElementSrcTestState::Started;
}
fn flush_start(&self, element: &gst::Element) {
// Keep the lock on the `state` until `flush_start` is complete
// so as to prevent race conditions due to concurrent state transitions.
let mut state = self.state.lock().unwrap();
gst_debug!(SRC_CAT, obj: element, "Starting Flush");
self.task.cancel();
*state = ElementSrcTestState::RejectItems;
self.src_pad.cancel_task();
gst_debug!(SRC_CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
// Lock the state to prevent race condition due to concurrent FlushStop
let mut state = self.state.lock().unwrap();
gst_debug!(SRC_CAT, obj: element, "Pausing");
self.src_pad.pause_task();
self.task.pause();
*state = ElementSrcTestState::Paused;
gst_debug!(SRC_CAT, obj: element, "Paused");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(SRC_CAT, obj: element, "Stopping");
*self.state.lock().unwrap() = ElementSrcTestState::RejectItems;
self.src_pad_handler
.lock()
.unwrap()
.as_ref()
.unwrap()
.stop(&self.src_pad.as_ref());
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
}
impl ObjectSubclass for ElementSrcTest {
@ -436,19 +370,16 @@ impl ObjectSubclass for ElementSrcTest {
}
fn new_with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = PadSrc::new_from_template(&templ, Some("src"));
let settings = Settings {
context: String::new(),
};
ElementSrcTest {
src_pad,
src_pad_handler: StdMutex::new(None),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
task: Task::default(),
state: StdMutex::new(ElementSrcTestState::RejectItems),
sender: StdMutex::new(None),
settings: StdMutex::new(settings),
receiver: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}
}
@ -525,9 +456,7 @@ impl ElementImpl for ElementSrcTest {
fn send_event(&self, element: &gst::Element, event: gst::Event) -> bool {
match event.view() {
EventView::FlushStart(..) => {
// Cancel the task so that it finishes ASAP
// and clear the sender
self.pause(element).unwrap();
self.flush_start(element);
}
EventView::FlushStop(..) => {
self.flush_stop(element);
@ -569,16 +498,10 @@ static SINK_PROPERTIES: [glib::subclass::Property; 1] =
)
})];
#[derive(Clone, Debug)]
struct PadSinkHandlerTest;
#[derive(Clone, Debug, Default)]
struct PadSinkTestHandler;
impl Default for PadSinkHandlerTest {
fn default() -> Self {
PadSinkHandlerTest
}
}
impl PadSinkHandler for PadSinkHandlerTest {
impl PadSinkHandler for PadSinkTestHandler {
type ElementImpl = ElementSinkTest;
fn sink_chain(
@ -764,10 +687,10 @@ impl ObjectSubclass for ElementSinkTest {
fn new_with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = PadSink::new_from_template(&templ, Some("sink"));
let gst_pad = gst::Pad::new_from_template(&templ, Some("sink"));
ElementSinkTest {
sink_pad,
sink_pad: PadSink::new(gst_pad),
flushing: AtomicBool::new(true),
sender: FutMutex::new(None),
}
@ -811,7 +734,7 @@ impl ElementImpl for ElementSinkTest {
match transition {
gst::StateChange::NullToReady => {
self.sink_pad.prepare(&PadSinkHandlerTest::default());
self.sink_pad.prepare(&PadSinkTestHandler::default());
}
gst::StateChange::PausedToReady => {
self.stop(element);
@ -1366,6 +1289,19 @@ fn start_flush() {
EventView::Segment(_) => (),
other => panic!("Unexpected event {:?}", other),
},
Item::Buffer(buffer) => {
// In some cases, the first Buffer might be processed before FlushStart
let data = buffer.map_readable().unwrap();
assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice());
match futures::executor::block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
EventView::Segment(_) => (),
other => panic!("Unexpected event {:?}", other),
},
other => panic!("Unexpected item {:?}", other),
}
}
other => panic!("Unexpected item {:?}", other),
}